从源码分析 Go 语言使用 cgo 导致的线程增长

来源:博客园 时间:2023-06-05 11:37:26

TDengine Go 连接器 https://github.com/taosdata/driver-go 使用 cgo 调用 taos.so 中的 API,使用过程中发现线程数不断增长,本文从一个 cgo 调用开始解析 Go 源码,分析造成线程增长的原因。


【资料图】

转换 cgo 代码

对 driver-go/wrapper/taosc.go 进行转换

go tool cgo taosc.go

执行后生成 _obj文件夹

go 代码分析

taosc.cgo1.goTaosResetCurrentDB为例来分析。

// TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);func TaosResetCurrentDB(taosConnect unsafe.Pointer) {    func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }()}//go:linkname _cgoCheckPointer runtime.cgoCheckPointerfunc _cgoCheckPointer(interface{}, interface{})//go:cgo_unsafe_argsfunc _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) {    _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))    if _Cgo_always_false {        _Cgo_use(p0)    }    return}//go:linkname _cgo_runtime_cgocall runtime.cgocallfunc _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32//go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db//go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_dbvar __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db bytevar _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)
TaosResetCurrentDB首先调用 _cgoCheckPointer检查传入参数是否为 nil//go:linkname _cgoCheckPointer runtime.cgoCheckPointer表示 cgoCheckPointer方法实现是 runtime.cgoCheckPointer,如果传入参数是 nil程序将会 panic。接着调用 _Cfunc_taos_reset_current_dbCfunc_taos_reset_current_db方法中 _Cgo_always_false在运行时会是 false,所以只分析第一句 _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))_cgo_runtime_cgocall实现是 runtime.cgocall这个会重点分析。_cgo_453a0cad50ef_Cfunc_taos_reset_current_db由上方最后代码块可以看出是 taos_reset_current_db方法指针。uintptr(unsafe.Pointer(&p0))表示 p0 的指针地址。由上面可以看出这句意思是调用 runtime.cgocall,参数为方法指针和参数的指针地址。分析 runtime.cgocall

基于 golang 1.20.4分析该方法

func cgocall(fn, arg unsafe.Pointer) int32 {    if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {        throw("cgocall unavailable")    }    if fn == nil {        throw("cgocall nil")    }    if raceenabled {        racereleasemerge(unsafe.Pointer(&racecgosync))    }    mp := getg().m // 获取当前 goroutine 的 M    mp.ncgocall++  // 总 cgo 计数 +1    mp.ncgo++      // 当前 cgo 计数 +1    mp.cgoCallers[0] = 0 // 重置追踪    entersyscall() // 进入系统调用,保存上下文, 标记当前 goroutine 独占 m, 跳过垃圾回收    osPreemptExtEnter(mp) // 标记异步抢占, 使异步抢占逻辑失效    mp.incgo = true // 修改状态    errno := asmcgocall(fn, arg) // 真正进行方法调用的地方    mp.incgo = false // 修改状态    mp.ncgo-- // 当前 cgo 调用-1    osPreemptExtExit(mp) // 恢复异步抢占    exitsyscall() // 退出系统调用,恢复调度器控制    if raceenabled {        raceacquire(unsafe.Pointer(&racecgosync))    }    // 避免 GC 过早回收    KeepAlive(fn)    KeepAlive(arg)    KeepAlive(mp)    return errno}

其中两个主要的方法 entersyscallasmcgocall,接下来对这两个方法进行着重分析。

分析 entersyscall
func entersyscall() {    reentersyscall(getcallerpc(), getcallersp())}

entersyscall直接调用的 reentersyscall,关注下 reentersyscall注释中的一段:

// If the syscall does not block, that is it, we do not emit any other events.// If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;

如果 syscall调用没有阻塞则不会触发任何事件,如果被阻塞 retaker会触发 traceGoSysBlock,那需要了解一下多长时间被认为是阻塞,先跟到 retaker方法。

func retake(now int64) uint32 {    n := 0    lock(&allpLock)    for i := 0; i < len(allp); i++ {        pp := allp[i]        if pp == nil {            continue        }        pd := &pp.sysmontick        s := pp.status        sysretake := false        if s == _Prunning || s == _Psyscall {            t := int64(pp.schedtick)            if int64(pd.schedtick) != t {                pd.schedtick = uint32(t)                pd.schedwhen = now            } else if pd.schedwhen+forcePreemptNS <= now {                preemptone(pp)                sysretake = true            }        }        // 从系统调用中抢占P        if s == _Psyscall {            // 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P            t := int64(pp.syscalltick)            if !sysretake && int64(pd.syscalltick) != t {                pd.syscalltick = uint32(t)                pd.syscallwhen = now                continue            }            if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {                continue            }            unlock(&allpLock)            incidlelocked(-1)            if atomic.Cas(&pp.status, s, _Pidle) {                if trace.enabled {                    traceGoSysBlock(pp)                    traceProcStop(pp)                }                n++                pp.syscalltick++                handoffp(pp)            }            incidlelocked(1)            lock(&allpLock)        }    }    unlock(&allpLock)    return uint32(n)}

从上面可以看到系统调用阻塞 20 多微秒会被抢占 P,cgo 被迫 handoffp,接下来分析 handoffp方法

func handoffp(pp *p) {    // ...    // 没有任务且没有自旋和空闲的 M 则需要启动一个新的 M    if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {        sched.needspinning.Store(0)        startm(pp, true)        return    }    // ...}

handoffp方法会调用 startm来启动一个新的 M,跟到 startm方法。

func startm(pp *p, spinning bool) {    // ...    nmp := mget()    if nmp == nil {        // 没有M可用,调用newm        id := mReserveID()        unlock(&sched.lock)        var fn func()        if spinning {            fn = mspinning        }        newm(fn, pp, id)        releasem(mp)        return    }    // ...}

此时如果没有 M startm会调用 newm创建一个新的 M,接下来分析 newm方法。

func newm(fn func(), pp *p, id int64) {    acquirem()    mp := allocm(pp, fn, id)    mp.nextp.set(pp)    mp.sigmask = initSigmask    if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {        lock(&newmHandoff.lock)        if newmHandoff.haveTemplateThread == 0 {            throw("on a locked thread with no template thread")        }        mp.schedlink = newmHandoff.newm        newmHandoff.newm.set(mp)        if newmHandoff.waiting {            newmHandoff.waiting = false            notewakeup(&newmHandoff.wake)        }        unlock(&newmHandoff.lock)        releasem(getg().m)        return    }    newm1(mp)    releasem(getg().m)}func newm1(mp *m) {    if iscgo {        var ts cgothreadstart        if _cgo_thread_start == nil {            throw("_cgo_thread_start missing")        }        ts.g.set(mp.g0)        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))        ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))        if msanenabled {            msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))        }        if asanenabled {            asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))        }        execLock.rlock()        // 创建新线程        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))        execLock.runlock()        return    }    execLock.rlock()    newosproc(mp)    execLock.runlock()}

