[Golang] 剑走偏锋 -- IO Completion Ports
- 作者: 内涵高级总监
- 来源: 51数据库
- 2021-08-05
前言
golang 目前的主要應用領域還是後臺微服務,雖然在業務領域也有所應用但仍然是比較小衆的選擇。大多數的服務運行環境都是linux,而在windows中golang應用更少,而作者因爲特殊情況,不得已要在windows環境中用golang去寫本地代理服務。在我的使用場景中實時性要求非常高(視頻通信),對tcp數據處理要足夠快,否則會造成tcp 服務端的 receive buffer 溢出,造成 packet loss,影響實時性和數據的完整性。
作者閲讀了golang 在windows 環境下 tcp 部分syscall 的實現,最終確認它的底層模型是用了完成端口(異步io模型)的。
但是由於作者本人比較喜歡折騰,所以用golang 底層的syscall 實現了一下tcp 完成端口服務。
iocompletion port
以下為windows環境下用golang實現的 iocompletion port server
iocompletionrootcontext
管理指定 port 上所有 accepted socket:
// iocompletionrootcontext groups the per-operation I/O contexts that belong to
// one socket: start creates one for the listen socket and doacceptex creates
// one for every accepted connection.
type iocompletionrootcontext struct {
// socket is the handle of the socket this context describes.
socket windows.handle
// socketaddr is declared for the socket's address; nothing in the visible
// code assigns it.
socketaddr windows.sockaddrinet4
// ioset collects every iocompletioncontext handed out by newiocontext.
ioset []*iocompletioncontext
// Embedded mutex; newiocontext holds it while it appends to ioset.
sync.mutex
}
// newiocontext allocates a new I/O context with a 65535-byte data buffer,
// points its wsabuf at that buffer, records the context in root.ioset and
// returns it. The root mutex is held for the whole call.
func (root *iocompletionrootcontext) newiocontext() *iocompletioncontext {
	root.lock()
	defer root.unlock()

	const bufsize = 65535

	// The overlapped field is left at its zero value, which is what the
	// explicit all-zero literal used to spell out.
	ioctx := new(iocompletioncontext)
	ioctx.data = make([]byte, bufsize)
	ioctx.wsabuf.buf = &ioctx.data[0]
	ioctx.wsabuf.len = uint32(bufsize)

	root.ioset = append(root.ioset, ioctx)
	return ioctx
}
// newrootcontext returns a root context whose ioset is an empty, non-nil
// slice. The caller is expected to fill in the socket field.
func newrootcontext() *iocompletionrootcontext {
	root := new(iocompletionrootcontext)
	root.ioset = []*iocompletioncontext{}
	return root
}
iocompletioncontext
accepted socket 的上下文:
// iocompletioncontext is the state of one overlapped operation (an accept or
// a receive) posted on a socket.
//
// listen recovers the *iocompletioncontext from the *windows.overlapped it
// dequeues by subtracting unsafe.offsetof of the overlapped field, so the
// overlapped field must stay embedded by value and the pointer passed to the
// Winsock calls must always be &ctx.overlapped.
type iocompletioncontext struct {
// socket is the socket the operation runs on: the pre-created accept socket
// for an accept, the accepted connection for a receive.
socket windows.handle
// socketaddr is declared but not assigned anywhere in the visible code.
socketaddr windows.sockaddrinet4
// wsabuf is the buffer descriptor passed to the Winsock calls; newiocontext
// points it at data[0] with length 65535.
wsabuf windows.wsabuf
// data is the backing buffer that wsabuf refers to.
data []byte
// optype records which kind of operation was posted; acceptex sets
// accept_posted and receive sets recv_posted.
optype op_type
// overlapped is handed to acceptex / wsarecv for this operation.
overlapped windows.overlapped
}
iocompletionserver
完成端口服務:
// iocompletionserver is a TCP server driven by a single Windows I/O
// completion port: start creates the port and the listen socket, listen
// dequeues completions and dispatches them.
type iocompletionserver struct {
// addr is a configured address string; the visible code never reads it.
addr string
// port is the TCP port start binds the listen socket to.
port int
// recvfunc, when non-nil, is called with each chunk of received bytes.
// Its error result is ignored by the visible callers.
recvfunc func(data []byte) error
// rootctx is the context of the listen socket; start assigns it.
rootctx *iocompletionrootcontext
// accepts maps uint32(socket handle) to the *iocompletionrootcontext of each
// accepted connection.
// Original author's note: this approach is used to prevent the memory from
// being moved.
accepts sync.map
// hiocompletionport is the completion port that the listen socket and every
// accepted socket are associated with.
hiocompletionport windows.handle
}
// saveiorootctx records ctx in ss.accepts under id, replacing any previous
// entry with the same id.
func (ss *iocompletionserver) saveiorootctx(id uint32, ctx *iocompletionrootcontext) {
	ss.accepts.store(id, ctx)
}
// loadiorootctx resolves a completion key to its root context.
//
// The listen socket's own key maps to ss.rootctx; any other key is looked up
// in ss.accepts. It returns nil when the key is unknown or the stored value is
// not a *iocompletionrootcontext.
func (ss *iocompletionserver) loadiorootctx(id uint32) *iocompletionrootcontext {
	if uint32(ss.rootctx.socket) == id {
		return ss.rootctx
	}

	stored, found := ss.accepts.load(id)
	if !found {
		return nil
	}

	// A failed assertion leaves root nil, which is exactly the "not found"
	// result.
	root, _ := stored.(*iocompletionrootcontext)
	return root
}
// remove drops the entry stored under id from ss.accepts. Removing an id that
// is not present is a no-op.
func (ss *iocompletionserver) remove(id uint32) {
	ss.accepts.delete(id)
}
// registerreceivefunc installs rfunc as the callback that receives incoming
// data. It overwrites any previously registered callback; passing nil disables
// delivery.
func (ss *iocompletionserver) registerreceivefunc(rfunc func([]byte) error) {
	ss.recvfunc = rfunc
}
// listen runs the completion loop: it blocks on the completion port, maps each
// dequeued overlapped pointer back to its iocompletioncontext and dispatches
// on the context's optype.
//
// The completion key (ctxid) is the uint32 socket handle that start and
// doacceptex passed to createiocompletionport, so it identifies the root
// context of the socket the operation ran on.
//
// listen returns only when the wait itself fails without dequeuing a packet
// (for example because the port handle is invalid or was closed); in that
// state every further wait would fail the same way.
func (ss *iocompletionserver) listen() {
	var (
		dwbytestransfered uint32
		ctxid             uint32
		overlapped        *windows.overlapped
	)
	for {
		err := windows.getqueuedcompletionstatus(ss.hiocompletionport, &dwbytestransfered,
			&ctxid, &overlapped, windows.infinite)
		if err != nil {
			fmt.printf("syscall.getqueuedcompletionstatus: %v\n", err)
		}
		if overlapped == nil {
			if err != nil {
				// Nothing was dequeued and the wait failed: the port is
				// unusable, so stop instead of spinning on the same error.
				return
			}
			continue
		}
		// A non-nil overlapped with a non-nil err is a completed-but-failed
		// operation; it is still dispatched so the handlers can clean up.

		// Recover the enclosing context from the address of its overlapped
		// field. The arithmetic must stay inside one expression to remain a
		// valid unsafe.pointer conversion.
		ioctx := (*iocompletioncontext)(unsafe.pointer(uintptr(unsafe.pointer(overlapped)) - unsafe.offsetof(iocompletioncontext{}.overlapped)))
		switch ioctx.optype {
		case accept_posted:
			if aerr := ss.doacceptex(ss.loadiorootctx(ctxid), ioctx); aerr != nil {
				fmt.printf("doacceptex: %v\n", aerr)
			}
		case recv_posted:
			ss.doreceive(ss.loadiorootctx(ctxid), ioctx)
		case send_posted, null_posted:
			// Sending is not implemented; nothing to do.
		default:
		}
	}
}
// doacceptex handles one completed acceptex on the listen socket.
//
// It delivers the bytes that arrived together with the connection to recvfunc,
// configures and registers the accepted socket (see setupaccepted) and then
// always re-arms ioctx with a new acceptex, so that a failure while setting up
// one connection does not stop the server from accepting further ones.
//
// rootctx is the listen socket's root context as resolved by the caller; it is
// only used for the debug print and may be nil.
//
// The returned error is the setup error if there was one, otherwise the error
// from re-arming the accept.
func (ss *iocompletionserver) doacceptex(rootctx *iocompletionrootcontext, ioctx *iocompletioncontext) (err error) {
	nfdctx := newrootcontext()
	nfdctx.socket = ioctx.socket

	addrsize := uint32(unsafe.sizeof(windows.rawsockaddrany{}))
	var localaddr, remoteaddr *windows.rawsockaddrany
	lrsalen := int32(addrsize)
	rrsalen := int32(addrsize)
	// Unlike the Windows C++ API, the function can be called directly here
	// without first fetching its function pointer.
	// NOTE(review): the parsed addresses are not used afterwards.
	windows.getacceptexsockaddrs(ioctx.wsabuf.buf, ioctx.wsabuf.len-(addrsize+16)*2,
		addrsize+16, addrsize+16, &localaddr, &lrsalen, &remoteaddr, &rrsalen)

	// Hand over the initial payload before ioctx is reused for the next accept.
	if ss.recvfunc != nil {
		if rerr := ss.recvfunc(ioctx.data[:ioctx.overlapped.internalhigh]); rerr != nil {
			fmt.printf("recvfunc: %v\n", rerr)
		}
	}

	setuperr := ss.setupaccepted(rootctx, nfdctx)

	// Post the next accept request regardless of how the setup went.
	if err = ss.acceptex(ioctx); err != nil && setuperr == nil {
		return err
	}
	return setuperr
}

