并发控制模式:orDone的两种实现

共 5002字,需浏览 11分钟

 ·

2021-09-07 12:34

文章目录

  • 方式一 递归

  • 方式二 利用反射

  • 性能差异

orDone 是一种并发控制模式,旨在多任务场景下实现,有一个任务成功返回即立即结束等待。

今天我们来看下两种不同的实现方式:

方式一 递归

利用二分法递归, 将所有待监听信号的chanselect起来,

当有第一个chan返回时,close orDone 来通知读取方已有第一个任务返回

代码如下比较直观:

// 传入多个并发chan,返回是否结束的 orDone chan
func Or(channels ...<-chan interface{}) <-chan interface{} {
 // 只有零个或者1个chan
 switch len(channels) {
 case 0:
        // 返回nil, 让读取阻塞等待
  return nil
 case 1:
  return channels[0]
 }

 orDone := make(chan interface{})
 go func() {
        // 返回时利用close做结束信号的广播
  defer close(orDone)

        // 利用select监听第一个chan的返回
  switch len(channels) {
  case 2// 直接select
   select {
   case <-channels[0]:
   case <-channels[1]:
   }
  default// 二分法递归处理
   m := len(channels) / 2
   select {
   case <-Or(channels[:m]...):
   case <-Or(channels[m:]...):
   }
  }
 }()

 return orDone
}

方式二 利用反射

这里要用到reflect.SelectCase, 他可以描述一种selectcase, 来指明其接受的是chan的读取或发送

type SelectCase struct {
 Dir  SelectDir // direction of case
 Chan Value     // channel to use (for send or receive)
 Send Value     // value to send (for send)
}

有了这个,就可以之间遍历,不用递归来实现有限的select case构造

最后用reflect.Select(cases)监听信号就可以了,代码如下:

func OrInReflect(channels ...<-chan interface{}) <-chan interface{} {
 // 只有0个或者1个
 switch len(channels) {
 case 0:
  return nil
 case 1:
  return channels[0]
 }

 orDone := make(chan interface{})
 go func() {
  defer close(orDone)
  // 利用反射构建SelectCase,这里是读取
  var cases []reflect.SelectCase
  for _, c := range channels {
   cases = append(cases, reflect.SelectCase{
    Dir:  reflect.SelectRecv,
    Chan: reflect.ValueOf(c),
   })
  }

  // 随机选择一个可用的case
  reflect.Select(cases)
 }()

 return orDone
}

性能差异

这两种都可以支持大量chan的信号监听,那性能差异大么

虽说递归开销肯定不小,反射也不一定效率高,拿个压测来试试吧

先构造一下大量并发chan

func repeat(
 done <-chan interface{},
    // 外部传入done控制是否结束
 values ...interface{},
)
 <-chan interface
{} {
 valueStream := make(chan interface{})
 go func() {
        // 返回时释放
  defer close(valueStream)
  for {
   for _, v := range values {
    select {
    case <-done:
     return
    case valueStream <- v:
    }
   }
  }
 }()
 return valueStream
}

然后压测

func BenchmarkOr(b *testing.B) {
 done := make(chan interface{})
 defer close(done)
 num := 100
 streams := make([]<-chan interface{}, num)
 for i := range streams {
  streams[i] = repeat(done, []int{123})
 }
 b.ResetTimer()
 for i := 0; i < b.N; i++ {
  <-Or(streams...)
 }
}

func BenchmarkOrInReflect(b *testing.B) {
 // 代码类似
}

跑了下结果如下:

goos: darwin
goarch: amd64
pkg: github.com/NewbMiao/Dig101-Go/concurrency/channel/schedule/orDone
BenchmarkOr-12                 31815      38136 ns/op     9551 B/op       99 allocs/op
BenchmarkOrInReflect-12        55797      21755 ns/op    25232 B/op      112 allocs/op

可以看出,大量并发chan场景下, 反射使用内存更多些,但速度更快。



推荐阅读


福利

我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。

浏览 42
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报