channel补充
作者:互联网
1. channel
1.1 channel的使用
例子:主go程发数据,子go程收数据。
package main
import (
"fmt"
"time"
)
//主go程发数据,子go程收数据
func chanDemo() {
c := make(chan int)
go func() {
for {
fmt.Println(<-c)//不断的向信道获取数据,主go程中的sleep结束后main就推出了,子go程也会自动结束,循环就退出了
}
}()
c <- 1
c <- 2
time.Sleep(time.Millisecond) //保证发数据的主go程先执行完,把所有数据都发出去了,再运行子go程接收数据
}
func main() {
chanDemo()
}
控制台输出:
1
2
1.2 建立channel数组
可以建立信道数组来创建多个信道:
package main
import (
"fmt"
"time"
)
//子go程接收数据
func worker(id int, c chan int) {
for {
fmt.Printf("worker %d recieverd %c\n", id, <-c)
}
}
//主go程发数据,子go程收数据
func chanDemo() {
//创建10个信道,往这10个信道传输数据
var c [10]chan int
for i := 0; i < 10; i++ {
c[i] = make(chan int)
go worker(i, c[i])
}
//往这10个信道各发送两个数据
for i := 0; i < 10; i++ {
c[i] <- i + 'a'
}
for i := 0; i < 10; i++ {
c[i] <- i + 'A'
}
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
控制台输出:
worker 2 recieverd c
worker 3 recieverd d
worker 5 recieverd f
worker 4 recieverd e
worker 6 recieverd g
worker 7 recieverd h
worker 8 recieverd i
worker 0 recieverd a
worker 0 recieverd A
worker 1 recieverd b
worker 1 recieverd B
worker 9 recieverd j
worker 9 recieverd J
worker 5 recieverd F
worker 6 recieverd G
worker 7 recieverd H
worker 2 recieverd C
worker 4 recieverd E
worker 3 recieverd D
worker 8 recieverd I
1.3 channel可以作为函数的形参和返回值
channel不仅可以作为函数的形参,还可以作为函数的返回值类型。此外,用<-加在chan关键字的左或者右可以更直观的告诉使用的人信道是专门用来发数据还是收数据的。将上面的work()
改成返回值类型为channel的createWorker()
:
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int) {
for {
fmt.Printf("worker %d recieverd %c\n", id, <-c) //输出从信道中接收到了什么数据
}
}
//用<-加在chan关键字的左或者右可以更直观的告诉使用的人信道是专门用来发数据还是收数据的
func createWorker(id int) chan<- int { //这里返回的channel是专门用来收数据的
c := make(chan int)
go worker(id, c)
return c
}
//主go程发数据,子go程收数据
func chanDemo() {
//创建10个专门用来收数据的信道
var c [10]chan<- int
for i := 0; i < 10; i++ {
c[i] = make(chan int)
c[i] = createWorker(i)
}
//主go程用来向信道发送数据
for i := 0; i < 10; i++ {
c[i] <- i + 'a'
}
for i := 0; i < 10; i++ {
c[i] <- i + 'A'
}
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
改造后与改造前的输出情况一样。
1.3.1 使用channel来等待goroutine结束
进一步改造,上面的代码中为了保证主go程比子go程先运行完所以用了time.Sleep()
,这样的方法不够好,那么有没有别的方法达到一个目的:当主go程发出数据,子go程接收数据并且打印,打印完毕会通知打印完毕。改造程序步骤如下:
- 首先将
worker()
改名doWork()
添加一个信道参数done chan bool
,用来赋值true
表示打印完毕。
func doWork(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("worker %d recieverd %c\n", id, n)//主go程发来了数据需要子go程来接收
done <- true
}
}
- 子go程中的
done
信道接收了一个数据,那么主go程就要接收此数据,为达成此目的,新建一个结构体worker
,此结构体包含用来传输数据的in
信道和用来通知是否打印完毕的信道done
。
type worker struct {
in chan int
done chan bool
}
因此createWorker()
和chanDemo()
都会相应做出变化。
createWorker()
返回值改为worker
:
func createWorker(id int) worker {
w := worker{
in: make(chan int),
done: make(chan bool),
}
go doWork(id, w.in, w.done)
return w
}
chanDemo()
:
func chanDemo() {
var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
//第一个任务:
//向十个信道各发送一个小写字母
for i, worker := range workers {
worker.in <- 'a' + i
}
//子go程发出了数据需要主go程接收数据,十个信道各自接收到子go程发来的true表示打印完毕
for _, worker := range workers {
<-worker.done
}
//第二个任务:
//向十个信道各发送一个大写字母
for i, worker := range workers {
worker.in <- 'A' + i
}
//子go程发出了数据需要主go程接收数据,十个信道各自接收到子go程发来的true表示打印完毕
for _, worker := range workers {
<-worker.done
}
}
main调用chanDemo()
后控制台输出:
worker 7 recieverd h
worker 0 recieverd a
worker 6 recieverd g
worker 2 recieverd c
worker 3 recieverd d
worker 4 recieverd e
worker 1 recieverd b
worker 5 recieverd f
worker 8 recieverd i
worker 9 recieverd j
worker 9 recieverd J
worker 4 recieverd E
worker 0 recieverd A
worker 1 recieverd B
worker 2 recieverd C
worker 3 recieverd D
worker 6 recieverd G
worker 5 recieverd F
worker 8 recieverd I
worker 7 recieverd H
1.4 缓冲信道buffered channel
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int) {
for {
fmt.Printf("worker %d recieverd %c\n", id, <-c) //输出从信道中接收到了什么数据
}
}
func bufferedChannel() {
c := make(chan int, 3)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
time.Sleep(time.Millisecond)
}
func main() {
bufferedChannel()
}
控制台输出:
worker 0 recieverd a
worker 0 recieverd b
worker 0 recieverd c
worker 0 recieverd d
1.5 channel的关闭
channel一定是由发送方进行关闭,接收方有两种判断channel是否关闭的方法:①用,ok
判断ok是true
还是false
;②用range
自动检测channel是否关闭。
package main
import (
"fmt"
"time"
)
func work2(id int, c chan int) {
for n := range c {
fmt.Printf("worker %d recieverd %d\n", id, n) //输出从信道中接收到了什么数据
}
}
//信道由发送方来close
func channelClose() {
c := make(chan int)
go work2(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
close(c)
time.Sleep(time.Millisecond)
}
func main() {
channelClose()
}
控制台输出:
worker 0 recieverd 97
worker 0 recieverd 98
worker 0 recieverd 99
worker 0 recieverd 100
1.6 使用channel进行树的遍历
结合第二章函数式编程2.3案例二中的例子,在traversal.go
中加入TraversalWithChannel()
方法,该方法运行一个go程,go程中实现二叉树的中序遍历,同时返回一个channel,main方法调用该方法时,只需用range
对返回的channel进行遍历即可对二叉树进行中序顺序访问各个节点。
文件结构:
node.go
:
package tree
import "fmt"
type Node struct {
Value int
Left, Right *Node
}
加入了TraversalWithChannel()
方法后的traversal.go
详情:
package tree
import "fmt"
//TraverseFunc()中序模板
func (node *Node) InOrderTraverseFunc(f func(*Node)) {
if node == nil {
return
}
node.Left.InOrderTraverseFunc(f)
f(node)
node.Right.InOrderTraverseFunc(f)
}
func (node *Node) TraverseWithChannel() chan *Node {
out := make(chan *Node)
go func() {
node.InOrderTraverseFunc(func(node *Node) {
out <- node //调用中序模板,按照中序顺序将每个节点传递给out信道
})
close(out)
}()
return out
}
entry.go
:
package main
import (
"fmt"
"learngo/tree"
)
type myTreeNode struct {
*tree.Node //内嵌Embedding省略node
}
func main() {
root := myTreeNode{&tree.Node{Value: 3}} //直接让root成为myTreeNode类型
root.Left = &tree.Node{}
root.Right = &tree.Node{Value: 5}
root.Right.Left = new(tree.Node)
root.Left.Right = tree.CrateNode(2)
root.Right.Left.SetValue(4)
//按照中序顺序对二叉树进行遍历,统计出节点中最大的值
c := root.TraverseWithChannel()
maxNode := 0
for node := range c {
if node.Value > maxNode {
maxNode = node.Value
}
}
fmt.Println("maxNode = ", maxNode)
}
1.7 select
select
注意事项:
- select语句 只能用于channel信道的IO操作,每个case都必须是一个信道。
- 如果不设置 default条件,当没有IO操作发生时,select语句就会一直阻塞;
- 如果有一个或多个IO操作发生时,Go运行时会随机选择一个case执行,但此时将无法保证执行顺序;
- 对于case语句,如果存在信道值为nil的读写操作,则该分支将被忽略,可以理解为相当于从select语句中删除了这个case;
- 对于空的 select语句,会引起死锁;
- 对于在 for中的select语句,不能添加 default,否则会引起cpu占用过高的问题。
select语法:
package main
import "fmt"
func main() {
var c1, c2 chan int
//c1和c2谁先收到子go程的数据就执行它的语句,若都没收到则执行default
select {
case n := <-c1:
fmt.Println("Received from c1:", n)
case n := <-c2:
fmt.Println("Received from c2:", n)
default:
fmt.Println("No value received")
}
}
控制台输出:
No value received
改成for循环形式:
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i++
}
}()
return out
}
func main() {
var c1, c2 = generator(), generator()
for {
//c1和c2谁先收到子go程的数据就执行它的语句
select {
case n := <-c1:
fmt.Println("Received from c1:", n)
case n := <-c2:
fmt.Println("Received from c2:", n)
}
}
}
此外将select
中的单纯打印语句结合之前的createWorker()
进一步改进:
package main
import (
"fmt"
"math/rand"
"time"
)
//创建
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)//rand.Intn(1500)表示生成0到1500之间的整数,若要拿这个做乘法则要按语法在外面套一层time.Duration()
out <- i
i++
}
}()
return out
}
func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d recieverd %d\n", id, n) //输出从信道中接收到了什么数据
}
}
func createWorker(id int) chan<- int { //这里返回的channel是专门用来收数据的
c := make(chan int)
go worker(id, c)
return c
}
func main() {
var c1, c2 = generator(), generator() //新建两个信道c1和c2,并开两个goroutine分别向c1和c2传入数据
var worker = createWorker(0) //新建一个负责接收数据的信道worker,并开一个goroutine负责输出worker内的数据
n := 0
hasValue := false //负责判断信道内是否有值的标志变量,此举是为了防止阻塞
for {
var activeWorker chan<- int
if hasValue { //若信道有值则令activeWorker等于worker
activeWorker = worker
}
//c1和c2谁先收到子go程的数据就执行它的语句
select {
case n = <-c1: //检测c1是否可读
hasValue = true
case n = <-c2: //检测c2是否可读
hasValue = true
case activeWorker <- n: //检测activeWorker有没有数据可写入,一旦有数据写入则执行此语句,这个case是为了防止n收到数据没有人接收就又收到数据造成阻塞
//程序一上来就会执行此case,因为c1和c2在generator()中开的go程一上来就要等待一定的时间才运行,所以n一先开始不会收到数据
hasValue = false
}
}
}
1.8 并发编程模式
- 生成器模式:创建一个生成信道的函数,同时在此函数内开一个go程,这样拿到一个信道就能与此信道绑定的go程进行交互,数据传输。
生成器写法:
func generator() chan string {
c := make(chan string)
go func() {
}()
return c
}
例1:
package main
import (
"fmt"
"math/rand"
"time"
)
//生成器,可以生成消息
func msgGen(name string) chan string {
c := make(chan string)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) //两秒之内生成下一个消息
c <- fmt.Sprintf("service %s: message %d", name, i)
//fmt.Sprintf()并不会打印括号中的内容,而会把想要打印的结果作为字符串返回出来
i++
}
}()
return c
}
func main() {
m1 := msgGen("service1") //拿到m1就能与service1服务进行交互
m2 := msgGen("service2") //拿到m2就能与service2服务进行交互
//接收消息
for {
fmt.Println(<-m1)
fmt.Println(<-m2)
}
}
- 同时等待多个服务模式:有两种方法。
其一:两个channel同时传出数据,把这两个channel传出的数据赋值给变量,此变量再将数据传入给第三个channel,再由第三个channel来输出数据,示意图如下:
例1可以添加fanIn()
函数修改为以下形式:
package main
import (
"fmt"
"math/rand"
"time"
)
//生成器,可以生成消息
func msgGen(name string) chan string {
c := make(chan string)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) //两秒之内生成下一个消息
c <- fmt.Sprintf("service %s: message %d", name, i)
//fmt.Sprintf()并不会打印括号中的内容,而会把想要打印的结果作为字符串返回出来
i++
}
}()
return c
}
//传进来两个服务,从它们里面拿数据传给返回出来的channel
func fanIn(c1, c2 chan string) chan string {
c := make(chan string)
//开两个go程,将c1和c2中的数据传给c
//c1和c2谁先有数据,c里就是谁
go func() {
for {
c <- <-c1 //c1中取出数据传给c
}
}()
go func() {
for {
c <- <-c2 //c2中取出数据传给c
}
}()
return c
}
func main() {
m1 := msgGen("service1") //拿到m1就能与service1服务进行交互
m2 := msgGen("service2") //拿到m2就能与service2服务进行交互
m := fanIn(m1, m2)
//接收m中的数据
for {
fmt.Println(<-m)
}
}
控制台输出:
service service1: message 0
service service2: message 0
service service2: message 1
service service1: message 1
service service2: message 2
service service2: message 3
service service2: message 4
service service1: message 2
service service2: message 5
service service2: message 6
service service1: message 3
...
...
当fanIn()
传入多个服务时,不使用select
的形式,有多少个channel就要开多少个go程,将其改造为以下形式:
func fanIn(chs ...chan string) chan string {
c := make(chan string)
for _, ch := range chs { //ch全局只有一个,当传进来两个信道c1和c2,ch第一轮循环等于c1,第二轮ch就会被覆盖为c2,所以控制台就只会输出c2中的数据
go func() {
for {
c <- <-ch
}
}()
}
return c
}
但这样会出现一个问题:ch
全局只有一个,当传进来两个信道c1和c2,ch第一轮循环等于c1,第二轮ch就会被覆盖为c2,所以控制台就只会输出c2中的数据。
解决措施:在range后的花括号中加入局部变量chCopy
,将ch
赋值给chCopy
,因为每进一次花括号就会有新的chCopy
生成,例如第二次循环的chCopy
就是区别于第一次循环中的另外的chCopy
。
func fanIn(chs ...chan string) chan string {
c := make(chan string)
for _, ch := range chs {
chCopy := ch
go func() {
for {
c <- <-chCopy
}
}()
}
return c
}
进一步可以修改为更美观的形式,将go的func中加入形参:
func fanIn(chs ...chan string) chan string {
c := make(chan string)
for _, ch := range chs {
go func(in chan string) {
for {
c <- <-in
//因为go语言都是值传递,这里相当于将ch拷贝了一份给函数体内,每进一次循环都是新的in,跟上面的chCopy是同一思想
}
}(ch)
}
return c
}
其二:运用select
关键字,c1和c2谁先有数据就将这个channel中的数据传给变量m,m再传给第三个channel。
实现函数:
func fanInBySelect(c1, c2 chan string) chan string {
c := make(chan string)
go func() {
for {
select {
case m := <-c1:
c <- m
case m := <-c2:
c <- m
}
}
}()
return c
}
1.9 并发任务的控制
- 非阻塞等待
添加nonBlockingWait()
函数:
//生成器,可以生成消息
func msgGen(name string) chan string {
c := make(chan string)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) //两秒之内生成下一个消息
c <- fmt.Sprintf("service %s: message %d", name, i)
//fmt.Sprintf()并不会打印括号中的内容,而会把想要打印的结果作为字符串返回出来
i++
}
}()
return c
}
//非阻塞等待
//传入信道,返回信道中的数据以字符串的形式返回,返回的bool表示是否收到了数据的传输
func nonBlockingWait(c chan string) (string, bool) {
select {
//信道收到数据就返回该数据以及true
case m := <-c:
return m, true
//信道没收到数据就返回空串以及true
default:
return "", false
}
}
func main() {
m1 := msgGen("service1")
m2 := msgGen("service2")
//接收m1和m2中的数据
//进入for循环,先输出m1中的数据,再根据m2是否比m1先收到数据来决定是输出m2的数据还是输出no message from service2
//若m2的数据比m1的数据先到,那么m2就会收到数据,正常先打印m1中的数据再打印m2中的数据
//若m2的数据比m1的数据后到,那么m2就收不到数据,就会先打印m1中的数据,因为这个时候没有收到m2的数据,所以ok为false,就会打印no message from service2
for {
fmt.Println(<-m1)
m, ok := nonBlockingWait(m2)
if ok {
fmt.Println(m)
} else {
fmt.Println("no message from service2")
}
}
}
控制台输出:
service service1: message 0
service service2: message 0
service service1: message 1
service service2: message 1
service service1: message 2
no message from service2
service service1: message 3
no message from service2
service service1: message 4
no message from service2
service service1: message 5
service service2: message 2
service service1: message 6
service service2: message 3
...
...
- 超时机制
添加timeoutWait()
函数:
package main
import (
"fmt"
"math/rand"
"time"
)
//生成器,可以生成消息
func msgGen(name string) chan string {
c := make(chan string)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond) //五秒之内生成下一个消息
c <- fmt.Sprintf("service %s: message %d", name, i)
//fmt.Sprintf()并不会打印括号中的内容,而会把想要打印的结果作为字符串返回出来
i++
}
}()
return c
}
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
select {
case m := <-c: //如果收到数据就返回信道中的数据和true
return m, true
case <-time.After(timeout): //如果在timeout的时间里还没接收到数据就返回空串和false
return "", false
}
}
func main() {
m1 := msgGen("service1")
for {
//msgGen()中一个消息会在5s内生成,此处只等2s,若2s内信息生成则打印出来,若2s内信息还没生成则打印timeout
m, ok := timeoutWait(m1, 2*time.Second)
if ok {
fmt.Println(m)
} else {
fmt.Println("timeout")
}
}
}
控制台输出:
timeout
service service1: message 0
timeout
service service1: message 1
service service1: message 2
timeout
timeout
service service1: message 3
timeout
service service1: message 4
service service1: message 5
- 任务中断/退出
package main
import (
"fmt"
"math/rand"
"time"
)
//生成器,可以生成消息
func msgGen(name string, done chan struct{}) chan string {
c := make(chan string)
go func() {
i := 0
for {
select {
//若5s内还没收到done的消息则生成信息
case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
c <- fmt.Sprintf("service %s: message %d", name, i)
//若收到了done的消息则中断
case <-done:
fmt.Println("cleaning up")
return
}
i++
}
}()
return c
}
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
select {
case m := <-c: //如果收到数据就返回信道中的数据和true
return m, true
case <-time.After(timeout): //如果在timeout的时间里还没接收到数据就返回空串和false
return "", false
}
}
func main() {
done := make(chan struct{})
m1 := msgGen("service1", done)
for i := 0; i < 5; i++ {
//msgGen()中一个消息会在5s内生成,此处只等2s,若2s内信息生成则打印出来,若2s内信息还没生成则打印timeout
m, ok := timeoutWait(m1, time.Second)
if ok {
fmt.Println(m)
} else {
fmt.Println("timeout")
}
}
//第一个花括号是声明部分,表示是一个结构体类型,第二个花括号表示初始化,里面是结构体内容,此处的结构体内容为空
done <- struct{}{} //传空结构体给done表示中断任务
time.Sleep(time.Second) //睡1s让main不要立刻退出,给运行cleaning up足够的时间
}
控制台输出:
timeout
timeout
timeout
service service1: message 0
timeout
cleaning up
- 优雅退出
在msgGen()
形参中添加done信道(done的类型是bool
或者struct{}
都可以),用途是当任务执行完成通知出去任务完成了。
//生成器,可以生成消息
//添加done信道,用途是当任务执行完成通知出去任务完成了
func msgGen(name string, done chan struct{}) chan string {
c := make(chan string)
go func() {
i := 0
for {
select {
case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond): //若5s内还没收到done的消息则生成信息
c <- fmt.Sprintf("service %s: message %d", name, i)
case <-done: //若收到了done的消息则中断
fmt.Println("cleaning up")
time.Sleep(2 * time.Second) //假设需要2s才能清理完
fmt.Println("cleanup done") //清理完成打印提示
done <- struct{}{} //传出空结构体给done表示清理完成,此时done为双向信道
return
}
i++
}
}()
return c
}
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
select {
case m := <-c: //如果收到数据就返回信道中的数据和true
return m, true
case <-time.After(timeout): //如果在timeout的时间里还没接收到数据就返回空串和false
return "", false
}
}
func main() {
done := make(chan struct{})
m1 := msgGen("service1", done)
for i := 0; i < 5; i++ {
//msgGen()中一个消息会在5s内生成,此处只等2s,若2s内信息生成则打印出来,若2s内信息还没生成则打印timeout
m, ok := timeoutWait(m1, time.Second)
if ok {
fmt.Println(m)
} else {
fmt.Println("timeout")
}
}
//第一个花括号是声明部分,表示是一个结构体类型,第二个花括号表示初始化,里面是结构体内容,此处的结构体内容为空
done <- struct{}{} //传空结构体给done表示中断任务
<-done //接收到done中的信息才退出任务
}
标签:string,补充,worker,chan,recieverd,func,go,channel 来源: https://www.cnblogs.com/SpriteLee/p/16651515.html