使用fan-out模型进行并发

go编程中可通过go func()直接启动一个gouroutine进行并发操作。go语言一句很有名的经典语是不要通过共享内存来通信,而是通过通信来共享内存,此处的共享内存通常情况下指的都是channel。本文介绍如何使用fan模式实现go的并发控制,以达到控制goroutine数量、从同一数据源读取数据并发处理的目的。

fan并发模式

基础的fan并发模式有fan-infan-out两种,前者收敛,后者张开,主要定义如下:

  1. fan-in: 1个goroutine从多个通道读取数据,直到通道关闭。是一种收敛模式,又被称为扇入,多用于收集处理结果。
  2. fan-out: 多个goroutine从1个通道读取数据,直到通道关闭。是一种张开模式,又被称为扇出,多用于分发任务。

fan-out编程模型

了解fan模式定义后,我们将实现一个基于fan-out实现的并发模型。代码主要场景为给定一个url列表,如何使用100个goroutine进行并发访问。模型主要约束为只能使用限定数量的goroutine实现并发,伪代码如下:

func send(url string) interface{} {
	log.Printf("handler: %s", url)
	return nil
}

func worker(ctx context.Context, url <-chan string, wg *sync.WaitGroup) (resp interface{}) {
	for {
		select {
		case <-ctx.Done():
			return nil
		case u := <-url:
			func() {
				defer wg.Done()
				send(u)
			}()
		}
	}
}

func start(urls []string) {
	var (
		goNum = 100
		buffer = make(chan string, goNum)
		wg = &sync.WaitGroup{}
	)
	wg.Add(len(urls))
	ctx, cancel := context.WithCancel(context.Background())

	for i := 0; i < goNum; i++ {
		go worker(ctx, buffer, wg)
	}
	for _, url := range urls {
		buffer <- url
	}
	wg.Wait()
	cancel()
	close(buffer)
}

start函数中定义了一个缓冲区长度为指定goroutine数量(goNum)的channel(buffer),并启动了goNum数量的常驻worker协程,以接收buffer的输入。worker通过for-select实现对buffer数据的持续处理。start通过for循环持续将待处理url写入buffer中,buffer未满时可一直输入,当buffer缓冲区满后,生产者会阻塞在buffer <- url语句,直至有空闲worker出现。这样就保证不会有超过goNum的goroutine出现。对系统资源起到一定程度的保护。

start中通过wait group保证最后一个worker处理完成,之后分别调用cancelclose关闭workerbuffer,实现对了内存资源(goroutinechan)的清理,避免内存泄露。

参考

  1. Golang并发模型:轻松入门流水线FAN模式