// setupaccepted configures the accepted socket in nfdctx, associates it with
// the completion port, registers it in ss.accepts and posts its receives.
//
// If a step fails while no receive is in flight, the socket is closed (and
// unregistered) here. Once at least one receive is in flight the connection is
// left registered, because doreceive tears it down when those receives
// complete.
func (ss *iocompletionserver) setupaccepted(rootctx, nfdctx *iocompletionrootcontext) error {
	// abandon closes a socket that has no operation in flight yet.
	abandon := func(cause error, op string) error {
		c.mclose(c.int(nfdctx.socket))
		return errors.wrap(cause, op)
	}

	// Inherit the properties of the listen socket.
	err := windows.setsockopt(nfdctx.socket, windows.sol_socket, windows.so_update_accept_context,
		(*byte)(unsafe.pointer(&ss.rootctx.socket)), int32(unsafe.sizeof(ss.rootctx.socket)))
	if err != nil {
		return abandon(err, "windows.setsockopt")
	}
	if err = windows.setsockoptint(nfdctx.socket, windows.sol_socket, windows.so_rcvbuf, 65535); err != nil {
		return abandon(err, "windows.setsockoptint")
	}

	// Associate the socket with the completion port; this step is essential.
	// The socket handle doubles as the completion key.
	handle, err := windows.createiocompletionport(nfdctx.socket,
		ss.hiocompletionport, uint32(nfdctx.socket), 0)
	if err != nil {
		return abandon(err, "syscall.createiocompletionport")
	}
	if rootctx != nil {
		fmt.println(handle, rootctx.socket)
	}

	// Register the connection before any receive is posted, so that a
	// completion can always be resolved through loadiorootctx and the contexts
	// referenced by in-flight operations stay reachable.
	ss.saveiorootctx(uint32(nfdctx.socket), nfdctx)

	// Post the receive requests; the count can be tuned.
	for i := 0; i < 16; i++ {
		nfdioctx := nfdctx.newiocontext()
		nfdioctx.socket = nfdctx.socket
		if err = ss.receive(nfdioctx); err != nil {
			if i == 0 {
				// Nothing is in flight, so no completion will ever trigger
				// the cleanup in doreceive.
				ss.remove(uint32(nfdctx.socket))
				c.mclose(c.int(nfdctx.socket))
			}
			return err
		}
	}
	return nil
}
// acceptex creates a fresh socket, stores it in ctx and posts an overlapped
// acceptex on the listen socket that will connect the next client to it.
//
// The receive part of ctx's buffer is everything except the two address slots
// of addrsize+16 bytes each at its end.
//
// A pending result (errno 997, ERROR_IO_PENDING) means no connection has
// arrived yet and is reported as success. On any other failure the new socket
// is closed again, ctx.socket is set to windows.invalidhandle and the wrapped
// error is returned.
func (ss *iocompletionserver) acceptex(ctx *iocompletioncontext) (err error) {
	ctx.socket = windows.handle(c.mwsasocket())
	ctx.optype = accept_posted

	dwbytes := uint32(0)
	addrsize := uint32(unsafe.sizeof(windows.rawsockaddrany{}))
	err = windows.acceptex(ss.rootctx.socket, ctx.socket, ctx.wsabuf.buf,
		ctx.wsabuf.len-2*(addrsize+16), addrsize+16,
		addrsize+16, &dwbytes, &ctx.overlapped)
	if err == nil || err == windows.errno(997) {
		return nil
	}

	// The request was not queued, so nothing else will ever release the socket
	// created above.
	if ctx.socket != windows.invalidhandle {
		c.mclose(c.int(ctx.socket))
		ctx.socket = windows.invalidhandle
	}
	return errors.wrap(err, "syscall.acceptex")
}
// doreceive handles one completed receive on an accepted connection.
//
// A completion that transferred zero bytes is treated as the end of the
// connection: the connection is unregistered and its socket closed. rootctx is
// nil when the connection has already been unregistered (several receives are
// in flight per connection, so later completions find nothing left to clean
// up), in which case nothing is done.
//
// Otherwise the received bytes are passed to recvfunc and the same context is
// posted again. Errors from the callback and from re-posting are logged, since
// this method has no way to return them.
func (ss *iocompletionserver) doreceive(rootctx *iocompletionrootcontext, ctx *iocompletioncontext) {
	if ctx.overlapped.internalhigh == 0 {
		if rootctx != nil {
			ss.remove(uint32(rootctx.socket))
			c.mclose(c.int(rootctx.socket))
		}
		return
	}

	if ss.recvfunc != nil {
		if err := ss.recvfunc(ctx.data[:ctx.overlapped.internalhigh]); err != nil {
			fmt.printf("recvfunc: %v\n", err)
		}
	}

	if err := ss.receive(ctx); err != nil {
		fmt.printf("receive: %v\n", err)
	}
}
// receive posts one overlapped wsarecv on ioctx.socket into ioctx's buffer and
// marks the context as recv_posted.
//
// A pending result (errno 997, ERROR_IO_PENDING) means no data has arrived yet
// and is reported as success; the completion is delivered through the
// completion port later. Any other error is returned wrapped with the name of
// the failing call.
func (ss *iocompletionserver) receive(ioctx *iocompletioncontext) error {
	var (
		recv  uint32
		flags uint32
	)
	ioctx.optype = recv_posted

	err := windows.wsarecv(ioctx.socket, &ioctx.wsabuf,
		1, &recv, &flags, &ioctx.overlapped, nil)
	if err == nil || err == windows.errno(997) {
		return nil
	}
	return errors.wrap(err, "windows.wsarecv")
}
// setdefaultsockopt enables so_reuseaddr on handle. It returns the wrapped
// error from setsockoptint, or nil on success.
func setdefaultsockopt(handle windows.handle) error {
	if err := windows.setsockoptint(handle, windows.sol_socket, windows.so_reuseaddr, 1); err != nil {
		return errors.wrap(err, "syscall.setsockoptint")
	}
	return nil
}
// start initialises Winsock, creates the completion port and the listen
// socket, associates the two, binds to ss.port, starts listening and posts the
// first acceptex. It must be called before listen.
//
// It returns the first error encountered, wrapped with the name of the failing
// call.
//
// NOTE(review): ss.addr is not used; sockaddr.addr is left at its zero value,
// so the socket is not restricted to the address configured in addr.
// NOTE(review): handles created before a failing step are not released.
func (ss *iocompletionserver) start() error {
	if err := windows.wsastartup(2, &windows.wsadata{}); err != nil {
		return errors.wrap(err, "windows.wsastartup")
	}

	// Create the completion port that the listen socket is bound to.
	hiocompletionport, err := windows.createiocompletionport(windows.invalidhandle, 0, 0, 0)
	if err != nil {
		return errors.wrap(err, "syscall.createiocompletionport")
	}
	ss.hiocompletionport = hiocompletionport

	rootctx := newrootcontext()
	rootctx.socket = windows.handle(c.mwsasocket())
	if err := setdefaultsockopt(rootctx.socket); err != nil {
		return err
	}
	ss.rootctx = rootctx

	// The listen socket's handle doubles as its completion key; loadiorootctx
	// relies on that to recognise accept completions.
	handle, err := windows.createiocompletionport(rootctx.socket,
		hiocompletionport, uint32(ss.rootctx.socket), 0)
	if err != nil {
		return errors.wrap(err, "syscall.createiocompletionport")
	}
	fmt.println(handle, rootctx.socket)

	sockaddr := windows.sockaddrinet4{}
	sockaddr.port = ss.port
	if err := windows.bind(rootctx.socket, &sockaddr); err != nil {
		return errors.wrap(err, "syscall.bind")
	}
	if err := windows.listen(rootctx.socket, max_post_accept); err != nil {
		return errors.wrap(err, "windows.listen")
	}

	return ss.acceptex(rootctx.newiocontext())
}
example
完成端口服務使用示例:
ss = &iocompletionserver{
	addr:    "127.0.0.1:10050",
	port:    10050,
	accepts: sync.map{},
}
ss.registerreceivefunc(func(data []byte) error {
	fmt.println("receive data len:", string(data))
	return nil
})
// start creates the completion port and the listen socket; listen has nothing
// to wait on until it has run.
if err := ss.start(); err != nil {
	panic(err)
}
// Several goroutines may run listen to process completions, but note that
// doing so can cause received packets to be handled out of order.
ss.listen()
結尾
以上代碼經過實際測試檢驗,可以正常使用,尚未與標準庫進行 效率\性能 對比,沒有實現 send 功能,此處需要提醒的是,使用 iocompletion port 發送數據要注意時序的把握。
iocompletion port 是windows 系統中十分優秀的io模型, 深入瞭解其工作機制及原理, 也有助於我們對操作系統 io 數據處理的機制有更清晰的認知。
參考
- c++ 完成端口實現
- overlapped
- i/o completion ports
- createiocompletionport
- acceptex
- getacceptexsockaddrs
推荐阅读
