1 Socket：TCP 网络调用的抽象
很多操作系统都提供 Socket 作为 TCP 网络连接的抽象。对于已建立连接的 socket，可以认为它代表一条已经完成三次握手的连接。
1.1 Socket的通信过程
server 端会持有一个仅用于监听和接收连接的 socket；当监听到新的连接请求时，会（通过 accept）新建一个 socket 专门处理该连接。
1.2 IO模型
由于服务端需要同时读写多个 socket，常见的处理方式有以下三种 IO 模型。
1.2.1 阻塞IO
为每个 socket 建立一个线程（例如有三个 socket 就建立三个线程），线程调用 read 读取 socket 的数据；如果暂时没有数据，read 会阻塞：线程陷入内核态并被挂起（而不是自旋），直到数据到达后才返回用户态，继续执行业务逻辑。
连接一多，线程数量随之增加，线程的挂起、唤醒和上下文切换开销会变得特别大，因此这种方式显得“低级”。
1.2.2 非阻塞IO
即业务逻辑会反复遍历所有的 socket：有数据则处理，否则读调用立即返回（而不是阻塞），继续尝试下一个 socket。因此线程不会被阻塞挂起；但每次读取仍然是一次系统调用，没有数据时的反复轮询会白白消耗 CPU。
1.2.3 多路复用型IO
由操作系统帮助我们监听 socket：把 socket 注册到事件池中，由内核负责检测其上发生的事件。
它是非阻塞 IO 的升级版：将 socket 的可读/可写事件注册到 event poll（如 Linux 的 epoll）中。业务每次查询时，事件池直接返回已经发生的事件列表，业务只需处理这些就绪的 socket，无需逐个遍历所有 socket。
2 Go 中对事件池的抽象
Go 中的 Network Poller 实现了对多路复用器的抽象，屏蔽了不同操作系统多路复用机制的差异（如 Linux 的 epoll、BSD/macOS 的 kqueue、Windows 的 IOCP 等）。
2.1 linux-netpollinit()新建多路复用器
// netpollinit (Linux) creates the epoll instance used by the network poller
// and registers the read end of a non-blocking pipe with it. That pipe is a
// wakeup channel, not a way to close epoll: netpoll recognises its events by
// the &netpollBreakRd tag stored in ev.data and drains it, while the write
// end is kept in netpollBreakWr. Every failure here is fatal (throw).
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC) // epfd is the epoll file descriptor, created close-on-exec
if epfd < 0 {
epfd = epollcreate(1024) // fall back to the older epoll_create call when epoll_create1 fails
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd) // the fallback call was not given a CLOEXEC flag, so set it separately
}
r, w, errno := nonblockingPipe() // pipe used to wake up a blocking epollwait (it does not close epoll)
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
// Tag the event with the address of netpollBreakRd so netpoll can tell
// wakeup-pipe events apart from socket events (which carry a *pollDesc).
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) // watch the pipe's read end for readability
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
// Only the address of netpollBreakRd was stored in ev.data above, so
// assigning its value after registration is fine.
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
2.2 linux-netpollopen()插入事件
// netpollopen registers the socket fd with the poller's epoll instance.
// pd (the poller's description of this socket) is stored in the event's
// user data, so netpoll can map a ready event back to its pollDesc.
// The interest set is readable, writable and peer hang-up (_EPOLLRDHUP);
// _EPOLLET is a mode flag selecting edge-triggered delivery, not a fourth event.
// It returns 0 on success, otherwise the positive errno from epollctl.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	ev := epollevent{events: _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET}
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	errno := epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
	return -errno
}
2.3 linux-netpoll()查询发生的事件
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList { // delay is the maximum wait, in nanoseconds
if epfd == -1 {
// No epoll instance (presumably the poller was never initialised): nothing can be ready.
return gList{}
}
// Convert the nanosecond delay into the millisecond timeout epollwait takes:
// -1 blocks forever, 0 returns immediately, sub-millisecond delays round up
// to 1ms, and very long delays are capped at 1e9 ms.
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent // receives up to 128 ready events per epollwait call
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 { // negative n is -errno, i.e. epollwait failed (this is not "no events")
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList // goroutines made runnable by the events below
for i := int32(0); i < n; i++ { // examine each ready event
ev := &events[i]
if ev.events == 0 {
continue
}
// Event on the wakeup pipe registered by netpollinit: its data is
// &netpollBreakRd rather than a *pollDesc.
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
var mode int32
// Readable, peer hang-up, hang-up or error: the read side is ready ('r').
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
// Writable, hang-up or error: the write side is ready ('w').
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) // the pollDesc stored by netpollopen
pd.setEventErr(ev.events == _EPOLLERR) // flag an error only when EPOLLERR is the only reported bit
netpollready(&toRun, pd, mode) // add goroutines waiting on pd to toRun
}
}
return toRun // goroutines that become runnable
}
首先调用 epollwait() 查询发生了哪些事件，再根据每个事件携带的、与 socket 对应的 pollDesc 信息，返回可运行（即将被唤醒）的协程列表。
3 Network Poller 本质
3.1 pollcache和pollDesc结构体
3.1.1 pollcache 结构体
// pollCache is a mutex-protected cache of pollDesc objects, linked through
// pollDesc.link. poll_runtime_pollOpen obtains descriptors from it with
// pollcache.alloc and hands them back with pollcache.free (e.g. when
// registration with epoll fails).
type pollCache struct {
lock mutex // protects the list below
first *pollDesc // head of the linked list of cached pollDesc objects
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
3.1.2 pollDesc结构体--描述Socket
// pollDesc describes one file descriptor (socket) registered with the
// network poller. netpollopen stores a pointer to it in the epoll event data;
// rg and wg hold the read/write readiness state, or the goroutine parked
// waiting for that side.
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock (next node in the cache's linked list)
fd uintptr // constant for pollDesc usage lifetime
atomicInfo atomic.Uint32 // atomic pollInfo
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil
lock mutex // protects the following fields
closing bool
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline (a nanotime in the future, -1 when expired)
wseq uintptr // protects from stale write timers
wt timer // write deadline timer
wd int64 // write deadline (a nanotime in the future, -1 when expired)
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
3.2 Network Poller的初始化
// poll_runtime_pollServerInit is the runtime implementation behind
// internal/poll's runtime_pollServerInit (bound via go:linkname below).
// It initialises the network poller; netpollGenericInit ensures that
// happens only once.
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
//....
// netpollGenericInit runs the platform poller setup (netpollinit) exactly
// once. The atomic load is a lock-free fast path for the common
// already-initialised case; the flag is re-checked while holding
// netpollInitLock so two concurrent callers cannot both initialise it
// (double-checked locking).
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 { // re-check now that the lock is held
netpollinit() // create the multiplexer (the epoll instance on Linux)
atomic.Store(&netpollInited, 1) // publish the flag only after setup has finished
}
unlock(&netpollInitLock)
}
}
先用原子操作读取变量 netpollInited 做无锁的快速判断，再在加锁后二次检查，保证 poller 只会被初始化一次（双重检查锁定）！
3.3 Network Poller 新增监听 Socket
// poll_runtime_pollOpen is the runtime implementation behind internal/poll's
// runtime_pollOpen (bound via go:linkname below). It takes a pollDesc from
// pollcache, resets it for fd, and registers fd with epoll via netpollopen.
// It returns (pd, 0) on success, or (nil, errno) after returning pd to the
// cache when registration fails.
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc() // take a pollDesc from pollcache (whether reused or fresh depends on alloc, not shown)
lock(&pd.lock)
// Sanity checks: a descriptor handed out by the cache must not still have
// a goroutine parked on its write or read side.
wg := pd.wg.Load()
if wg != 0 && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != 0 && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
// Reset the descriptor's state for this fd.
pd.fd = fd
pd.closing = false
pd.setEventErr(false)
pd.rseq++ // bump the sequence so read timers from a previous use are treated as stale
pd.rg.Store(0)
pd.rd = 0
pd.wseq++ // likewise for write timers
pd.wg.Store(0)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
errno := netpollopen(fd, pd) // register fd with the epoll instance
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}
3.4 Network Poller收发数据
3.4.1 Socket已经可以读写
调度器（在各个 M 的 g0 上运行）在寻找可运行的协程时会调用 netpoll()，sysmon 监控线程也会定期调用它，因此就绪的 Socket 会被周期性地检查。
// Excerpt of netpoll (elided parts marked "..."): how each epoll event is
// turned into a read/write mode before being handed to netpollready.
func netpoll(delay int64) gList {
//...
// loop over each ready event ev:
var mode int32
// Readable, peer hang-up, hang-up or error: the read side is ready ('r').
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
// Writable, hang-up or error: the write side is ready ('w').
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, pd, mode) // add goroutines waiting on pd to toRun
}
// end of loop
return toRun
}
重点在于netpollready()函数调用的netpollunblock()函数:
// netpollready moves the goroutines waiting on pd onto the caller's run list.
// mode is 'r', 'w' or 'r'+'w'. For each side it names, netpollunblock marks
// that side ready and returns the goroutine parked there (if any). Those
// goroutines are then pushed onto toRun, reader first, then writer.
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var reader, writer *g
	switch mode {
	case 'r':
		reader = netpollunblock(pd, 'r', true)
	case 'w':
		writer = netpollunblock(pd, 'w', true)
	case 'r' + 'w':
		reader = netpollunblock(pd, 'r', true)
		writer = netpollunblock(pd, 'w', true)
	}
	if reader != nil {
		toRun.push(reader)
	}
	if writer != nil {
		toRun.push(writer)
	}
}
// netpollunblock updates pd's read ('r') or write ('w') state word and
// returns the goroutine that was parked on it, or nil if there was none.
// netpollready calls it with ioready = true. In that case the state becomes
// pdReady, so a later netpollblock consumes the notification without parking.
// With ioready = false the state is reset to 0; if the state was already 0
// or pdReady, nothing changes and nil is returned.
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := gpp.Load()
if old == pdReady {
// Already marked ready: nobody to wake.
return nil
}
if old == 0 && !ioready {
// Only set pdReady for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady // mark this side as I/O-ready
}
if gpp.CompareAndSwap(old, new) {
// pdWait is a marker, not a goroutine pointer: report no goroutine.
if old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
// CAS lost a race with a concurrent update; reload and retry.
}
}
发现 Socket 可读写时，netpollunblock 会将对应的 rg/wg 状态改为 pdReady（它的值为 1）；如果此前已有协程阻塞在上面（rg/wg 中存的是该协程的地址），则把这个协程返回给 netpollready，放入 toRun 列表。
而后,协程会调用poll_runtime_pollWait()函数,判断Socket是否允许读写:
// poll_runtime_pollWait blocks the calling goroutine until pd is ready for
// mode ('r' or 'w'). It returns pollNoError once I/O is ready, or the first
// non-pollNoError code reported by netpollcheckerr, either before waiting
// or after netpollblock returns without readiness.
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	m := int32(mode)
	if errcode := netpollcheckerr(pd, m); errcode != pollNoError {
		return errcode
	}
	// As for now only Solaris, illumos, and AIX use level-triggered IO.
	switch GOOS {
	case "solaris", "illumos", "aix":
		netpollarm(pd, mode)
	}
	for {
		if netpollblock(pd, m, false) {
			return pollNoError // ready: the caller may retry its read/write
		}
		if errcode := netpollcheckerr(pd, m); errcode != pollNoError {
			return errcode
		}
	}
}
调用了 netpollblock()函数,判断是否可以继续执行
// netpollblock waits until pd is ready for mode ('r' or 'w').
// If a pdReady notification is already pending, it is consumed and true is
// returned without parking. Otherwise the state word goes from 0 to pdWait
// and the goroutine parks (unless waitio is false and netpollcheckerr
// reports an error). It returns true only if the state was pdReady when it
// resumed; false sends the caller (poll_runtime_pollWait) back to
// netpollcheckerr.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg // select the write-side state word
}
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) { // state was pdReady (1): consume it
return true // I/O is ready right away; no need to park
}
if gpp.CompareAndSwap(0, pdWait) { // 0 -> pdWait: this goroutine is about to park
break
}
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
// netpollblockcommit (not shown) is gopark's commit callback; presumably it
// publishes this goroutine into *gpp so netpollunblock can return it — verify.
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
3.4.2 Socket暂时无法读写
调度器仍会不断调用 netpoll()，检查是否有 Socket 就绪。
协程调用 poll_runtime_pollWait()，其中的 netpollblock() 发现 rg/wg 不是 pdReady，就把它从 0 改为 pdWait，然后调用 gopark() 休眠（协程并不会一直循环调用）。gopark() 的参数之一 netpollblockcommit 是一个回调函数，会在协程真正挂起前把此协程的地址写到 rg 或者 wg 上。
如果相关的Socket可读写,netpoll()中的netpollready()函数就会取出rg或者wg储存的地址,放入toRun协程列表中,最后会将该协程唤醒。
// netpollready collects the goroutines parked on pd for the ready sides named
// by mode ('r', 'w' or 'r'+'w'). netpollunblock returns the address stored in
// rg/wg (if a goroutine is parked there). Each goroutine it returns is pushed
// onto toRun, reader first, then writer, so the scheduler can wake it.
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var reader, writer *g
	switch mode {
	case 'r':
		reader = netpollunblock(pd, 'r', true)
	case 'w':
		writer = netpollunblock(pd, 'w', true)
	case 'r' + 'w':
		reader = netpollunblock(pd, 'r', true)
		writer = netpollunblock(pd, 'w', true)
	}
	if reader != nil {
		toRun.push(reader)
	}
	if writer != nil {
		toRun.push(writer)
	}
}
5 Go对Socket的抽象-net
5.1 如何监听Socket的新连接-net.Listen
// Listen announces on the local network address using a zero-value
// ListenConfig and a background context. The returned Listener is an
// interface; for TCP addresses the concrete value comes from listenTCP
// (a *TCPListener).
func Listen(network, address string) (Listener, error) {
	return (&ListenConfig{}).Listen(context.Background(), network, address)
}
5.1.1 调用函数lc.Listen():
// Listen (excerpt) dispatches on the type of the resolved local address la.
// TCP addresses go to listenTCP and Unix-domain addresses go to listenUnix.
// Any other type is reported as an *OpError wrapping an *AddrError.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
//... (elided, including the opening of the type switch on la)
case *TCPAddr:
l, err = sl.listenTCP(ctx, la) // TCP listener
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
//...
}
调用函数listenTCP():
// listenTCP creates a listening SOCK_STREAM socket bound to laddr through
// internetSocket (mode "listen", no remote address). It passes the
// listener config's Control hook through and wraps the resulting netFD
// in a *TCPListener.
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	ctrl := sl.ListenConfig.Control
	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", ctrl)
	if err != nil {
		return nil, err
	}
	ln := &TCPListener{fd: fd, lc: sl.ListenConfig}
	return ln, nil
}
调用函数internetSocket():
// internetSocket creates a socket for an IP network. It picks the address
// family with favoriteAddrFamily and then delegates to socket, which either
// listens on laddr or dials raddr.
// On aix, windows and openbsd, a wildcard remote address used with mode
// "dial" is first rewritten with raddr.toLocal.
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	switch runtime.GOOS {
	case "aix", "windows", "openbsd":
		if mode == "dial" && raddr.isWildcard() {
			raddr = raddr.toLocal(net)
		}
	}
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
调用函数socket():
// socket (first part, excerpt): creates the OS socket, applies the default
// socket options and wraps it in a netFD. If either later step fails, the
// raw socket is closed with poll.CloseFunc before the error is returned.
// The elided remainder ("...") is quoted separately in this article.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto) // create the OS-level socket
if err != nil {
return nil, err
}
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
if fd, err = newFD(s, family, sotype, net); err != nil { // wrap the raw descriptor in a netFD
poll.CloseFunc(s)
return nil, err
}
//...
}
调用函数newFD():
// newFD wraps an already-created socket handle in a netFD. It records the
// family, socket type and network name and fills in the poll.FD fields
// derived from the socket type. newFD itself does not register the
// descriptor with the network poller; per this article that happens later,
// in fd.init. The returned error is always nil here.
func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
	stream := sotype == syscall.SOCK_STREAM
	// Every socket type except datagram and raw treats a zero-byte read as
	// EOF (presumably because those two can carry empty messages).
	zeroReadIsEOF := sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW
	fd := &netFD{family: family, sotype: sotype, net: net}
	fd.pfd.Sysfd = sysfd
	fd.pfd.IsStream = stream
	fd.pfd.ZeroReadIsEOF = zeroReadIsEOF
	return fd, nil
}
netFD 的结构——net 包中对 socket 的详细描述：
// Network file descriptor.
// netFD is package net's representation of a socket: the poll.FD that wraps
// the OS descriptor, plus the socket's family, type, network name and
// local/remote addresses.
type netFD struct {
pfd poll.FD // wraps the OS descriptor; per this article it also records the pollDesc
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
5.1.2 socket()创建完成netFD后,调用listenStream()
// socket (second part, excerpt): once the netFD exists, decide how to set it
// up. With only a local address (laddr set, raddr nil), a stream or
// seqpacket socket becomes a listener via listenStream, and a datagram
// socket is set up via listenDatagram. Otherwise the socket is connected
// with dial. On any error fd is closed and nil is returned.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// ... (elided: the netFD has already been created, see the first part)
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
listenStream() 中依次调用 bind 绑定本地地址和端口、调用 listen 开始监听，最后通过 fd.init() 把该 fd 注册到 Network Poller：
// listenStream turns fd into a listening stream socket bound to laddr.
// In order, it:
//  1. applies the default listener socket options;
//  2. runs the optional ctrlFn hook on the raw connection;
//  3. binds to laddr and starts listening with the given backlog;
//  4. calls fd.init, which per this article ends in poll_runtime_pollOpen
//     and registers fd with the network poller;
//  5. records the local address reported by getsockname.
// System-call failures are wrapped with os.NewSyscallError.
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { // bind to the local address/port
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { // put the socket into the listening state
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil { // register with the network poller (ends in poll_runtime_pollOpen)
return err
}
// Read back the address actually bound (presumably so an OS-chosen port
// becomes visible); the Getsockname error is ignored here.
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
5.1.3 listenTCP()最后返回TCPlistener
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
// listenTCP builds it from the listening socket's netFD and the
// ListenConfig that created it.
type TCPListener struct {
fd *netFD // the listening socket
lc ListenConfig // configuration the listener was created with
}
5.1.4 总结
1.net.Listen()新建了一个Socket,并Bind相应的端口。
2.新建一个FD(Socket的详细描述),返回TCPListener对象(本质是一个LISTEN状态的Socket)。
3. 在 fd.init() 中调用 poll_runtime_pollOpen，将该 FD 注册到 Network Poller（Linux 上即 epoll）中，从而监听新连接事件。
评论