newm看出如果线程都在阻塞中则调用 newm1newm1调用 _cgo_thread_start创建新线程。

由以上分析得出当高并发调用 cgo 且执行时间超过 20 微秒时会创建新线程。

分析 asmcgocall

只分析 amd64asm_amd64.s

TEXT ·asmcgocall(SB),NOSPLIT,$0-20    MOVQ    fn+0(FP), AX    MOVQ    arg+8(FP), BX    MOVQ    SP, DX    // 考虑是否需要切换到 m.g0 栈    // 也用来调用创建新的 OS 线程,这些线程已经在 m.g0 栈中了    get_tls(CX)    MOVQ    g(CX), DI    CMPQ    DI, $0    JEQ nosave    MOVQ    g_m(DI), R8    MOVQ    m_gsignal(R8), SI    CMPQ    DI, SI    JEQ nosave    MOVQ    m_g0(R8), SI    CMPQ    DI, SI    JEQ nosave        // 切换到系统栈    CALL    gosave_systemstack_switch<>(SB)    MOVQ    SI, g(CX)    MOVQ    (g_sched+gobuf_sp)(SI), SP    // 于调度栈中(pthread 新创建的栈)    // 确保有足够的空间给四个 stack-based fast-call 寄存器    // 为使得 windows amd64 调用服务    SUBQ    $64, SP    ANDQ    $~15, SP // 为 gcc ABI 对齐    MOVQ    DI, 48(SP) // 保存 g    MOVQ    (g_stack+stack_hi)(DI), DI    SUBQ    DX, DI    MOVQ    DI, 40(SP) // 保存栈深 (不能仅保存 SP,因为栈可能在回调时被复制)    MOVQ    BX, DI  // DI = AMD64 ABI 第一个参数    MOVQ    BX, CX  // CX = Win64 第一个参数    CALL    AX  // 调用 fn    // 恢复寄存器、 g、栈指针    get_tls(CX)    MOVQ    48(SP), DI    MOVQ    (g_stack+stack_hi)(DI), SI    SUBQ    40(SP), SI    MOVQ    DI, g(CX)    MOVQ    SI, SP    MOVL    AX, ret+16(FP)    RETnosave:    // 在系统栈上运行,可能没有 g    // 没有 g 的情况发生在线程创建中或线程结束中(比如 Solaris 平台上的 needm/dropm)    // 这段代码和上面类似,但没有保存和恢复 g,且没有考虑栈的移动问题(因为我们在系统栈上,而非 goroutine 栈)    // 如果已经在系统栈上,则上面的代码可被直接使用,在 Solaris 上会进入下面这段代码。    // 使用这段代码来为所有 "已经在系统栈" 的调用进行服务,从而保持正确性。    SUBQ    $64, SP    ANDQ    $~15, SP // ABI 对齐    MOVQ    $0, 48(SP) // 上面的代码保存了 g, 确保 debug 时可用    MOVQ    DX, 40(SP) // 保存原始的栈指针    MOVQ    BX, DI  // DI = AMD64 ABI 第一个参数    MOVQ    BX, CX  // CX = Win64 第一个参数    CALL    AX    MOVQ    40(SP), SI // 恢复原来的栈指针    MOVQ    SI, SP    MOVL    AX, ret+16(FP)    RET

这段就是将当前栈移到系统栈去执行,因为 C 需要无穷大的栈,在 Go 的栈上执行 C 函数会导致栈溢出。

产生问题

cgo 调用会将当前栈移到系统栈,并且当 cgo 高并发调用且阻塞超过 20 微秒时会新建线程。而 Go 并不会销毁线程,由此造成线程增长。

解决方案

限制 Go 程序最大线程数,默认为 cpu 核数。

runtime.GOMAXPROCS(runtime.NumCPU())

使用 channel 限制 cgo 最大并发数为 cpu 核数

package threadimport "runtime"var c chan struct{}func Lock() {    c <- struct{}{}}func Unlock() {    <-c}func init() {    c = make(chan struct{}, runtime.NumCPU())}

针对超过 20 微秒的 cgo 调用进行限制:

thread.Lock()wrapper.TaosFreeResult(result)thread.Unlock()

X 关闭

Copyright   2015-2022 北方净水网版权所有   备案号:京ICP备2021034106号-50   联系邮箱: 55 16 53 8@qq.com