chan
Channel是Go中的一个核心类型,可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication)。
receive 操作符
<-ch
用来从channel ch中接收数据,这个表达式会一直被block,直到有数据可以接收。
从一个nil channel中接收数据会一直被block。
从一个被close的channel中接收数据不会被阻塞,而是立即返回,接收完已发送的数据后会返回元素类型的零值(zero value)。
关闭判断
v, ok := <-ch
一个关闭的通道不会影响读取,可以通过第二个参数获取关闭状态;
获取通道内剩余的数量
len(ch)
chan超时控制
go 中实现超时控制的时候, select-case,粗糙地在超时的时候返回而不管尚在执行中的 goroutine 的死活。结果是,大批量调用受到几个调用超时的影响,一直超时无法恢复。
context 可以设置超时处理,解决超时退出下 goroutine 的退出问题。
案例:比如抢购锁的使用
https://ictar.xyz/2018/03/20/%E5%A6%82%E4%BD%95%E7%94%A8go%E5%AE%9E%E7%8E%B0%E8%B6%85%E6%97%B6%E6%8E%A7%E5%88%B6/
生产消费
package main
import "fmt"
func Producer(ch chan int) {
for i := 1; i <= 10; i++ {
ch <- i
}
close(ch)
}
func Consumer( ch chan int, done chan bool) {
for {
value, ok := <-ch
if ok {
fmt.Printf(" recv: %d\n", value)
} else {
fmt.Printf("closed\n")
break
}
}
done <- true
}
func main() {
ch := make(chan int)
done := make(chan bool)
go Producer(ch)
go Consumer( ch, done)
<-done
}
发布订阅
docker项目中提供了一个pubsub的极简实现,下面是基于pubsub包实现的本地发布订阅代码:
import (
"github.com/moby/moby/pkg/pubsub"
)
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
golang := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "golang:") {
return true
}
}
return false
})
docker := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "docker:") {
return true
}
}
return false
})
go p.Publish("hi")
go p.Publish("golang: https://golang.org")
go p.Publish("docker: https://www.docker.com/")
time.Sleep(1)
go func() {
fmt.Println("golang topic:", <-golang)
}()
go func() {
fmt.Println("docker topic:", <-docker)
}()
<-make(chan bool)
}
其中pubsub.NewPublisher构造一个发布对象,p.SubscribeTopic()可以通过函数筛选感兴趣的主题进行订阅。
设计:基于gRPC和pubsub包,提供一个跨网络的发布和订阅系统。
复杂设计: https://github.com/gooopher/go-insight/blob/feature/view/skill/pratice/queue/publish_subscribe.go
源码分析
ring buffer
channel 中使用了 ring buffer(环形缓冲区) 来缓存写入的数据。ring buffer 有很多好处,而且非常适合用来实现 FIFO 式的固定长度队列。 在 channel 中,ring buffer 的实现如下: hchan 中有两个与 buffer 相关的变量:recvx 和 sendx。其中 sendx 表示 buffer 中可写的 index,recvx 表示 buffer 中可读的 index。 从 recvx 到 sendx 之间的元素,表示已正常存放入 buffer 中的数据。 我们可以直接使用 buf[recvx]来读取到队列的第一个元素,使用 buf[sendx] = x 来将元素放到队尾。
参考:
https://lessisbetter.site/2019/01/20/golang-channel-all-usage/
https://pkg.go.dev/github.com/ph/moby@v1.13.0/pkg/pubsub