标签搜索

Go TCP

BeforeIeave
2022-08-02 / 0 评论 / 169 阅读 / 正在检测是否收录...

1 Socket 一种TCP的调用

很多系统都提供Socket作为TCP网络链接的抽象。可以认为一个socket就是一个已经经过三次握手的链接。

1.1 Socket的通信过程

Socket通讯过程.png
server端会持有一个仅用于监听和接收的socket,若监听到了相关的需求,则新建一个socket处理相关需求。

1.2 IO模型

由于服务端需要同时读写多个socket,所以创建了三个IO模型处理。

1.2.1 阻塞IO

建立三个线程,线程不断读取socket的数据,如果没有数据,则自旋再次读取(陷入内核状态),会卡住,直到接收到数据时,切换为业务态,继续执行。
这也导致线程的状态切换的开销特别大,显得“低级”。

1.2.2 非阻塞IO

非阻塞IO.png


即业务逻辑会反复遍历所有的socket,有数据则处理,否则直接返回调用下一个socket。因此,线程不会陷入内核态。

1.2.3 多路复用型IO

多路复用.png


操作系统帮助我们监听socket,再注册到事件池中。
是一种非阻塞IO的升级版,将socket的可读事件注册在 event poll 中,每次业务函数遍历,事件池直接返回发生的事件(列表),业务则直接处理此socket。

2 Go 中对事件池的抽象

GO中有Network Poller 实现了对多路复用器的抽象(屏蔽了不同操作系统的epoll的区别)

2.1 linux-netpollinit()新建多路复用器

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)        //epfd是linux的文件描述符
    if epfd < 0 {
        epfd = epollcreate(1024)            //调用汇编函数创建事件池
        if epfd < 0 {
            println("runtime: epollcreate failed with", -epfd)
            throw("runtime: netpollinit failed")
        }
        closeonexec(epfd)
    }
    r, w, errno := nonblockingPipe()        //创建linux的管道用于关闭epoll
    if errno != 0 {
        println("runtime: pipe failed with", -errno)
        throw("runtime: pipe failed")
    }
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)    //注册管道事件,有事件则关闭epoll
    if errno != 0 {
        println("runtime: epollctl failed with", -errno)
        throw("runtime: epollctl failed")
    }
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

2.2 linux-netpollopen()插入事件

//传入一个socket的fd和pollDesc的指针
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET    //插入四个事件
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd        //pollDesc详细描述一个socket
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)        //传入control中
}

2.3 linux-netpoll()查询发生的事件

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {        //等待的时间
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent        //事件储存
retry:
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {            //没有事件发生
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}
        }
        goto retry
    }
    var toRun gList        //协程列表
    for i := int32(0); i < n; i++ {        //遍历每一个事件
        ev := &events[i]
        if ev.events == 0 {
            continue
        }

        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            if ev.events != _EPOLLIN {
                println("runtime: netpoll: break fd ready for", ev.events)
                throw("runtime: netpoll: break fd ready for something unexpected")
            }
            if delay != 0 {
                // netpollBreak could be picked up by a
                // nonblocking poll. Only read the byte
                // if blocking.
                var tmp [16]byte
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
                atomic.Store(&netpollWakeSig, 0)
            }
            continue
        }

        var mode int32
        //如果是_EPOLLIN 增加可写的标志位
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        //如果是_EPOLLOUT 增加可读的标志位
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.setEventErr(ev.events == _EPOLLERR)
            netpollready(&toRun, pd, mode)        //将可运行的协程放入列表内
        }
    }
    return toRun            //协程的列表
}

首先调用epollwait()查询有那些事情发生,再根据socket相关的pollDesc信息,返回可运行的协程列表(即将被唤醒)。

3 Network Poller 本质

3.1 pollcache和pollDesc结构体

3.1.1 pollcache 结构体

type pollCache struct {
    lock  mutex                //互斥锁
    first *pollDesc            //链表头
    // PollDesc objects must be type-stable,
    // because we can get ready notification from epoll/kqueue
    // after the descriptor is closed/reused.
    // Stale notifications are detected using seq variable,
    // seq is incremented when deadlines are changed or descriptor is reused.
}

3.1.2 pollDesc结构体--描述Socket

type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock-链表
    fd   uintptr   // constant for pollDesc usage lifetime

    atomicInfo atomic.Uint32 // atomic pollInfo

    rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
    wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil

    lock    mutex // protects the following fields
    closing bool
    user    uint32    // user settable cookie
    rseq    uintptr   // protects from stale read timers
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline (a nanotime in the future, -1 when expired)
    wseq    uintptr   // protects from stale write timers
    wt      timer     // write deadline timer
    wd      int64     // write deadline (a nanotime in the future, -1 when expired)
    self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

3.2 Network Poller的初始化

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}
//....
func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lockInit(&netpollInitLock, lockRankNetpollInit)
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()                        //开辟连接池
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

使用原子操作加载变量netpollInited,保证只初始化一个poller!

