1 Socket:操作系统提供的网络通信接口(本文以 TCP 为例)
1.1 Socket的通信过程
1.2 IO模型
1.2.1 阻塞IO
1.2.2 非阻塞IO
1.2.3 多路复用型IO
可以看作非阻塞IO的升级版:把多个 socket 关心的事件(可读、可写等)注册到事件池(如 Linux 的 epoll)中;业务代码不再逐个轮询每个 socket,而是调用一次等待函数(如 epoll_wait),由事件池直接返回已就绪的事件列表,业务只需处理这些就绪的 socket。
2 Go 中对事件池的抽象
Go 中的 Network Poller 实现了对多路复用器的抽象,屏蔽了不同操作系统多路复用机制的差异(Linux 的 epoll、BSD/macOS 的 kqueue、Windows 的 IOCP 等)。
2.1 linux-netpollinit()新建多路复用器
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC) //epfd是linux的文件描述符
if epfd < 0 {
epfd = epollcreate(1024) //调用汇编函数创建事件池
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
r, w, errno := nonblockingPipe() //创建linux的管道用于关闭epoll
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
ev := epollevent{
events: _EPOLLIN,
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) //注册管道事件,有事件则关闭epoll
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
2.2 linux-netpollopen():向 epoll 注册一个 fd(socket)及其关心的事件
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET //插入四个事件
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd //pollDesc详细描述一个socket
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) //传入control中
2.3 linux-netpoll()查询发生的事件
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList { //等待的时间
if epfd == -1 {
return gList{}
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
var events [128]epollevent //事件储存
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 { //没有事件发生
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
goto retry
var toRun gList //协程列表
for i := int32(0); i < n; i++ { //遍历每一个事件
ev := &events[i]
if ev.events == 0 {
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
var mode int32
//如果是_EPOLLIN 增加可写的标志位
mode += 'r'
//如果是_EPOLLOUT 增加可读的标志位
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, pd, mode) //将可运行的协程放入列表内
return toRun //协程的列表
3 Network Poller 本质
3.1 pollcache和pollDesc结构体
3.1.1 pollcache 结构体
type pollCache struct {
lock mutex //互斥锁
first *pollDesc //链表头
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
3.1.2 pollDesc结构体--描述Socket
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock-链表
fd uintptr // constant for pollDesc usage lifetime
atomicInfo atomic.Uint32 // atomic pollInfo
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil
lock mutex // protects the following fields
closing bool
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline (a nanotime in the future, -1 when expired)
wseq uintptr // protects from stale write timers
wt timer // write deadline timer
wd int64 // write deadline (a nanotime in the future, -1 when expired)
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
3.2 Network Poller的初始化
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
if netpollInited == 0 {
netpollinit() //开辟连接池
atomic.Store(&netpollInited, 1)
3.3 Network Poller 新增监听的 Socket
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc() //给链表分配一个新的结构体pollDesc-新增节点
wg := pd.wg.Load()
if wg != 0 && wg != pdReady {
throw("runtime: blocked write on free polldesc")
rg := pd.rg.Load()
if rg != 0 && rg != pdReady {
throw("runtime: blocked read on free polldesc")
pd.fd = fd
pd.closing = false
pd.rd = 0
pd.wd = 0
pd.self = pd
errno := netpollopen(fd, pd) //插入事件池
if errno != 0 {
return nil, int(errno)
return pd, 0
3.4 Network Poller收发数据
3.4.1 Socket已经可以读写
func netpoll(delay int64) gList {
var mode int32
//如果是_EPOLLIN 增加可写的标志位
mode += 'r'
//如果是_EPOLLOUT 增加可读的标志位
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, pd, mode) //将可运行的协程放入列表内
return toRun
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
if rg != nil {
if wg != nil {
//ioready = true
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
for {
old := gpp.Load()
if old == pdReady {
return nil
if old == 0 && !ioready {
// Only set pdReady for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
var new uintptr
if ioready {
new = pdReady //更改状态
if gpp.CompareAndSwap(old, new) {
if old == pdWait {
old = 0
return (*g)(unsafe.Pointer(old))
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
return pollNoError //没有错误,继续执行
poll_runtime_pollWait 会循环调用 netpollblock():若 socket 已就绪(状态为 pdReady)则直接返回,协程继续执行读写;否则将当前协程挂起(gopark),直到 netpoll 发现事件后再将其唤醒。
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg //取出Socket的状态
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) { //pdReady的值为1,此语句成立
return true //表示直接可写
if gpp.CompareAndSwap(0, pdWait) {
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
return old == pdReady
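3.4.2 Socket暂时无法读写
socket 未就绪时,netpollblock 把 rg/wg 置为 pdWait 并通过 gopark 挂起当前协程;之后 netpoll 发现该 fd 就绪,调用 netpollready → netpollunblock 取出被挂起的协程,放入可运行列表 toRun,由调度器恢复执行。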
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
if rg != nil {
if wg != nil {
5 Go对Socket的抽象-net
5.1 如何监听Socket的新连接-net.Listen
func Listen(network, address string) (Listener, error) { //Listener是一个接口
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
5.1.1 调用函数lc.Listen():
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la) //TCP监听
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
raddr = raddr.toLocal(net)
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto) //创建了一个socket
if err != nil {
return nil, err
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
return nil, err
if fd, err = newFD(s, family, sotype, net); err != nil {
return nil, err
func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
family: family,
sotype: sotype,
net: net,
return ret, nil
// Network file descriptor.
type netFD struct {
pfd poll.FD //--记录了pollDsc
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
5.1.2 socket()创建完成netFD后,调用listenStream()
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
return nil, err
return fd, nil
case syscall.SOCK_DGRAM:
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
return nil, err
return fd, nil
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
return nil, err
return fd, nil
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
if err = fd.init(); err != nil { //最后调用了poll_runtime_pollOpen
return err
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
5.1.3 listenTCP() 最后返回 TCPListener
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
5.1.4 总结