009 - Golang 1.17 Source Code Analysis: goroutine
Golang 1.17 study notes, part 009
Package location: runtime/runtime2.go
Global variables: g0 (the goroutine that owns the scheduling stack of the bootstrap thread) and m0 (the main working thread).
A P only holds a local runq; the global runq lives in sched.
Order for fetching runnable goroutines: an M first looks in its own P's runq, then in the global queue kept in sched, and finally steals a few from other Ps' queues (see the sketch below).
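A minimal sketch of that lookup order, using simplified slice-based queues instead of the real lock-free ring buffers; findRunnable, G, P and globalRunq below are illustrative names only, and the real logic in runtime.schedule/findrunnable also checks runnext, timers and the network poller:

package main

import "fmt"

type G struct{ id int }

type P struct{ runq []*G }

var globalRunq []*G // stands in for sched.runq

func findRunnable(self *P, others []*P) *G {
	// 1. Local run queue of the M's own P.
	if len(self.runq) > 0 {
		g := self.runq[0]
		self.runq = self.runq[1:]
		return g
	}
	// 2. Global run queue kept in sched.
	if len(globalRunq) > 0 {
		g := globalRunq[0]
		globalRunq = globalRunq[1:]
		return g
	}
	// 3. Steal roughly half of another P's local queue.
	for _, p := range others {
		if n := len(p.runq); n > 0 {
			half := (n + 1) / 2
			stolen := p.runq[:half]
			p.runq = p.runq[half:]
			self.runq = append(self.runq, stolen[1:]...)
			return stolen[0]
		}
	}
	return nil // nothing runnable: the M would park
}

func main() {
	victim := &P{runq: []*G{{id: 1}, {id: 2}, {id: 3}}}
	self := &P{}
	fmt.Println(findRunnable(self, []*P{victim}).id) // prints 1, stolen from victim
}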
type g struct {
	_panic *_panic // innermost panic - offset known to liblink
	_defer *_defer // innermost defer
	m      *m      // current m; offset known to arm liblink
	goid   int64   // goroutine id
}
type m struct {
	g0   *g // goroutine with scheduling stack
	curg *g // current running goroutine
	// processor (P) related fields
	p     puintptr // attached p for executing go code (nil if not executing go code)
	nextp puintptr
	oldp  puintptr // the p that was attached before executing a syscall
}
type p struct {
	status uint32   // one of _Pidle, _Prunning, _Psyscall, _Pgcstop, _Pdead
	m      muintptr // back-link to associated m (nil if idle)
	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	// runnext, if non-nil, is a runnable G readied by the current G and
	// should be run next, instead of what's in runq.
	runnext guintptr
	// Available G's (status == Gdead)
	gFree struct {
		gList
		n int32
	}
}
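runqhead and runqtail are free-running counters: a slot is addressed modulo the array length, and tail-head is the number of queued goroutines. A single-threaded sketch of that indexing (the ring type is illustrative; the runtime stores guintptr values and uses atomics):

package main

import "fmt"

// ring is an illustrative stand-in for p.runq plus runqhead/runqtail.
type ring struct {
	head, tail uint32   // free-running counters; tail-head = queued items
	buf        [256]int // the runtime stores guintptr here
}

func (r *ring) put(v int) bool {
	if r.tail-r.head == uint32(len(r.buf)) {
		return false // full: the runtime would spill half to the global queue
	}
	r.buf[r.tail%uint32(len(r.buf))] = v // index modulo the ring size
	r.tail++
	return true
}

func (r *ring) get() (int, bool) {
	if r.head == r.tail {
		return 0, false // empty
	}
	v := r.buf[r.head%uint32(len(r.buf))]
	r.head++
	return v, true
}

func main() {
	var r ring
	r.put(1)
	r.put(2)
	fmt.Println(r.get()) // 1 true
	fmt.Println(r.get()) // 2 true
}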
type schedt struct {
midle muintptr // idle m's waiting for work
pidle puintptr // idle p's
// Global runnable queue.
runq gQueue
runqsize int32
}
Creation: runtime/proc.go
A go statement calls newproc(), which in turn calls newproc1().
Detailed flow: newproc -> newproc1 -> (if there is still an idle P) wakep -> startm -> (possibly) newm -> newosproc
-> (thread entry) mstart -> schedule -> execute -> the goroutine runs (see 《深入解析Go》 on goroutine scheduling)
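The trigger for this whole chain is an ordinary go statement: the compiler turns it into a call to runtime.newproc. A tiny runnable example:

package main

import (
	"fmt"
	"time"
)

func main() {
	// The compiler rewrites this go statement into a call to
	// runtime.newproc, which builds the new g as shown below.
	go func(msg string) {
		fmt.Println(msg)
	}("hello from a new goroutine")

	// Crude synchronization just for the demo, so main does not
	// exit before the new goroutine has been scheduled.
	time.Sleep(100 * time.Millisecond)
}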
func newproc(siz int32, fn *funcval) {
	// fn's arguments are laid out on the stack immediately after fn itself
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)
		_p_ := getg().m.p.ptr()
		// put newg on the current P's runnable queue (runnext slot);
		// if the local queue is full, half of it spills to the global queue
		runqput(_p_, newg, true)
		if mainStarted {
			wakep()
		}
	})
}
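A sketch of the runqput(_p_, newg, true) policy referenced in the comment above, using toy types (gSketch, pSketch) and a deliberately tiny local capacity so the overflow path is visible; the real implementation operates on the lock-free [256]guintptr ring and falls back to runqputslow:

package main

import "fmt"

type gSketch struct{ id int }

type pSketch struct {
	runnext *gSketch
	local   []*gSketch // stands in for the [256]guintptr ring
	global  []*gSketch // stands in for sched.runq
}

const localCap = 4 // tiny capacity so the overflow path is easy to trigger

// runqput mirrors runqput(_p_, gp, next=true): gp becomes runnext and the
// goroutine it displaces goes to the local queue, overflowing into the
// global queue when the local queue is full.
func runqput(pp *pSketch, gp *gSketch) {
	old := pp.runnext
	pp.runnext = gp
	if old == nil {
		return
	}
	if len(pp.local) < localCap {
		pp.local = append(pp.local, old)
		return
	}
	// Local queue full: move half of it, plus the displaced g, to the
	// global queue.
	half := len(pp.local) / 2
	pp.global = append(pp.global, pp.local[:half]...)
	pp.global = append(pp.global, old)
	pp.local = append(pp.local[:0], pp.local[half:]...)
}

func main() {
	pp := &pSketch{}
	for i := 1; i <= 8; i++ {
		runqput(pp, &gSketch{id: i})
	}
	fmt.Println("runnext:", pp.runnext.id)                          // 8
	fmt.Println("local:", len(pp.local), "global:", len(pp.global)) // 4 and 3
}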
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	// current g
	_g_ := getg()
	// pin the current M; disable preemption because it can be holding p in a local var
	acquirem()
	siz := narg
	siz = (siz + 7) &^ 7
	// reject argument frames that would not fit on the minimum stack
	if siz >= _StackMin-4*sys.PtrSize-sys.PtrSize {
		throw("newproc: function arguments too large for new goroutine")
	}
	// P attached to the current goroutine's M
	_p_ := _g_.m.p.ptr()
	// try to reuse a free g from this P's gFree list
	newg := gfget(_p_)
	if newg == nil {
		// none available: allocate a new g with a minimum-size stack
		newg = malg(_StackMin)
		// mark it _Gdead
		casgstatus(newg, _Gidle, _Gdead)
		// publish it in the global allg list
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	// compute the new goroutine's initial stack pointer
	totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.StackAlign - 1)               // align to StackAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	// copy the arguments onto the new goroutine's stack
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
		// This is a stack-to-stack copy. If write barriers
		// are enabled and the source stack is grey (the
		// destination is always black), then perform a
		// barrier copy. We do this *after* the memmove
		// because the destination stack may have garbage on
		// it.
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			if stkmap.nbit > 0 {
				// We're in the prologue, so it's always stack map index 0.
				bv := stackmapdata(stkmap, 0)
				bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
			}
		}
	}
	// initialize the new goroutine's scheduling context (sched)
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	// record the ancestor goroutines that created this one
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	// if this is a system goroutine, increment sched.ngsys (system goroutine count)
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	// the goroutine is now ready to run: _Gdead -> _Grunnable
	casgstatus(newg, _Gdead, _Grunnable)
	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	// release the M pinned by acquirem above (re-enable preemption)
	releasem(_g_.m)
	return newg
}
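The goid logic at the end of newproc1 is worth isolating: each P reserves a batch of ids from sched.goidgen with a single atomic add and then hands them out without further synchronization. A standalone sketch of that batching (pCache, nextGoid and goidCacheBatch are illustrative names; the batch size of 16 mirrors _GoidCacheBatch):

package main

import (
	"fmt"
	"sync/atomic"
)

const goidCacheBatch = 16 // mirrors _GoidCacheBatch

var goidgen uint64 // last allocated id, like sched.goidgen

// pCache holds one P's private range of ids, like p.goidcache/goidcacheend.
type pCache struct {
	goidcache, goidcacheend uint64
}

func (p *pCache) nextGoid() uint64 {
	if p.goidcache == p.goidcacheend {
		// One atomic add reserves the batch [old+1, old+goidCacheBatch].
		p.goidcache = atomic.AddUint64(&goidgen, goidCacheBatch)
		p.goidcache -= goidCacheBatch - 1
		p.goidcacheend = p.goidcache + goidCacheBatch
	}
	id := p.goidcache
	p.goidcache++
	return id
}

func main() {
	var p pCache
	fmt.Println(p.nextGoid(), p.nextGoid(), p.nextGoid()) // 1 2 3
}

With goidgen starting at 0, the first id handed out is 1, which matches the runtime comment that the main goroutine receives goid = 1.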
Number of Ps and Ms
- P: set with runtime.GOMAXPROCS() (see the usage example after this list).
- M: at startup the runtime sets a maximum number of Ms, 10000 by default; runtime/debug.SetMaxThreads changes this limit.
- If an M is blocked, a new M may be created; conversely, an idle M can be reclaimed or put to sleep.
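A short usage example for the two knobs above:

package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

func main() {
	// GOMAXPROCS(0) only reads the current number of Ps; a positive
	// argument sets it and returns the previous value.
	fmt.Println("Ps:", runtime.GOMAXPROCS(0))

	// SetMaxThreads changes the limit on OS threads (Ms) and returns
	// the previous limit; the default is 10000.
	prev := debug.SetMaxThreads(10000)
	fmt.Println("previous M limit:", prev)
}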
Creation of Ps and Ms
- P: once the maximum number n is determined, the runtime creates n Ps.
- M: created when there are not enough Ms to bind to Ps and run their Gs, e.g. when all existing Ms are blocked; the runtime first checks for an idle M and only creates a new one if none is available.
Scheduler strategies:
- Work stealing: when the current M has no G to run, it tries to steal Gs from a P bound to another M.
- Hand off: when the current M blocks in a system call made by its G, the M releases its P so the P can be handed to another, idle thread.
- Preemption: with classic coroutines the next coroutine only runs after the current one voluntarily yields the CPU; in Go a goroutine occupies the CPU for at most about 10ms before being preempted, which keeps other goroutines from being starved (see the demo after this list).
- Global G queue: when an M's work stealing cannot find a G on other Ps, it can still take a G from the global run queue.
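A small demo of the preemption bullet, assuming Go 1.14 or newer (which includes the 1.17 runtime discussed here): without preemption the busy-loop goroutine below would monopolize the single P and main would never get to print.

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1) // a single P, so goroutines must share it

	go func() {
		for {
			// Tight loop with no function calls: before Go 1.14 this
			// could starve every other goroutine on the P.
		}
	}()

	time.Sleep(50 * time.Millisecond)
	// Printed only because the runtime preempts the busy goroutine.
	fmt.Println("main goroutine still gets scheduled")
}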
References:
《Go专家编程》, the chapter on goroutines
《深入解析Go》, goroutine scheduling
《Golang 修养之路》, the Golang goroutine scheduler and the GMP design
Source: https://blog.csdn.net/weixin_51546892/article/details/123050231