聊聊 Kafka:Kafka 消息丢失的场景以及最佳实践

共 2706字,需浏览 6分钟

 ·

2022-05-10 09:34

一、前言

大家好,我是老周,有快二十多天没有更新文章了,很多小伙伴一直在催更。先说明下最近的情况,最近项目上线很忙,没有时间写,并且组里有个同事使用 Kafka 不当,导致线上消息丢失,在修复一些线上的数据,人都麻了。事情是这样,有个 Kafka 消费者实例,部署到线上去,消费到了线上的数据,而新版本做了新的逻辑,新版本的业务逻辑与老版本的业务逻辑不兼容,直接导致消费失败,没有进行重试操作,关键还提交了 offset。直接这部分数据没有被业务处理,导致消息丢失,然后紧急修复线上数据。

刚好这些天忙完了有空,所以记录一下,同时看是否对大家能起到避免踩坑的作用,能有一些作用,那我写的也就值了。

我们下面会从以下三个方面来说一下 Kafka 消息丢失的场景以及最佳实践。

  • 生产者丢失消息

  • Kafka Broker 服务端丢失消息

  • 消费者丢失消息

二、Kafka 的三种消息语义

先说 Kafka 消息丢失的场景之前,我们先来说下 Kafka 的三种消息语义,不会还有人不知道吧?这个不应该了,消息系统基本上抽象成这以下三种消息语义了:

  • 最多传递一次

  • 最少传递一次

  • 仅有一次传递

三、Kafka 消息丢失的场景

3.1 生产者丢失消息

  • 目前 Kafka Producer 是异步发送消息的,如果你的 Producer 客户端使用了 producer.send(msg) 方法来发送消息,方法会立即返回,但此时并不能代表消息已经发送成功了。

  • 如果消息在发送的过程中发生了网络抖动,那么消息可能没有传递到 Broker,那么消息可能会丢失。

  • 如果发送的消息本身不符合,如大小超过了 Broker 的承受能力等。

3.2 Kafka Broker 服务端丢失消息

  • Leader Broker 宕机了,触发选举过程,集群选举了一个落后 Leader 太多的 Broker 作为 Leader,那么落后的那些消息就会丢失了。

  • Kafka 为了提升性能,使用页缓存机制,将消息写入页缓存而非直接持久化至磁盘,采用了异步批量刷盘机制,也就是说,按照一定的消息量和时间间隔去刷盘,刷盘的动作由操作系统来调度的,如果刷盘之前,Broker 宕机了,重启后在页缓存的这部分消息则会丢失。

3.3 消费者丢失消息

  • 消费者拉取了消息,并处理了消息,但处理消息异常了导致失败,并且提交了偏移量,消费者重启后,会从之前已提交的位移的下一个位置重新开始消费,消费失败的那些消息不会再次处理,即相当于消费者丢失了消息。

  • 消费者拉取了消息,并提交了消费位移,但是在消息处理结束之前突然发生了宕机等故障,消费者重启后,会从之前已提交的位移的下一个位置重新开始消费,之前未处理完成的消息不会再次处理,即相当于消费者丢失了消息。

四、最佳实践

4.1 生产端

  • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。带有回调通知的 send 方法可以针对发送失败的消息进行重试处理。

  • acks = all代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。


  • 设置 retries = 3,当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。



    如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。

  • 设置 retry.backoff.ms = 300,合理估算重试的时间间隔,可以避免无效的频繁重试。



    它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retriesretry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。

4.2 Broker 端

  • 设置 unclean.leader.election.enable = false。它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。



  • 设置 replication.factor >= 3。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。



  • 设置 min.insync.replicas > 1。这控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。



  • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1

4.3 消费端

  • 确保消息消费完成再提交。最好把它设置成 enable.auto.commit = false,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的。



    虽然采用手动提交位移的方式可以解决消费端消息丢失的场景,但同时会存在重复消费问题,关于重复消费问题我们下一篇再讲。

  • 像我们上面说的那个线上问题,即使你设置了手动提交,消费异常了同时也提交了位移,还是会存在消息丢失。

    Kafka 没有重试机制不支持消息重试,也没有死信队列,因此使用 Kafka 做消息队列时,需要自己
    实现消息重试的功能。这里我先说下大致的思路,后续有时间再分享代码出来:

    • 创建一个 Topic 作为重试 Topic,用于接收等待重试的消息。

    • 普通 Topic 消费者设置待重试消息的下一个重试 Topic。

    • 从重试 Topic 获取待重试消息存储到 Redis 的 ZSet 中,并以下一次消费时间排序。

    • 定时任务从 Redis 获取到达消费时间的消息,并把消息发送到对应的 Topic。

    • 同一个消息重试次数过多则不再重试。


欢迎大家Java栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。

浏览 65
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报