别再为FusionCompute CNA安装发愁了!用VMware Workstation 16 Pro手把手带你搞定(附镜像下载)
2026/6/9 20:38:59
这篇文章 2014年3月13日发表,作者 Sameer Ajmani
通过灵活的运用chan类型,在 Go 中更高效的处理数据,这里应用领域为健壮高效的流式数据处理,并在安全性问题上做了补充,例如程序异常、内存泄漏、Gc释放等
一些开源类库也沿用了其思想,例如MapReduces、并行处理等
这篇博客要以MapReduces或者生产消费模型的思想去阅读
一个比较基础的管道使用
将一组整数通过管道依次平方,最终输出结果
// 将要计算平方的数字,依次添加到chan中,并返回该只允许读的chan// 注意:该chan是无缓冲的,gen函数运行完后,内部的goroutine会依然运行,直到处理完毕funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}// 从传入的只读chan中读取数据,计算平方,再返回chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}funcTestExample(t*testing.T){// chan数据传输:gen → sq → sq → 打印forn:=rangesq(sq(gen(2,3))){t.Log(n)}}输出
/Users/www/zero-core/mr/mr_test.go:39: 16 /Users/www/zero-core/mr/mr_test.go:39: 81过程中的一些说明
gensq方法中分别创建了各自的 chan 变量,用于写入数据,并返回sq(sq(gen(2, 3)))中,三个方法,通过传入 chan 参数实现数据流转,sq方法调用了两次gensq方法中的 chan 均为无缓冲通道,互相调用时为阻塞模型,也就意味着同一时刻只可能会有一段程序在执行(无论几核)这里就是使用 chan 类型,实现了一个简陋的 MapReduces 过程
官方着重提到的是并行,但至于是否多核并行还是依赖于并发实现
依旧是求平方的案例
// 原始数据无阻塞写入 chan, 注意,这里返回的时候有缓冲的 chanfuncgen(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}// 读取传入的 chan, 并计算平方, 写入 chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}// 将传入的n个 chan ,用 n 个 goroutine 读取, 并将其写入到 out chan 中funcmerge(cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)// 读取传入的 chan, 并将其写入到 out chan 中output:=func(c<-chanint){deferwg.Done()forn:=rangec{out<-n}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){// 将 2, 3, 4, 9 写入有缓冲的 chan,返回的 chan 用 2 个 sq 方法去接收(2个消费者)in:=gen(2,3,4,9)c1:=sq(in)c2:=sq(in)forn:=rangemerge(c1,c2){// 输出 4 9 81 16(顺序不定)t.Log(n)}}输出
/Users/www/zero-core/mr/mr_test.go:68: 4 /Users/www/zero-core/mr/mr_test.go:68: 9 /Users/www/zero-core/mr/mr_test.go:68: 81说明:
对并行处理的代码改进,在每个方法中都引入done
funcgen(done<-chanstruct{},nums...int)<-chanint{out:=make(chanint)gofunc(){deferclose(out)for_,n:=rangenums{select{caseout<-n:case<-done:return}}}()returnout}funcsq(done<-chanstruct{},in<-chanint)<-chanint{out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caseout<-n*n:case<-done:return}}}()returnout}funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)output:=func(c<-chanint){deferwg.Done()forn:=rangec{select{caseout<-n:case<-done:return}}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){done:=make(chanstruct{})deferclose(done)// 保证所有 goroutine 收到取消信号in:=gen(done,2,3,4,9)c1:=sq(done,in)c2:=sq(done,in)out:=merge(done,c1,c2)// 只消费2个值就退出t.Log(<-out)t.Log(<-out)// 此时 done 被 defer 关闭,所有 goroutine 安全退出}done,内部使用select来监听是否关闭,并return 释放协程chan没有取完,通过 close 通知 done 的方式,保证不会存在僵尸协程泄漏但,这个案例还有改进的一步,比如,chan 中有3个值,现在只取了1个就进行了 close 关闭,chan 随是释放了,但内部剩余的2个值可能会发生逃逸现象,等待系统 Gc 释放
如追求性能,一种写法是 close 后,通过手动读取释放,来减缓 Gc 的压力
// 不仅仅 close 还空读取deferfunc(){close(done)forrangeout{}}()在多任务消费读取生产数据时
funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}funcgen2(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}这两种方式实现过程结果一样,不同之处在于将生产数据变快,还是读取速度变快
gen循序渐进的放入生产计划中,gen2是一口气家在到生产计划中,具体采用哪种适业务而定
对官方这篇博客,我的理解是
原文出处 https://go.dev/blog/pipelines