go.dev博客阅读-pipelines
2026/6/9 20:40:24 网站建设 项目流程

这篇文章 2014年3月13日发表,作者 Sameer Ajmani

通过灵活的运用chan类型,在 Go 中更高效的处理数据,这里应用领域为健壮高效的流式数据处理,并在安全性问题上做了补充,例如程序异常、内存泄漏、Gc释放等

一些开源类库也沿用了其思想,例如MapReduces、并行处理等

这篇博客要以MapReduces或者生产消费模型的思想去阅读

博客开头的示例

一个比较基础的管道使用

将一组整数通过管道依次平方,最终输出结果

// 将要计算平方的数字,依次添加到chan中,并返回该只允许读的chan// 注意:该chan是无缓冲的,gen函数运行完后,内部的goroutine会依然运行,直到处理完毕funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}// 从传入的只读chan中读取数据,计算平方,再返回chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}funcTestExample(t*testing.T){// chan数据传输:gen → sq → sq → 打印forn:=rangesq(sq(gen(2,3))){t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:39: 16 /Users/www/zero-core/mr/mr_test.go:39: 81

过程中的一些说明

  1. gensq方法中分别创建了各自的 chan 变量,用于写入数据,并返回
  2. 声明 chan 类型后,要养成 close 的习惯,close 后依然可以读,有减缓 Gc 压力
  3. sq(sq(gen(2, 3)))中,三个方法,通过传入 chan 参数实现数据流转,sq方法调用了两次
  4. gensq方法中的 chan 均为无缓冲通道,互相调用时为阻塞模型,也就意味着同一时刻只可能会有一段程序在执行(无论几核)

这里就是使用 chan 类型,实现了一个简陋的 MapReduces 过程

并行处理

官方着重提到的是并行,但至于是否多核并行还是依赖于并发实现

依旧是求平方的案例

// 原始数据无阻塞写入 chan, 注意,这里返回的时候有缓冲的 chanfuncgen(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}// 读取传入的 chan, 并计算平方, 写入 chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}// 将传入的n个 chan ,用 n 个 goroutine 读取, 并将其写入到 out chan 中funcmerge(cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)// 读取传入的 chan, 并将其写入到 out chan 中output:=func(c<-chanint){deferwg.Done()forn:=rangec{out<-n}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){// 将 2, 3, 4, 9 写入有缓冲的 chan,返回的 chan 用 2 个 sq 方法去接收(2个消费者)in:=gen(2,3,4,9)c1:=sq(in)c2:=sq(in)forn:=rangemerge(c1,c2){// 输出 4 9 81 16(顺序不定)t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:68: 4 /Users/www/zero-core/mr/mr_test.go:68: 9 /Users/www/zero-core/mr/mr_test.go:68: 81

说明:

  1. c1、c2 相当于2个消费任务去执行,通过内部创建的 goroutinue 去模型多线程多核并行
  2. merge 方法将多个传入的 chan 输出,合并到一个 chan,保证 Reduces 阶段只会有1个输出出口
  3. ❌这里面有个不严谨漏洞,当取数据不是采用 range 方式或者 chan 数据没有取完, chan 的发送方就会阻塞

带取消功能的 chan

并行处理的代码改进,在每个方法中都引入done

funcgen(done<-chanstruct{},nums...int)<-chanint{out:=make(chanint)gofunc(){deferclose(out)for_,n:=rangenums{select{caseout<-n:case<-done:return}}}()returnout}funcsq(done<-chanstruct{},in<-chanint)<-chanint{out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caseout<-n*n:case<-done:return}}}()returnout}funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)output:=func(c<-chanint){deferwg.Done()forn:=rangec{select{caseout<-n:case<-done:return}}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){done:=make(chanstruct{})deferclose(done)// 保证所有 goroutine 收到取消信号in:=gen(done,2,3,4,9)c1:=sq(done,in)c2:=sq(done,in)out:=merge(done,c1,c2)// 只消费2个值就退出t.Log(<-out)t.Log(<-out)// 此时 done 被 defer 关闭,所有 goroutine 安全退出}
  1. 在每个方法中,都加入了done,内部使用select来监听是否关闭,并return 释放协程
  2. 如果chan没有取完,通过 close 通知 done 的方式,保证不会存在僵尸协程泄漏

但,这个案例还有改进的一步,比如,chan 中有3个值,现在只取了1个就进行了 close 关闭,chan 随是释放了,但内部剩余的2个值可能会发生逃逸现象,等待系统 Gc 释放

如追求性能,一种写法是 close 后,通过手动读取释放,来减缓 Gc 的压力

// 不仅仅 close 还空读取deferfunc(){close(done)forrangeout{}}()

额外注意的点

在多任务消费读取生产数据时

funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}funcgen2(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}

这两种方式实现过程结果一样,不同之处在于将生产数据变快,还是读取速度变快

gen循序渐进的放入生产计划中,gen2是一口气家在到生产计划中,具体采用哪种适业务而定

🧠🧠🧠🧠

对官方这篇博客,我的理解是

  1. 每个使用了 chan 的地方,应在适当的时候关闭且释放掉
  2. 每个使用了 chan 的地方应持续从输入 channel 读取,直到关闭或收到取消信号,而不是一口气读一口气写
  3. 不要完全依赖有缓冲的 chan 的 size 解决阻塞问题,缓冲的大小是一个容错作用
  4. 使用关闭的 channel 作为广播取消信号,通知所有上游 goroutine 停止工作。
  5. 使用 WaitGroup 时,务必确保所有任务完成后再关闭输出 channel,先 wait,再 close

原文出处 https://go.dev/blog/pipelines

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询