其他分享
首页 > 其他分享 > Go channel——block为false时chansend/chanrecv的处理机制

Go channel——block为false时chansend/chanrecv的处理机制

作者:互联网

前言

本篇聚焦select包含2个case(1个send/recv case、1个default case)的场景下send/recv的具体处理。

更多内容分享,欢迎关注公众号:Go开发笔记

chansend

select {
case c <- v:
	... foo
default:
	... bar
}

其底层对应func为selectnbsend

selectnbsend

// selectnbsend performs a non-blocking send of elem on c and reports
// whether the send case was selected.
//
// The compiler rewrites
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// into
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
// The block argument handed to chansend is always false here.
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	selected = chansend(c, elem, false, getcallerpc())
	return selected
}

从注释中我们可以知道,select 2个case(1个send/recv case、1个default case)会被编译成if...else的形式进行处理。

注意:此时调用chansend传入的block固定为false

chansend具体实现

以下源码中省略了block为true的逻辑及部分debug及race的逻辑。

/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 *
 * chansend sends the value pointed to by ep on channel c and reports
 * whether the send completed. This excerpt shows only the block == false
 * paths; the blocking paths are elided as "...".
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block { // Non-blocking send on a nil channel: report failure immediately.
			return false
		}
		...
	}

	...
	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) { // Non-blocking: channel seen as open and not ready for sending, so report failure.
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock) // Acquire the channel lock.

	if c.closed != 0 { // Channel is closed: release the lock and panic, even for a non-blocking send.
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil { // A receiver is waiting in recvq: hand the value to it directly.
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// From here on there is no waiting receiver.
	if c.qcount < c.dataqsiz { // Fewer elements are queued than the buffer size: store the value in the buffer.
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx) // Buffer slot for index c.sendx.
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep) // Copy the value into that slot.
		c.sendx++ // Advance the send index.
		if c.sendx == c.dataqsiz { // Index ran past the last slot: wrap around to the start.
			c.sendx = 0
		}
		c.qcount++ // One more element is queued.
		unlock(&c.lock) // Release the lock.
		return true // Send succeeded.
	}
	// No free buffer slot and no waiting receiver.
	if !block { // Non-blocking: release the lock and report failure instead of sleeping.
		unlock(&c.lock)
		return false
	}

	...
	return true // Send succeeded.
}

// send completes a send on channel c by handing the value straight to a
// receiver that is already waiting.
// The value at ep is copied to the receiver sg, and the receiver's
// goroutine is then made ready to run.
// c must be empty and locked; send releases the lock by calling unlockf.
// sg must already have been dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz != 0 {
			// Pretend the value travelled through the buffer even though
			// it is copied directly. The head/tail indices only need to
			// move when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			next := c.recvx + 1
			if next == c.dataqsiz {
				next = 0
			}
			c.recvx = next
			c.sendx = next // same as (c.sendx+1) % c.dataqsiz
		} else {
			racesync(c, sg)
		}
	}

	// sg.elem is the receiver's destination; when it is nil there is
	// nothing to copy.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep) // copy the value to the receiver
		sg.elem = nil
	}

	receiver := sg.g // goroutine of the waiting receiver
	unlockf()
	receiver.param = unsafe.Pointer(sg) // wake-up parameter is the dequeued sg
	sg.success = true                   // mark sg as completed successfully
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(receiver, skip+1) // make the receiving goroutine runnable
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes happen
// only while the goroutine is running and are done only by that
// goroutine. A write barrier is enough to compensate for breaking that
// assumption, but it has to work: typedmemmove would call
// bulkBarrierPreWrite, which does not help because the target bytes are
// not in the heap. So memmove and typeBitsBulkBarrier are used instead.

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack; the destination is a slot on another stack
	// (a cross-stack copy).
	size := t.size

	// Once sg.elem has been read out of sg it is no longer updated if the
	// destination's stack gets copied (shrunk), so no preemption point
	// may occur between this read and the uses below.
	target := sg.elem
	// Apply the write barrier for the destination before copying.
	typeBitsBulkBarrier(t, uintptr(target), uintptr(src), size)
	// No cgo write barrier check is needed because target is always Go
	// memory.
	memmove(target, src, size) // copy the value into the receiver's slot (sg.elem)
}

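此场景下使用send时,需要注意:

- chan为nil时,不会阻塞,chansend直接返回false,执行default分支;
- chan已close时,即使有default分支,send仍会panic("send on closed channel");
- 有等待中的接收者时,数据直接拷贝给接收者,返回true;
- 没有等待的接收者但缓存未满时,数据存入缓存,返回true;
- 其余情况(无缓存或缓存已满,且没有等待的接收者)直接返回false,执行default分支。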

chanrecv

// recv1
select {
case v = <-c:
	... foo
default:
	... bar
}

// recv2
select {
case v, ok = <-c: 
	... foo
default:
	... bar
}

以上两种场景recv处理方式一致,recv1、recv2分别对应底层func selectnbrecv、selectnbrecv2

selectnbrecv/selectnbrecv2

// selectnbrecv performs a non-blocking receive from c into elem and
// reports whether the receive case was selected.
//
// The compiler rewrites
//
//	select {
//	case v = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// into
//
//	if selectnbrecv(&v, c) {
//		... foo
//	} else {
//		... bar
//	}
//
// The block argument handed to chanrecv is always false here, and the
// second result of chanrecv (received) is discarded.
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	ok, _ := chanrecv(c, elem, false)
	return ok
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if c != nil && selectnbrecv2(&v, &ok, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	// TODO(khr): just return 2 values from this function, now that it is in Go.
	selected, *received = chanrecv(c, elem, false)
	return
}å

selectnbrecv与selectnbrecv2的区别在于是否返回received;注意两者调用chanrecv时block均传入false

chanrecv具体实现

以下源码中省略了block为true的逻辑及部分debug及race的逻辑。

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {/
		if !block {// 对于select chan,向nil的chan接收消息会直接返回false
			return
		}
		...
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {// 对于select chan,chan缓存为空时
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 { // 如果chan已经close,直接返回
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {// 再次检查缓存是否为空
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep) //清空ep中的数据
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {// 再次检查
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {// 如果发送队列中有等待的发送者,直接从发送者接收数据
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {// 如果有缓存
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)// 获取缓存的位置
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {// 将数据拷贝至ep中
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++ // 指向下一个位置
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount-- // 减少缓存数量
		unlock(&c.lock)
		return true, true
	}

	if !block {// 对于select channel操作,无缓存时直接返回false
		unlock(&c.lock)
		return false, false
	}
	return true, success
	...
}

// recvDirect copies a value of type t from the sender's slot sg.elem
// into dst.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap; the source lives on another stack.
	// The channel is locked, so the source will not move during this
	// operation.
	size := t.size
	from := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(from), size)
	memmove(dst, from, size) // copy the value
}

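此场景下使用recv时,需要注意:

- chan为nil时,不会阻塞,chanrecv直接返回(false, false),执行default分支;
- chan已close且没有剩余数据时,返回(true, false):recv case被选中,v被清零,ok为false;
- 有等待中的发送者时,通过recv从发送者处接收数据,返回(true, true);
- 缓存中有数据时,从缓存中取出数据,返回(true, true);
- 其余情况(chan未close且无数据可收)直接返回(false, false),执行default分支。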

总结

最后以一张图总结2个case(1个send/recv case、1个default case)场景下使用send/recv的处理逻辑:

在这里插入图片描述

与单独使用(包含select单个send/recv)的最大区别在于不会造成goroutine阻塞,不满足条件时,直接返回结果。

标签:缓存,false,chanrecv,chansend,send,closed,sg,channel
来源: https://blog.csdn.net/xz_studying/article/details/117194406