又超时了!Etcd分布式锁你用明白了吗?

共 9693字,需浏览 20分钟

 ·

2021-03-26 11:13


现象

线上程序报错,错误信息:lock failed: context deadline exceeded, retry

问题排查

很明显的是获取锁超时了,由于用的 etcd 的分布式锁,可能是 etcd 出问题了,此时看到大量 etcd 日志,rejected connection from "ip:port" (error "tls: first record does not look like a TLS handshake", ServerName ""),怀疑是不是这个问题导致的,经过查询报错的 IP,均为线上容器 IP,登陆容器内看发现都是管理员平台的代码,没报错的集群也有相同的日志,这不是导致超时的原因。在排除各种可能之后,最后去 etcd 查看锁对应的 key 的情况,发现有两个 key:

/notifier/locker/{leaseid}

/notifier/locker/rwl/{leaseid}

其中第一个 key 是 notifier 自己添加的,第二个 key 在代码中搜不到,但是看起来像是 redis whitelist 的简写,先把第一个 key 删了,然后看 notifier 日志,仍然获取不到锁,所以怀疑是第二个 key 已经获得了锁,虽然 key 不一样。于是删除了第二个 key,再看 notifier 日志,终于获得了锁,开始正常工作,于是得出猜想,etcd 的分布式锁,在子目录下加了锁之后,父目录会加锁失败。然后用 etcdctl lock 来验证了下,确实如此,/a/b 下加了锁,/a 再加锁就会失败,但是/a 下加了锁,/a/b 再加锁会成功。基本上可以验证上面的猜想,剩下的就是从 etcd 源码中找到对应处理的代码了。

etcd 源码部分

在查询源码之前,第一反应就是这肯定是在服务端实现的,于是开始了从 etcd 服务端找相关源码的过程,从 etcdctl 命令开始追溯到所涉及的服务端,一直没有发现问题。又在网上搜了相关 etcd 服务端源码实现的文章,结合本地代码均没有想找的代码,于是反过来从 client 找起。

首先从 etcdctl lock 命令开始,选择主要函数展示

// 代码位置go.etcd.io/etcd/etcdctl/ctlv3/command/lock_command.go
func lockUntilSignal(c *clientv3.Client, lockname string, cmdArgs []string) error {
   ...

   if err := m.Lock(ctx); err != nil {
      return err
   }

   ...
}

接下来进入到 Lock 函数,这是个关键函数,etcd 的分布式锁就是在这里实现的

func (m *Mutex) Lock(ctx context.Context) error {
   s := m.s
   client := m.s.Client()

   // 这里的pfx就是prefix,就是传进来的前缀,后面的s.Lease()会返回一个租约,是一个int64的整数,和session有关
   m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
   // 这里比较上面prefix/lease的createrevision是否为0,为0表示目前不存在该key,需要执行Put操作,下面可以看到
   // 不为0表示已经有对应的key了,只需要执行Get就行
   // createrevision是自增的
   cmp := v3.Compare(v3.CreateRevision(m.myKey), "="0)
   // put self in lock waiters via myKey; oldest waiter holds lock
   put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
   // reuse key in case this session already holds the lock
   get := v3.OpGet(m.myKey)
   // 获取所得持有者
   getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
   resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
   if err != nil {
      return err
   }
   m.myRev = resp.Header.Revision
   if !resp.Succeeded {
      m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
   }
   // if no key on prefix / the minimum rev is key, already hold the lock
   ownerKey := resp.Responses[1].GetResponseRange().Kvs
   // 比较如果当前没有人获得锁或者锁的owner的createrevision等于当前的kv的revision,则表示已获得锁,就可以退出了
   if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
      m.hdr = resp.Header
      return nil
   }

   // 为了验证自己加的打印信息
   //fmt.Printf("ownerKey: %s\n", ownerKey)
   // 走到这里代表没有获得锁,需要等待之前的锁被释放,即revision小于当前revision的kv被删除
   hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
   // release lock key if wait failed
   if werr != nil {
      m.Unlock(client.Ctx())
   } else {
      m.hdr = hdr
   }
   return werr
}

