使用fan-out模型进行并发
go编程中可通过go func()
直接启动一个gouroutine进行并发操作。go语言一句很有名的经典语是不要通过共享内存来通信,而是通过通信来共享内存
,此处的共享内存
通常情况下指的都是channel
。本文介绍如何使用fan
模式实现go的并发控制,以达到控制goroutine数量、从同一数据源读取数据并发处理的目的。
基础的fan并发模式有fan-in
和fan-out
两种,前者收敛,后者张开,主要定义如下:
- fan-in: 1个goroutine从多个通道读取数据,直到通道关闭。是一种收敛模式,又被称为扇入,多用于收集处理结果。
- fan-out: 多个goroutine从1个通道读取数据,直到通道关闭。是一种张开模式,又被称为扇出,多用于分发任务。
了解fan模式定义后,我们将实现一个基于fan-out
实现的并发模型。代码主要场景为给定一个url列表,如何使用100个goroutine进行并发访问。模型主要约束为只能使用限定数量的goroutine实现并发,伪代码如下:
func send(url string) interface{} {
log.Printf("handler: %s", url)
return nil
}
func worker(ctx context.Context, url <-chan string, wg *sync.WaitGroup) (resp interface{}) {
for {
select {
case <-ctx.Done():
return nil
case u := <-url:
func() {
defer wg.Done()
send(u)
}()
}
}
}
func start(urls []string) {
var (
goNum = 100
buffer = make(chan string, goNum)
wg = &sync.WaitGroup{}
)
wg.Add(len(urls))
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < goNum; i++ {
go worker(ctx, buffer, wg)
}
for _, url := range urls {
buffer <- url
}
wg.Wait()
cancel()
close(buffer)
}
start
函数中定义了一个缓冲区长度为指定goroutine数量(goNum)的channel(buffer),并启动了goNum数量的常驻worker协程,以接收buffer
的输入。worker
通过for-select
实现对buffer
数据的持续处理。start
通过for
循环持续将待处理url写入buffer
中,buffer
未满时可一直输入,当buffer
缓冲区满后,生产者会阻塞在buffer <- url
语句,直至有空闲worker出现。这样就保证不会有超过goNum
的goroutine出现。对系统资源起到一定程度的保护。
start
中通过wait group
保证最后一个worker处理完成,之后分别调用cancel
和close
关闭worker
和buffer
,实现对了内存资源(goroutine
和chan
)的清理,避免内存泄露。