Go:说说fanIn和fanOut模式
文章目录
fanIn
协程版
递归版
反射版
fanOut
同步版
协程异步版
反射版
今天回顾下常用的两种channel
应用模式: fanIn
和fanOut
,
分别对应了,对一组相同类型chan
的合并和广播。
fanIn
将全部输入chan
都聚合到一个out chan
中,在全部聚合完成后,关闭out chan
.
协程版
func fanIn(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
var wg sync.WaitGroup
wg.Add(len(chans))
for _, ch := range chans {
go func(ch <-chan interface{}) {
for v := range ch {
out <- v
}
wg.Done()
}(ch)
}
// 等待协程全部结束
wg.Wait()
close(out)
}()
return out
}
这里用waitGroup
是防止关闭out
时还有写入(out <- v
),避免panic
递归版
2 分递归并合并。
其中合并mergeTwo
主要用了nil chan
对读写均阻塞。
当chan
关闭时,设置为nil
,阻塞读取。
func fanInRecur(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
// 无可聚合chan,返回一个已关闭chan,可读不可写
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
// 一分为二,递归
m := len(chans) / 2
return mergeTwo(
fanInRecur(chans[:m]...),
fanInRecur(chans[m:]...))
}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { // 只要还有可读的chan
select {
case v, ok := <-a:
if !ok { // a 已关闭,设置为nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已关闭,设置为nil
b = nil
continue
}
c <- v
}
}
}()
return c
}
反射版
利用reflect.SelectCase
构造批量可Select
的发送chan
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
// 构造SelectCase slice
var cases []reflect.SelectCase
for _, c := range chans {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}
// 循环,从cases中选择一个可用的
for len(cases) > 0 {
i, v, ok := reflect.Select(cases)
if !ok {
// 此channel已经close, 从切片移除
cases = append(cases[:i], cases[i+1:]...)
continue
}
out <- v.Interface()
}
}()
return out
}
附上压测数据
fanOut
同步版
最直观的方式,直接向每一个chan
都同步发送一遍
返回前关闭这组chan
, 即不再写入
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出时关闭所有的输出chan
for i := range out {
close(out[i])
}
}()
for v := range ch { // 从输入chan中读取数据
v := v
for i := range out {
i := i
out[i] <- v // 放入到输出chan中,同步方式
}
}
}()
}
协程异步版
发送这里用起协程的方式,实现异步,发送操作耗时情况下无需阻塞等待
可是有个问题,不知道你看出来没。
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出时关闭所有的输出chan
for i := range out {
close(out[i])
}
}()
for v := range ch { // 从输入chan中读取数据
v := v
for i := range out {
i := i
// 协程异步
go func(){}
out[i] <- v
}()
}
}
}()
}
乍一看好像没什么问题, 但退出时关闭时,很可能发送的协程写入还没完成,
毕竟这里out
之前写入的要有人读才能继续写。
这里加waitGroup
可以等待全部发送完毕在关闭
func fanOutAsync(ch <-chan interface{}, out []chan interface{}) {
go func() {
var wg sync.WaitGroup
defer func() { // 退出时关闭所有的输出chan
wg.Wait()
for i := range out {
close(out[i])
}
}()
for v := range ch { // 从输入chan中读取数据
v := v
for i := range out {
i := i
wg.Add(1)
go func() { // 异步,避免一个out阻塞的时候影响其他out
out[i] <- v
wg.Done()
}()
}
}
}()
}
反射版
构造一票chan send case
, 遍历select
,发送完成的将其置为nil
阻塞,避免再次发送
不得不说,nil chan
出镜率很高啊
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出时关闭所有的输出chan
for i := range out {
close(out[i])
}
}()
cases := make([]reflect.SelectCase, len(out))
// 构造SelectCase slice
for i := range cases {
cases[i].Dir = reflect.SelectSend
}
for v := range ch {
v := v
// 先完成send case构造
for i := range cases {
cases[i].Chan = reflect.ValueOf(out[i])
cases[i].Send = reflect.ValueOf(v)
}
// 遍历select
for range cases {
chosen, _, _ := reflect.Select(cases)
// 已发送过,用nil阻塞,避免再次发送
cases[chosen].Chan = reflect.ValueOf(nil)
}
}
}()
}
附上压测数据
具体测试代码详见:concurrency[1]
参考资料
concurrency: https://github.com/NewbMiao/Dig101-Go/tree/master/concurrency/channel/schedule
推荐阅读
评论