// waitDeletes 等待所有当前比当前key的revision小的key被删除后,锁释放后才返回
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
   getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
   for {
      resp, err := client.Get(ctx, pfx, getOpts...)
      if err != nil {
         return nil, err
      }
      if len(resp.Kvs) == 0 {
         return resp.Header, nil
      }
      lastKey := string(resp.Kvs[0].Key)
      // 为了调试自己加的这句
      fmt.Printf("wait for %s to delete\n", lastKey)
      if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
         return nil, err
      }
   }
}

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
   cctx, cancel := context.WithCancel(ctx)
   defer cancel()

   var wr v3.WatchResponse
   // wch是个channel,key被删除后会往这个chan发数据
   wch := client.Watch(cctx, key, v3.WithRev(rev))
   for wr = range wch {
      for _, ev := range wr.Events {
         if ev.Type == mvccpb.DELETE {
            return nil
         }
      }
   }
   if err := wr.Err(); err != nil {
      return err
   }
   if err := ctx.Err(); err != nil {
      return err
   }
   return fmt.Errorf("lost watcher waiting for delete")
}

看完上面的代码基本知道了 etcd 分布式锁的实现机制了,但是还没看到哪里和前缀 Prefix 相关了。其实答案就藏在 getOwner 里,看上述代码,不管是执行 Put 还是 Get,最终都有个 getOwner 的过程,看一下这个 getOwner,options 模式里有个 v3.WithFirstCreate 函数调用,看下这个函数

// WithFirstCreate gets the key with the oldest creation revision in the request range.
func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }

// withTop gets the first key over the get's prefix given a sort order
func withTop(target SortTarget, order SortOrder) []OpOption {
   return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
}

// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
func WithPrefix() OpOption {
   return func(op *Op) {
      if len(op.key) == 0 {
         op.key, op.end = []byte{0}, []byte{0}
         return
      }
      op.end = getPrefix(op.key)
   }
}

看到上面的是三个函数后,大致就找到了对应的源码的感觉,因为看到了 WithPrefix 函数,和上面的猜测正好匹配。所以 getOwner 的具体执行效果是会把所有以 lockkey 开头的 kv 都拿到,且按照 createrevision 升序排列,取第一个值,这个意思就很明白了,就是要拿到当前以 lockkey 为 prefix 的且 createrevision 最小的那个 key,就是目前已经拿到锁的 key。

看了上面的源码就可以明白为什么/a/b 加了锁之后,/a 加锁会超时了,因为在 getOwner 时,拿到了/a/b,且 createrevision 小于/a 的 revision,于是/a 就会等待/a/b 被删除后,watch chanel 有数据后才能获得锁。

看到这里还有个需要确认的问题,那就是如果/ab 加锁了,那么再对/a 加锁会怎么样?/a 肯定是/ab 的 prefix 啊,是不是也会加锁失败呢?

结论是会加锁成功,看下源码

func NewMutex(s *Session, pfx string) *Mutex {
   return &Mutex{s, pfx + "/"""-1nil}
}

可以看到在 NewMutex 时并不是直接拿传进来的 pfx 作为 prefix 的,而且在后面加了个"/",所以/ab 加了锁,/a 加锁还是可以成功的。一般查找 prefix 或 suffix 时都会加上固定的分隔符,要不然就会出现误判。

总结

通过分析问题,看源码,可以了解到 etcd 锁的实现原理,以及可能存在的小坑。Etcd 把锁的实现放在了 client 端,这样的话,可以直接修改 client 端代码来修改其锁的实现,或者使用不同版本的 etcd client 时就可能出现虽然共用一个服务端,但是 etcd 锁行为却不一致的问题,不过又有谁会闲着没事这么玩呢。这里也是提醒大家注意一下此类问题,万一以后碰到了可以快速定位解决。



你可能还喜欢

点击下方图片即可阅读

真香!使用 Goland 网页版实现真正的云开发

云原生是一种信仰 🤘


关注公众号

后台回复◉k8s◉获取史上最方便快捷的 Kubernetes 高可用部署工具,只需一条命令,连 ssh 都不需要!



点击 "阅读原文" 获取更好的阅读体验!


发现朋友圈变“安静”了吗?

浏览 34
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报