3.3 Network新增监听Socket

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()        //给链表分配一个新的结构体pollDesc-新增节点
    lock(&pd.lock)
    wg := pd.wg.Load()
    if wg != 0 && wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    rg := pd.rg.Load()
    if rg != 0 && rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    //新增节点的状态初始化
    pd.fd = fd
    pd.closing = false
    pd.setEventErr(false)
    pd.rseq++
    pd.rg.Store(0)
    pd.rd = 0
    pd.wseq++
    pd.wg.Store(0)
    pd.wd = 0
    pd.self = pd
    pd.publishInfo()
    unlock(&pd.lock)

    errno := netpollopen(fd, pd)        //插入事件池
    if errno != 0 {
        pollcache.free(pd)
        return nil, int(errno)
    }
    return pd, 0
}

3.4 Network Poller收发数据

3.4.1 Socket已经可以读写

gc(g0协程)会循环调用netpoll()方法。---因为g0协程一直在跑。

func netpoll(delay int64) gList {
//...
    //遍历每一个事件
        var mode int32
        //如果是_EPOLLIN 增加可写的标志位
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        //如果是_EPOLLOUT 增加可读的标志位
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.setEventErr(ev.events == _EPOLLERR)
            netpollready(&toRun, pd, mode)        //将可运行的协程放入列表内
        }
    //遍历结束
    return toRun
}

重点在于netpollready()函数调用的netpollunblock()函数

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}
//ioready = true
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := gpp.Load()
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set pdReady for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady            //更改状态
        }
        if gpp.CompareAndSwap(old, new) {
            if old == pdWait {
                old = 0
            }
            return (*g)(unsafe.Pointer(old))
        }
    }
}

发现Socket可读写时,会将其状态改为pdReady(它的值为1)。
而后,协程会调用poll_runtime_pollWait()函数,判断Socket是否允许读写

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    for !netpollblock(pd, int32(mode), false) {
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
    }
    return pollNoError            //没有错误,继续执行
}

调用了 netpollblock()函数,判断是否可以继续执行

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg            //取出Socket的状态
    }

    // set the gpp semaphore to pdWait
    for {
        // Consume notification if already ready.
        if gpp.CompareAndSwap(pdReady, 0) {        //pdReady的值为1,此语句成立
            return true            //表示直接可写
        }
        if gpp.CompareAndSwap(0, pdWait) {
            break
        }

        // Double check that this isn't corrupt; otherwise we'd loop
        // forever.
        if v := gpp.Load(); v != pdReady && v != 0 {
            throw("runtime: double wait")
        }
    }

    if waitio || netpollcheckerr(pd, mode) == pollNoError {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := gpp.Swap(0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

3.4.2 Socket暂时无法读写

g0协程还是会一直调用netpoll()方法,判断是否有Socket需要处理。
协程会一直调用poll_runtime_pollWait()函数,其调用的netpollblock()函数会判断wg的值是否为pdwait,是的话,直接调用gopark()函数休眠---其中的参数之一是一个函数,会将此协程的地址写道rg或者wg上。
如果相关的Socket可读写,netpoll()中的netpollready()函数就会取出rg或者wg储存的地址,放入toRun协程列表中,最后会将该协程唤醒。

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    
    //取出成功,传输到toRun列表中。
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

5 Go对Socket的抽象-net

5.1 如何监听Socket的新连接-net.Listen

func Listen(network, address string) (Listener, error) {    //Listener是一个接口
    var lc ListenConfig
    return lc.Listen(context.Background(), network, address)
}

5.1.1 调用函数lc.Listen():

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    //...
    case *TCPAddr:
        l, err = sl.listenTCP(ctx, la)    //TCP监听
    case *UnixAddr:
        l, err = sl.listenUnix(ctx, la)
    default:
        return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
    }
    //...
}

调用函数listenTCP():

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

调用函数internetSocket():

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
        raddr = raddr.toLocal(net)
    }
    family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

调用函数socket():

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    s, err := sysSocket(family, sotype, proto)        //创建了一个socket
    if err != nil {
        return nil, err
    }
    if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    //...
}

调用函数newFD():

func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
    ret := &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
            IsStream:      sotype == syscall.SOCK_STREAM,
            ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
        },
        family: family,
        sotype: sotype,
        net:    net,
    }
    return ret, nil
}

netFD的结构-net中对socket的详细解释

// Network file descriptor.
type netFD struct {
    pfd poll.FD                //--记录了pollDsc

    // immutable until Close
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    laddr       Addr
    raddr       Addr
}

5.1.2 socket()创建完成netFD后,调用listenStream()

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    //创建完成netFD--
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        }
    }
    if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
        fd.Close()
        return nil, err
    }
    return fd, nil
}

listenStream()中调用了Bind方法-绑定接口

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    var err error
    if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
        return err
    }
    var lsa syscall.Sockaddr
    if lsa, err = laddr.sockaddr(fd.family); err != nil {
        return err
    }
    if ctrlFn != nil {
        c, err := newRawConn(fd)
        if err != nil {
            return err
        }
        if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
            return err
        }
    }
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    if err = fd.init(); err != nil {            //最后调用了poll_runtime_pollOpen
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

5.1.3 listenTCP()最后返回TCPlistener

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
    fd *netFD
    lc ListenConfig
}

5.1.4 总结

1.net.Listen()新建了一个Socket,并Bind相应的端口
2.新建一个FD(Socket的详细描述),返回TCPListener对象(本质是一个LISTEN状态的Socket)。
3.将TCPListener的FD信息加入监听。

5.2 Read()和Write()方法

在net包中的实现,比较简单,看看就行。

6结构简图

goSocket完整架构.png

0

评论

博主关闭了所有页面的评论