从一个消费慢的例子深入理解 kafka rebalance
共 8181字,需浏览 17分钟
·
2022-02-21 21:24
点击关注公众号,Java干货及时送达👇
文章来源:https://lxkaka.wang/kafka-rebalance/
前 言
我们能清楚的看到整个消费组在消费异常的时间段内经常出现消费停滞的情况如图上消费速率为0。
为什么消费会卡主呢?同时去看了相关服务的日志看到很多err kafka data maybe rebalancing。看了这篇文章后消费卡主的问题自然就知道答案了。
重要概念
为了说清楚 rebalance 有必要把最相关的重要概念回顾一下
| Consumer Group
consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。
1. consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
2. group.id是一个字符串,唯一标识一个consumer group
3. consumer group订阅的topic下的每个分区只能分配给某个group下的一个consumer (当然该分区还可以被分配给其他group)
| Coordinator
partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode()%groupMetadataTopicPartitionCount)
Rebalance 目的
我们知道topic的partition已经根据策略分配给了consumer group下的各个consumer。
那么当有新的consumer加入或者老的consumer离开这个partition与consumer的分配关系就会发生变化,如果这个时候不进行重新调配,就可能出现新consumer无partition消费或者有partition无消费者的情况。
那么这个重新调配指的就是consumer和partition rebalance。
Rebalance 时机
Rebalance在以下情况会触发
Rebalance 过程
kafka中的重要设计也会随着版本的升级而优化。rebalance也不例外,这里我们介绍的kafka rebalance流程以我们的线上版本1.1.1为例。
Rebalance 问题
Rebalance 改进
| Static Membership
这样的话,在使用Static Membership场景下,只要在consumer重新启动的时候,不发送LeaveGroup Request且在session.timeout.ms时长内重启成功,就不会触发rebalance。所以,这里推荐设置一个足够consumer重启的时长session.timeout.ms,这样能有效降低因consumer短暂不可用导致的reblance次数。
| Incremental Cooperative Rebalancing
从名字中我们就能看出这个版本的rebalance过程两个关键词增量和协作,增量指的是原先版本的rebalance被分解成了多次小规模的rebalance, 协作自然指的是consumer 之间的关系。核心思想:
1. consumer比较新旧两个partition分配结果,只停止消费回收(revoke)的partition,对于两次都分配给自己的partition,consumer不需要停止消费
2. 通过多轮的局部 rebalance 来最终实现全局的 rebalance
我们以文章开始的例子来理解一下这个版本的改进
首先C1 -> {P0, p3}C2 -> {P1} C3 -> {P2}这是consumer和partition的分配关系,我们假设C2宕机超过了session.timeout.ms, 此时GroupCoordinator会触发第一轮 rebalance
| 第一轮 Rebalance
1. GroupCoordinator会在下一轮心跳响应中通知C1和C3发起第一轮rebalance
2. C1和C3会将自己当前正在处理的partition信息封装到JoinGroupRequest中(metadata字段)发往GroupCoordinator:
- C1 发送的 JoinGroupRequest(assigned: P0、P3)
- C3 发送的 JoinGroupRequest(assigned: P2)
3. 假设GroupCoordinator在这里选择1作为Group Leader,GroupCoordinator会将 partition目前的分配状态通过JoinGroupResponse发送给C1
4. C1发现P1并未出现(处于lost状态),此时C1并不会立即解决当前的不平衡问题,返回的partition分配结果不变(同时会携带一个delay时间,scheduled.rebalance.max.delay.ms,默认5分钟)。
GroupCoordinator据C1的SyncGroupRequest,生成SyncGroupResponse返回给两个存活的consumer
- C1 收到的SyncGroup Response(delay,assigned: P0、P3,revoked:)
- C3 收到的SyncGroup Response(delay,assigned: P2,revoked:)
到此为止,第一轮rebalance结束。整个rebalance过程中,C1和C3并不会停止消费。
| 第二轮 Rebalance
1. 在scheduled.rebalance.max.delay.ms这个时间段内,C2故障恢复,重新加入到 consumer group时,会向GroupCoordinator发送JoinGroup Request,触发第二轮的rebalance。
GroupCoordinator在下一次心跳响应中会通知C1和C3参与第二轮rebalance
2. C1和C3收到心跳后,发送JoinGroupRequest参与第二轮rebalance:
- C1 发送的JoinGroupRequest(assigned: P0、P3)
- C3 发送的JoinGroupRequest(assigned: P2)
3. 本轮中,C1依旧被选为Group Leader,它检查delay时间(scheduled.rebalance.max.delay.ms)是否已经到了,如果没到,则依旧不会立即解决当前的不平衡问题,继续返回目前的分配结果,并且返回的SyncGroupResponse 中更新了delay的剩余时间(remaining delay = delay - pass_time) 到此为止,第二轮 rebalance结束。
整个rebalance过程中,C1和C3并不会停止消费。
- C1收到的SyncGroup Response(remaining delay,assigned:P0、P3,revoked:)
- C2收到的SyncGroup Response(remaining delay,assigned:,revoked:)
- C3收到的SyncGroup Response(remaining delay,assigned:P2,revoked:)
| 第三轮 Rebalance
1. 当remaining delay时间到期之后,consumer全部重新送JoinGroupRequest,触发第三轮rebalance
- C1 发送的JoinGroupRequest(assigned: P0、P3)
- C2 发送的JoinGroupRequest(assigned: )
- C3 发送的JoinGroupRequest(assigned: P2)
2. 在此次rebalance中,C1依旧被选为Group Leader,它会发现delay已经到期了,开始解决不平衡的问题,对partition进行重新分配。最新的分配结果最终通过SyncGroupResponse返回到各个consumer:
到此为止,第三轮rebalance结束。整个rebalance过程中,C1和C3的消费都不会停止
- C1收到的SyncGroup Response(assigned:P0、P3,revoked:)
- C2收到的SyncGroup Response(assigned:P1,revoked:)
- C3收到的SyncGroup Response(assigned:P2,revoked:)
下面这张图展示了上述的Rebalance过程
1. JetBrains 宣布:IntelliJ 平台彻底停用 Log4j 组件,建议切换至 java.util.logging
最近面试BAT,整理一份面试资料《Java面试BATJ通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。
获取方式:点“在看”,关注公众号并回复 Java 领取,更多内容陆续奉上。
PS:因公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。
点“在看”支持小哈呀,谢谢啦😀