Go chan

2020/06/16 Go 知识点

chan

Channel是Go中的一个核心类型,可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication)。

receive 操作符

<-ch用来从channel ch中接收数据,这个表达式会一直被block,直到有数据可以接收。

从一个nil channel中接收数据会一直被block。

从一个被close的channel中接收数据不会被阻塞,而是立即返回,接收完已发送的数据后会返回元素类型的零值(zero value)。

关闭判断

v, ok := <-ch

一个关闭的通道不会影响读取,可以通过第二个参数获取关闭状态;

获取通道内剩余的数量

len(ch)

chan超时控制

go 中实现超时控制的时候, select-case,粗糙地在超时的时候返回而不管尚在执行中的 goroutine 的死活。结果是,大批量调用受到几个调用超时的影响,一直超时无法恢复。

context 可以设置超时处理,解决超时退出下 goroutine 的退出问题。

案例:比如抢购锁的使用

https://ictar.xyz/2018/03/20/%E5%A6%82%E4%BD%95%E7%94%A8go%E5%AE%9E%E7%8E%B0%E8%B6%85%E6%97%B6%E6%8E%A7%E5%88%B6/

生产消费

package main

import "fmt"

func Producer(ch chan int) {
	for i := 1; i <= 10; i++ {
		ch <- i
		}
	close(ch)
}

func Consumer( ch chan int, done chan bool) {
	for {
		value, ok := <-ch
		if ok {
			fmt.Printf(" recv: %d\n", value)
			} else {
			fmt.Printf("closed\n")
			break
			}
		}
	done <- true
}

func main() {
	ch := make(chan int)
	done := make(chan bool)
	go Producer(ch)
	go Consumer( ch, done)
	<-done
}

发布订阅

docker项目中提供了一个pubsub的极简实现,下面是基于pubsub包实现的本地发布订阅代码:

import (
    "github.com/moby/moby/pkg/pubsub"
)

func main() {
    p := pubsub.NewPublisher(100*time.Millisecond, 10)

    golang := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, "golang:") {
                return true
            }
        }
        return false
    })
    docker := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, "docker:") {
                return true
            }
        }
        return false
    })

    go p.Publish("hi")
    go p.Publish("golang: https://golang.org")
    go p.Publish("docker: https://www.docker.com/")
    time.Sleep(1)

    go func() {
        fmt.Println("golang topic:", <-golang)
    }()
    go func() {
        fmt.Println("docker topic:", <-docker)
    }()

    <-make(chan bool)
}

其中pubsub.NewPublisher构造一个发布对象,p.SubscribeTopic()可以通过函数筛选感兴趣的主题进行订阅。

设计:基于gRPC和pubsub包,提供一个跨网络的发布和订阅系统。

复杂设计: https://github.com/gooopher/go-insight/blob/feature/view/skill/pratice/queue/publish_subscribe.go

源码分析

ring buffer

channel 中使用了 ring buffer(环形缓冲区) 来缓存写入的数据。ring buffer 有很多好处,而且非常适合用来实现 FIFO 式的固定长度队列。 在 channel 中,ring buffer 的实现如下: hchan 中有两个与 buffer 相关的变量:recvx 和 sendx。其中 sendx 表示 buffer 中可写的 index,recvx 表示 buffer 中可读的 index。 从 recvx 到 sendx 之间的元素,表示已正常存放入 buffer 中的数据。 我们可以直接使用 buf[recvx]来读取到队列的第一个元素,使用 buf[sendx] = x 来将元素放到队尾。

参考:

https://lessisbetter.site/2019/01/20/golang-channel-all-usage/

https://pkg.go.dev/github.com/ph/moby@v1.13.0/pkg/pubsub

环形缓冲区的实现原理(ring buffer)

Search

    Table of Contents