聊聊 Kafka:如何避免消费组的 Rebalance

共 5026字,需浏览 11分钟

 ·

2022-01-18 19:12

一、前言

我们上一篇聊了 Rebalance 机制,相信你对消费组的重平衡有个整体的认识。这里再简单回顾一下,Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,Consumer Group 下所有的 Consumer 实例共同参与,在 Coordinator 协调者组件的帮助下,完成订阅主题分区的分配。但是,在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。像不像 JVM 的 GC?我们知道 JVM 频繁 GC 的话,对时延敏感的业务来说,简直是噩梦,所以我们会针对 GC 进行相应的调优,让 JVM 不那么频繁的发生 STW。

Kafka 消费组的重平衡也是类似的,消费组发生重平衡,Consumer 就很慢,对于实时性不敏感的业务,慢一点也能接受,就怕 Consumer 处理业务超时了,消费组把 Consumer 踢出去了,业务设置重试机制,自动从线程池中拿出一个新线程作为消费者去订阅 topic,那么意味着有新消费者加入 Consumer Group,又会引发 Rebalance,新的消费者还是来不及处理完所有消息,又被移出 Consumer Group。如此循环,就发生了频繁的 Rebalance 现象。

二、Rebalance 的弊端

  • Rebalance 影响 Consumer 端 TPS,Coordinator 协调者组件完成订阅主题分区的分配的过程,该消费组下所有实例都不能消费任何消息。

  • 如果你的组成员消费者实例很多的话,Rebalance 很慢,对业务会造成一定的影响。

  • Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Consumer Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。

对于第三点,你是不是觉得,Kafka 社区让所有成员都要参与进来很不合理啊,应该把那个退出消费组的消费者负责的分区随机分配给其它消费者,其它消费者的分区分配策略不变,这样就最大限度地减少 Rebalance 对剩余 Consumer 成员的冲击。

没错,你想到的社区也想到了,社区于 0.11.0.0 版本推出了 StickyAssignor,即有粘性的分区分配策略。所谓的有粘性,是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动。不过有些遗憾的是,这个策略目前还有一些 bug,而且需要升级到 0.11.0.0 才能使用,因此在实际生产环境中用得还不是很多。

你可能会问了,社区对上面的弊端有没有什么解决办法?没有,特别是 Rebalance 慢这个问题,Kafka 社区对此无能为力。设计就是这样的话,那我们是不是可以尽可能去规避 Rebalance 呢,特别是那些不必要的 Rebalance。

三、触发 Rebalance 机制的时机

要避免 Rebalance,还是要从触发 Rebalance 机制的时机入手。我们在前面说过,触发 Rebalance 机制的时机主要有以下几个:

  • 有新的 Consumer 加入 Consumer Group

  • 有 Consumer 宕机下线。Consumer 并不一定需要真正下线,例如遇到长时间的 GC、网络延迟导致消费者长时间未向 GroupCoordinator 发送 HeartbeatRequest 时,GroupCoordinator 会认为 Consumer 下线。

  • 有 Consumer 主动退出 Consumer Group(发送 LeaveGroupRequest 请求)。比如客户端调用了 unsubscribe() 方法取消对某些主题的订阅。

  • Consumer 消费超时,没有在指定时间内提交 offset 偏移量。

  • Consumer Group 所对应的 GroupCoordinator 节点发生了变更。

  • Consumer Group 所订阅的任一主题或者主题的分区数量发生变化。

四、Rebalance 实战

有点抽象是不是?没有关系,下面来看个例子,老周简单来模拟一下 Rebalance。

4.1 生产者

/**
 * @author: 微信公众号【老周聊架构】
 */

public class KafkaProducerRebalanceTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord record = new ProducerRecord<>("topic_test""userName""riemann_" + i);
            producer.send(record);
        }
        producer.close();
    }
}

4.2 消费者

/**
 * @author: 微信公众号【老周聊架构】
 */

public class KafkaConsumerRebalanceTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_test");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");

        KafkaConsumer consumer = new KafkaConsumer<>(props);
        String topic = "topic_test";
        long pollTimeout = 100;
        long sleep = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);

        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection collection) {
                System.out.println("Partition Revoked");
            }

            @Override
            public void onPartitionsAssigned(Collection collection) {
                System.out.println("New assignment : " + collection.size() + " partitions");
            }
        });

        while (true) {
            ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeout));
            for (ConsumerRecord record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                try {
                    Thread.sleep(sleep);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4.3 终端日志

消费者日志

[Consumer clientId=consumer-consumer_group_test-1, groupId=consumer_group_test] Member consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061 sending LeaveGroup request to coordinator 127.0.0.1:9092 (id: 2147483643 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)


服务端日志

[2022-01-16 16:38:25,315] INFO [GroupCoordinator 4]: Member[group.instance.id None, member.id consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061] in group consumer_group_test has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2022-01-16 16:38:25,315] INFO [GroupCoordinator 4]: Preparing to rebalance group consumer_group_test in state PreparingRebalance with old generation 37 (__consumer_offsets-17) (reason: removing member consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2022-01-16 16:38:25,316] INFO [GroupCoordinator 4]: Group consumer_group_test with generation 38 is now empty (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)


那消费组为啥会出现 Rebalance 呢?从消费者日志可以看出,消费超时,导致消费线程长时间无法向 Coordinator 节点发送心跳,Coordinator 节点以为 Consumer 已经宕机,Coordinator 于是将 Consumer 节点从消费组中剔除,并触发了 Rebalance 机制。

这其实和 Consumer 的心跳发送机制也有关系,在大多数中间件的设计中,都会分离业务线程和心跳发送线程,而 Kafka 却没有这样做,其目的可能是为了实现简单。如果消费者消费业务确实需要非常长时间,我们可以通过参数 max.poll.interval.ms 配置,它代表消费两次 poll 最大的时间间隔,默认是 300000 ms,也就是 5 分钟,5 分钟都还超时,那可以再调大一点;或者我们可以减少 consumer 每次从 broker 拉取的数据量,可以通过参数 max.poll.records 配置,consumer 默认拉取 500 条,我们可以将其修改了 200 条。

Kafka 在 0.10.1 版本中修正了 Consumer 的心跳发送机制,将心跳发送的任务交给了专门的 HeartbeatThread,而不是像早期版本那样依赖于用户应用线程来定期轮询。这个设计被证明是相当棘手的调整,增加会话超时将为消息处理提供更多的时间,但消费者组也将花更多的时间来检测进程崩溃等故障。Kafka 消费者 0.10.1 引入了 max.poll.interval.ms 来解耦处理超时和会话超时。这个 max.poll.interval.ms 参数还是很有意义的,因为即使心跳发送正常,那也只能证明 Consumer 是存活状态,但是 Consumer 可能处于假死状态,比如 Consumer 遇到了死锁导致长时间等待超过了poll 设定的时间间隔 max.poll.interval.ms

五、Rebalance 问题处理思路

我们上面第三点说的触发 Rebalance 机制的时机有好几点,其实主要就三大类:

  • 组成员数量发生变化

  • 订阅主题数量发生变化

  • 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。接下来,我们主要来说说不必要的 Rebalance 该如何避免,也就是组成员数量变化而引发的 Rebalance 该如何避免。

如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance。这是 Rebalance 发生的最常见的原因。

Consumer 实例增加的情况很好理解,当我们启动一个配置有相同 group.id 值的 Consumer 程序时,实际上就向这个 Consumer Group 添加了一个新的 Consumer 实例。此时,Coordinator 会接纳这个新实例,将其加入到组中,并重新分配分区。通常来说,增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。总之,它不属于我们要规避的那类 “不必要 Rebalance”。

我们更在意的是 Consumer Group 下实例数减少这件事。如果你就是要停掉某些 Consumer 实例,那自不必说,关键是在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被 “踢出” Consumer Group。如果是这个原因导致的 Rebalance,我们就不能不管了。

Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要退组呢?那就要来看看消费者端配置的几个参数:

  • session.timeout.ms 设置了超时时间

  • heartbeat.interval.ms 心跳时间间隔

  • max.poll.interval.ms 每次消费的处理时间

  • max.poll.records 每次消费的消息数

5.1 session.timeout.ms

Consumer 与 Broker 的心跳超时时间,默认 10s,Broker 如果超过 session.timeout.ms 设定的值仍然没有收到心跳,Broker 端将会将该消费者移除,并触发 Rebalance。

这个值必须设置在 Broker 配置中的 group.min.session.timeout.ms 与  group.max.session.timeout.ms 之间。

该参数和 heartbeat.interval.ms 这两个参数可以适当的控制 Rebalance 的频率。

5.2 heartbeat.interval.ms

心跳间隔时间。心跳是在 Consumer 与 Coordinator 之间进行的。心跳用来保持 Consumer 的会话,并且在有 Consumer 加入或者离开 Consumer Group 时帮助进行 Rebalance。

这个值必须设置的小于 session.timeout.ms,因为:当 Consumer 由于某种原因不能发 Heartbeat 到 Coordinator 时,并且时间超过 session.timeout.ms 时,就会认为该 Consumer 已退出,它所订阅的 Partition 会分配到同一 Consumer Group 内的其它的 Consumer 上。

通常设置的值要低于 session.timeout.ms1/3。默认值是:3s

5.3 max.poll.interval.ms

两次 poll 方法调用的最大间隔时间,单位毫秒,默认为 5 分钟。如果消费端在该间隔内没有发起 poll 操作,该消费者将被剔除,触发重平衡,将该消费者分配的队列分配给其他消费者。

Kafka 中有一个专门的心跳线程来实现发送心跳的动作,所以存在 Consumer Client 依旧可以有效的发送心跳,但 Consumer 实际却处于 livelock (活锁)状态,从而导致无法有效的进行数据处理,所以基于此 Kafka 通过参数 max.poll.interval.ms 来规避该问题。

5.4 max.poll.records

Consumer 每次调用 poll() 时取到的 records 的最大数。每执行一次 poll 方法所拉取的最大数据量;是基于所分配的所有 Partition 而言的数据总和,而非每个 Partition 上拉去的最大数据量;默认值为 500

通俗点讲表示每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发生 rebalance。

六、如何避免 Rebalance

简单来说,非必要 Rebalance 有下面两个点:

  • 消费者心跳超时,导致 Rebalance。

  • 消费者处理时间过长,导致 Rebalance。

6.1 消费者心跳超时

我们知道消费者是通过心跳和协调者保持通讯的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发起 Rebalance。

这里给一下业界主流推荐的值,可以根据自己的业务可做相应的调整:

  • 设置 session.timeout.ms = 6s

  • 设置 heartbeat.interval.ms = 2s

  • 要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即  session.timeout.ms >= 3 * heartbeat.interval.ms

这里你可能会问了,为啥要 session.timeout.ms >= 3 * heartbeat.interval.ms,而不是 5 或者 10 呢?

session.timeout.ms 是个"逻辑"指标,比如它指定了一个阈值 6 秒,在这个阈值内如果 Coordinator 未收到 Consumer 的任何消息,那 Coordinator 就认为 Consumer 挂了。而 heartbeat.interval.ms 是个"物理"指标,它告诉 Consumer 要每 2 秒给 Coordinator 发一个心跳包,heartbeat.interval.ms 越小,发的心跳包越多,它是会影响发 TCP 包的数量的,产生了实际的影响,这也是我为什么将之称为"物理"指标的原因。

如果 Coordinator 在一个 heartbeat.interval.ms 周期内未收到 Consumer 的心跳,就把该 Consumer 移出 Consumer Group,这有点说不过去。就好像 Consumer 犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能 Consumer 出现了一次长时间 GC,影响了心跳包的到达,说不定下一个 Heartbeat 就正常了。

heartbeat.interval.ms 肯定是要小于 session.timeout.ms 的,如果 Consumer Group 发生了 Rebalance,通过心跳包里面的 REBALANCE_IN_PROGRESS,Consumer 就能及时知道发生了 Rebalance,从而更新 Consumer 可消费的分区。而如果超过了 session.timeout.ms,Coordinator 都认为 Consumer 挂了,那也当然不用把  Rebalance 信息告诉该 Consumer 了。

Kafka 0.10.1 之后的版本中,将 session.timeout.msmax.poll.interval.ms 解耦了。也就是说:new KafkaConsumer 对象后,在 while true 循环中执行 consumer.poll 拉取消息这个过程中,其实背后是有 2 个线程的,即一个 Consumer 实例包含 2 个线程:一个是 Heartbeat 线程,另一个是 Processing 线程。Processing 线程可理解为调用 consumer.poll 方法执行消息处理逻辑的线程,而 Heartbeat 线程是一个后台线程,对程序员是"隐藏不见"的。如果消息处理逻辑很复杂,比如说需要处理 5 min,那么  max.poll.interval.ms 可设置成比 5 min 大一点的值。而 Heartbeat 线程则和上面提到的参数 heartbeat.interval.ms 有关,Heartbeat 线程每隔 heartbeat.interval.ms 向 Coordinator 发送一个心跳包,证明自己还活着。只要 Heartbeat 线程 在 session.timeout.ms 时间内向 Coordinator 发送过心跳包,那么   Coordinator 就认为当前的 Consumer 是活着的。

Kafka 0.10.1 之前,发送心跳包和消息处理逻辑这 2 个过程是耦合在一起的,试想:如果一条消息处理时长要 5 min,而 session.timeout.ms=3000ms,那么等 Consumer 处理完消息,Coordinator 早就将 Consumer 移出 Consumer Group 了,因为只有一个线程,在消息处理过程中就无法向 Coordinator 发送心跳包,超过 3000ms 未发送心跳包,Coordinator 就将该 Consumer 移出 Consumer Group 了。而将二者分开,一个 Processing 线程负责执行消息处理逻辑,一个 Heartbeat 线程负责发送心跳包。那么就算一条消息需要处理 5min,只要 Heartbeat 线程在 session.timeout.ms 时间内向 Coordinator 发送了心跳包,那 Consumer 可以继续处理消息,而不用担心被移出 Consumer Group 了。另一个好处是:如果 Consumer 出了问题,那么在 session.timeout.ms 内就能检测出来,而不用等到 max.poll.interval.ms 时长后才能检测出来。

为啥要 session.timeout.ms >= 3 * heartbeat.interval.ms,我觉得是社区做的测试得出来的最优值吧, 因为 heartbeat.interval.ms 越小,发的心跳包越频繁,浪费没必要的流量;而设置越大,Consumer 挂了很久才能检测到,明显也不合理。

6.2 消费者处理时间过长

如果消费者处理时间过长,那么同样会导致协调者认为该 Consumer 死亡了,从而发起重平衡。

而 Kafka 的消费者参数设置中,跟消费处理的两个参数为:

  • max.poll.interval.ms 每次消费的处理最大时间

  • max.poll.records 每次消费的消息数

对于这种情况,一般来说就是增加消费者处理的时间(即提高 max.poll.interval.ms 的值),减少每次处理的消息数(即减少 max.poll.records 的值)。

我们上面的那个例子就是这个场景触发的 Rebalance,max.poll.interval.ms 每次消费的处理最大时间设置的是 60000ms,也就是 1min。而我在 consumer.poll 方法里休眠了 2min 来模拟处理业务的时间,处理业务的时间大于 max.poll.interval.ms ,导致 Rebalance。

6.3 Consumer 端的 GC 表现

如果上面两种从 Kafka 层面还无法避免 Rebalance,那我建议你去排查下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。



Java

浏览 117
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报