你可能用错了 kafka 的重试机制
来源 | http://r6d.cn/b2u2p
Apache Kafka 已成为跨微服务异步通信的主流平台。它有很多强大的特性,让我们能够构建健壮、有弹性的异步架构。
同时,我们在使用它的过程中也需要小心很多潜在的陷阱。如果未能提前发现可能发生(换句话说就是迟早会发生)的问题,我们就要面对一个容易出错和损坏数据的系统了。
在本文中,我们将重点介绍其中的一个陷阱:尝试处理消息时遭遇失败。首先,我们需要意识到消息消费可能会,而且迟早会遭遇失败。其次,我们需要确保在处理此类故障时不会引入更多问题。
Kafka 简介
阅读本文的读者应该都对 Kafka 有所了解。网上也有一些介绍 Kafka 及其使用方法的深度文章。话虽如此,我们这里还是先简要回顾一下对我们的讨论很重要的一些概念。
事件日志、发布者和消费者
Kafka 是用来处理数据流的系统。从概念上讲,我们可以认为 Kafka 包含三个基本组件:
一个事件日志(Event Log),消息会发布到它这里 发布者(Publisher),将消息发布到事件日志 消费者(Consumer),消费(也就是使用)事件日志中的消息
与 RabbitMQ 之类的传统消息队列不同,Kafka 由消费者来决定何时读取消息(也就是说,Kafka 采用了拉取而非推送模式)。每条消息都有一个偏移量(offset),每个消费者都跟踪(或提交)其最近消费消息的偏移量。这样,消费者就可以通过这条消息的偏移量请求下一条消息。
主题
事件日志分为几个主题(topic),每个主题都定义了要发布给它的消息类型。定义主题是我们这些工程师的责任,所以我们应该记住一些经验法则:
每个主题都应描述一个其他服务可能需要了解的事件。 每个主题都应定义每条消息都将遵循的一个唯一模式(schema)。
分区和分区键
主题被进一步细分为多个分区(partition)。分区使消息可以被并行消费。Kafka 允许通过一个**分区键(partition key)**来确定性地将消息分配给各个分区。分区键是一段数据(通常是消息本身的某些属性,例如 ID),其上会应用一个算法以确定分区。
这里,我们将消息的 UUID 字段分配为分区键。生产者应用一种算法(例如按照分区数修改每个 UUID 值)来将每条消息分配给一个分区。
以这种方式使用分区键,使我们能够确保与给定 ID 关联的每条消息都会发布到单个分区上。
还需要注意的是,可以将一个消费者的多个实例部署为一个消费者组。Kafka 将确保给定分区中的任何消息将始终由组中的同一消费者实例读取。
在微服务中使用 Kafka
Kafka 非常强大。所以它可用于多种环境中,涵盖众多用例。在这里,我们将重点介绍微服务架构中最常见的用法。
跨有界上下文传递消息
当我们刚开始构建微服务时,我们许多人一开始采用的是某种中心化模式。每条数据都有一个驻留的单一微服务(即单一真实来源)。如果其他任何微服务需要访问这份数据,它将发起一个同步调用以检索它。
这种方法导致了许多问题,包括同步调用链较长、单点故障、团队自主权下降等。
最后我们找到了更好的办法。在今天的成熟架构中,我们将通信分为命令处理和事件处理。
命令处理通常在单个有界上下文中执行,并且往往还是会包含同步通信。
另一方面,事件通常由一个有界上下文中的服务发出,并异步发布到 Kafka,以供其他有界上下文中的服务消费。
左侧是我们以前设计微服务通信的方式:一个有界上下文(由虚线框表示)中的服务从其他有界上下文中的服务接收同步调用。右边是我们如今的做法:一个有界上下文中的服务发布事件,其他有界上下文中的服务在自己空闲时消费它们。
例如,以一个 User 有界上下文为例。我们的 User 团队会构建负责启用新用户、更新现有用户帐户等任务的应用程序和服务。
创建或修改用户帐户后,UserAccount 服务会将一个相应的事件发布到 Kafka。其他感兴趣的有界上下文可以消费该事件,将其存储在本地,使用其他数据增强它,等等。例如,我们的 Login 有界上下文可能想知道用户的当前名称,以便在登录时向他们致意。
我们将这种用例称为跨边界事件发布。
在执行跨边界事件发布时,我们应该发布聚合(Aggregate)。聚合是自包含的实体组,每个实体都被视为一个单独的原子实体。每个聚合都有一个“根”实体,以及一些提供附加数据的从属实体。
当管理聚合的服务发布一条消息时,该消息的负载将是一个聚合的某种表示形式(例如 JSON 或 Avro)。重要的是,该服务将指定聚合的唯一标识符作为分区键。这将确保对任何给定聚合实体的更改都将发布到同一分区。
出问题的时候怎么办?
尽管 Kafka 的跨边界事件发布机制显得相当优雅,但毕竟这是一个分布式系统,因此系统可能会有很多错误。我们将关注也许是最常见的恼人问题:消费者可能无法成功处理其消费的消息。
我们现在该怎么办?
确定这是一个问题
团队做错的第一件事就是根本没有意识到这是一个潜在的问题。消息失败时有发生,我们需要制定一种策略来处理它……要未雨绸缪,而非亡羊补牢。
因此,了解这是一种迟早会发生的问题并设计针对性的解决方案是我们要做的第一步。如果我们做到了这一点,就应该向自己表示一点祝贺。现在最大的问题仍然存在:我们该如何处理这种情况?
我们不能一直重试那条消息吗?
默认情况下,如果消费者没有成功消费一条消息(也就是说消费者无法提交当前偏移量),它将重试同一条消息。那么,难道我们不能简单地让这种默认行为接管一切,然后重试消息直到成功吗?
问题是这条消息可能永远不会成功。至少,没有某种形式的手动干预它是不会成功的。于是乎,消费者就永远不会继续处理后续的任何消息,并且我们的消息处理将陷入困境。
好吧,我们不能简单地跳过那条消息吗?
我们通常允许同步请求失败。例如,对我们的 UserAccount 服务所做的一个“create-user”POST 可能包含错误或丢失的数据。在这种情况下,我们可以简单地返回一个错误代码(例如 HTTP 400),然后要求调用方重试。
虽然这种办法并不不理想,但这不会对我们的数据完整性造成任何长期问题。那个 POST 代表一条命令,是还没有发生的事情。即使我们让它失败,我们的数据也将保持一致状态。
当我们丢弃消息时情况并非如此。消息表示已经发生的事件。任何忽略这些事件的消费者都将与生成事件的上游服务不再同步。
所有这些都表明,我们不想丢弃消息。
那么我们如何解决这个问题呢?
对我们来说这不是什么容易解决的问题。因此,一旦我们认识到它需要解决,就可以向互联网咨询解决方案。但这引出了我们的第二个问题:网上有一些我们可能不应该遵循的建议。
重试主题:流行的解决方案
你会发现最受欢迎的一种解决方案就是重试主题(retry topics)的概念。具体细节因实现而异,但总体概念是这样的:
消费者尝试消费主要主题中的一条消息。 如果未能正确消费该消息,则消费者将消息发布到第一个重试主题,然后提交消息的偏移量,以便继续处理下一条消息。 订阅重试主题的是重试消费者,它包含与主消费者相同的逻辑。该消费者在消息消费尝试之间引入了短暂的延迟。如果这个消费者也无法消费该消息,则会将该消息发布到另一个重试主题,并提交该消息的偏移量。 这一过程继续,并增加了一些重试主题和重试消费者,每个重试的延迟越来越多(用作退避策略)。最后,在最终重试消费者无法处理某条消息后,该消息将发布到一个死信队列(Dead Letter Queue,DLQ)中,工程团队将在该队列中对其进行手动分类。
概念上讲,重试主题模式定义了失败的消息将被分流到的多个主题。如果主要主题的消费者消费了它无法处理的消息,它会将该消息发布到重试主题 1 并提交当前偏移量,从而将自身释放给下一条消息。重试主题的消费者将是主消费者的副本,但如果它无法处理该消息,它将发布到一个新的重试主题。最终,如果最后一个重试消费者也无法处理该消息,它将把该消息发布到一个死信队列(DLQ)。
问题出在哪里?
看起来这种方法似乎很合理。实际上,它在许多用例中都能正常工作。问题在于它不能充当一种通用解决方案。现实中存在一些特殊用例(例如我们的跨边界事件发布),对于这些用例来说,这种方法实际上是危险的。
它忽略了不同类型的错误
第一个问题是,它没有考虑到导致事件消费失败的两大原因:可恢复错误和不可恢复错误。
可恢复错误指的是,如果我们多次重试,这些错误最终将得以解决。一个简单的示例是将数据保存到数据库的消费者。如果数据库暂时不可用,那么当下一条消息通过时,消费者将失败。一旦数据库再次变得可用,消费者就能够再次处理该消息。
从另一个角度来看:可恢复错误指的是那些根源在消息和消费者外部的错误。解决这种错误后,我们的消费者将继续前进,好像无事发生一样。(很多人在这里被弄糊涂了。“可恢复”一词并不意味着应用程序本身——在我们的示例中为消费者——可以恢复。相反,它指的是某些外部资源——在此示例中为数据库——会失败并最终恢复。)
关于可恢复错误需要注意的是,它们将困扰主题中的几乎每一条消息。回想一下,主题中的所有消息都应遵循相同的架构,并代表相同类型的数据。同样,我们的消费者将针对该主题的每个事件执行相同的操作。因此,如果消息 A 由于数据库中断而失败,那么消息 B、消息 C 等也将失败。
不可恢复错误指的是无论我们重试多少次都将失败的错误。例如,消息中缺少字段可能会导致一个 NullPointerException,或者包含特殊字符的字段可能会使消息无法解析。
与可恢复错误不同,不可恢复错误通常会影响单个孤立消息。例如,如果只有消息 A 包含不可解析的特殊字符,则消息 B 将成功,消息 C 等也将成功。
与可恢复错误不同,解决不可恢复错误意味着我们必须修复消费者本身(永远不要“修复”消息本身——它们是不可变的记录!)例如,我们可能会修复消费者以便正确处理空值,然后重新部署它。
那么,这与重试主题解决方案有什么关系?
对于初学者来说,它对可恢复错误不是特别有用。请记住,在解决外部问题之前,可恢复错误将影响每一条消息,而不仅仅是当前的一条消息。因此可以肯定的是,将失败的消息分流到重试主题将为下一条消息清理出通道。但接下来的消息也将失败,下一条以及再下一条也将失败。我们最好还是让消费者自己重试,直到问题解决为止。
不可恢复的错误呢?重试队列可以在这些情况下提供帮助。如果一条麻烦的消息阻止了所有后续消息的消费,那么毫无疑问,分流该消息肯定会为我们的用户消费清除障碍(当然,多个重试主题是没必要的)。
但是,虽然重试队列可以帮助受不可恢复错误困扰的消息消费者继续前进,但它也可能带来更多隐患。下面我们就进一步分析背后的原因。
它会忽略排序
我们简要回顾一下跨边界事件发布的一些重要环节。在有界上下文中处理一条命令后,我们会将一个对应的事件发布到一个 Kafka 主题。重要的是,我们会将聚合的 ID 指定为分区键。
为什么这很重要?它确保的是对任何给定聚合的更改都会发布到同一分区。
好吧,那这一点为什么会那么重要呢?当事件发布到同一分区时,可以保证各个事件按照它们发生的顺序进行处理。如果对同一聚合进行连续更改,并且所产生的事件发布到不同的分区,就可能发生争用状况,也就是消费者在消费第一个更改之前就消费了第二个更改。这会导致数据不一致。
我们举个简单的例子。我们的 User 有界上下文提供了一个允许用户更改其名称的应用程序。一位用户将他的名字从 Zoey 更改为 Zoë,然后立即又更改为 Zoiee。如果我们不管排序,则某个下游消费者(例如 Login 有界上下文)可能会先处理对 Zoiee 的更改,然后不久用 Zoë覆盖它。
现在,登录数据与我们的用户数据已经不同步了。更麻烦的是,每当 Zoiee 登录我们的网站时都会看到“欢迎光临,Zoë!”的登录提示。
这才是重试主题真正出问题的地方。它们让我们的消费者容易打乱处理事件的顺序。如果一个消费者在处理 Zoë更改时受到某个临时的数据库中断的影响,它会把这个消息分流到一个重试主题,稍后再尝试。如果在 Zoiee 更改到达时数据库中断已得到纠正,则这条消息将先被成功处理,然后再由 Zoë更改覆盖。
为了说明问题,这里用了 Zoiee/Zoë这样一个简单的示例。实际上,乱序处理事件可能导致会各种各样的数据损坏问题。更糟糕的是,这些问题很少会在一开始就被注意到。相反,它们所导致的数据损坏往往在一段时间内都不会引起注意,但损坏程度会随着时间的推移而增长。一般来说,当我们意识到发生了什么事情时,已经有大量数据受到影响了。
重试主题什么时候可行?
需要明确的是,重试主题并非一直都是错误的模式。当然,它也存在一些合适的用例。具体来说,当消费者的工作是收集不可修改的记录时,这种模式就很不错。这样的例子可能包括:
处理网站活动流以生成报告的消费者 将交易添加到分类账的消费者(只要这些交易用不着按特定顺序跟踪) 正在从另一个数据源 ETL 数据的消费者
这类消费者可能会从重试主题模式中受益,同时没有数据损坏的风险。
不过,请注意
即使存在这种用例,我们仍应谨慎行事。构建这样的解决方案既复杂又耗时。因此,作为一个组织,我们不想为每个新的消费者编写一个新的解决方案。相反,我们要创建一个统一的解决方案,比如一个库或一个容器等,可以在各种服务之间重复使用。
还存在另一个问题。我们可能会为相关消费者构建一个重试主题的解决方案。不幸的是,不久之后,这个解决方案就会进入跨边界事件发布消费者的领域了。拥有这些消费者的团队可能没有意识到风险的存在。正如我们前面所讨论的那样,在发生重大数据损坏之前,他们可能不会意识到任何问题。
因此,在实现重试主题解决方案之前,我们应 100%确定:
我们的业务中永远不会有消费者来更新现有数据,或者 我们拥有严格的控制措施,以确保我们的重试主题解决方案不会在此类消费者中实现
我们如何改善这种模式?
鉴于重试主题模式可能不是跨边界事件发布消费者的可接受解决方案,我们是否可以对其做一些调整来改善它呢?
一开始,本文想要提供一种完整的解决方案。但之后我意识到,并不存在什么万能的路径。因此,我们将只讨论一些在制定合适解决方案时需要考虑的事项。
消除错误类型
如果我们能够在可恢复错误和不可恢复错误之间消除歧义,生活就会变得轻松许多。例如,如果我们的消费者开始遇到可恢复错误,那么重试主题就变得多余了。
因此,我们可以尝试确定所遇到的错误类型:
void processMessage(KafkaMessage km) {
try {
Message m = km.getMessage();
transformAndSave(m);
} catch (Throwable t) {
if (isRecoverable(t)) {
// ...
} else {
// ...
}
}
}
在上面的 Java 伪代码示例中,isRecoverable()将采用一种白名单方法来确定 t 是否表示可恢复错误。换句话说,它检查 t 以确定它是否与任何已知的可恢复错误(例如 SQL 连接错误或 ReST 客户端超时)相匹配,如果匹配则返回 true,否则返回 false。这样就能防止我们的消费者被不可恢复错误一直阻塞下去。
诚然,要在可恢复错误和不可恢复错误之间消除歧义可能很困难。例如,一个 SQLException 可能指的是一次数据库故障(可恢复)或一次约束违反状况(不可恢复)。如有疑问,我们可能应该假设错误是不可恢复的——为此要冒的风险是将其他好的消息发送给隐藏主题,从而延迟它们的处理……但这也能避免我们无意间陷入泥潭,无休止地尝试处理不可恢复错误。
在消费者内重试可恢复错误
正如我们所讨论的那样,存在可恢复错误时,将消息发布到重试主题毫无意义。我们只会为下一条消息的失败扫清道路。相反,消费者可以简单地重试,直到条件恢复。
当然,出现可恢复错误意味着外部资源存在问题。我们不断对这块资源发送请求是无济于事的。因此,我们希望对重试应用一个退避策略。我们的伪 Java 代码现在可能看起来像这样:
void processMessage(KafkaMessage km) {
try {
Message m = km.getMessage();
transformAndSave(m);
} catch (Throwable t) {
if (isRecoverable(t)) {
doWithRetry(m, Backoff.EXPONENTIAL, this::transformAndSave);
} else {
// ...
}
}
}
(注意:我们使用的任何退避机制都应配置为在达到某个阈值时向我们发出警报,并通知我们潜在的严重错误)
遇到不可恢复错误时,将消息直接发送到最后一个主题
另一方面,当我们的消费者遇到不可恢复错误时,我们可能希望立即隐藏(stash)该消息,以释放后续消息。但在这里使用多个重试主题会有用吗?答案是否定的。在转到 DLQ 之前,我们的消息只会经历 n 次消费失败而已。那么,为什么不从一开始就将消息粘贴在那里呢?
与重试主题一样,这个主题(在这里,我们将其称为隐藏主题)将拥有自己的消费者,其与主消费者保持一致。但就像 DLQ 一样,这个消费者并不总是在消费消息;它只有在我们明确需要时才会这么做。
考虑排序
来看看排序的情况。我们在这里重用之前的“用户/登录”示例。尝试处理 Zoë名称中的ë字符时,Login 消费者可能会遇到错误。消费者将其识别为一个不可恢复错误,将消息放在一边,然后继续处理后续消息。不久之后,消费者将获得 Zoiee 消息并成功处理它。
Zoë消息已隐藏,并且 Zoiee 消息现在已成功处理完毕。目前,两个有界上下文之间的数据是一致的。
晚些时候,我们的团队会修复消费者,以便其可以正确处理特殊字符并重新部署它。然后,我们将 Zoë消息重新发布给消费者,消费者现在可以正确处理该消息了。
注意!Apache下这些与Hadoop相关的开源项目要退休了!
当更新的消费者随后处理隐藏的 Zoë消息后,两个有界上下文之间的数据将变得不一致。因此,当 User 有界上下文将用户视为 Zoiee 时,Login 有界上下文会将她称为 Zoë。
显然,我们没有保持排序;Zoë是在 Zoiee 之前由 Login 消费者处理的,但正确的顺序是倒过来的。隐藏一条消息后,我们可以开始隐藏所有消息,但在那种情况下我们实际上会陷入困境。幸运的是,我们不需要保持所有消息的顺序,只需考虑与单个聚合相关联的消息即可。因此,如果我们的消费者可以跟踪已隐藏的特定聚合,它就可以确保属于同一聚合的后续消息也被隐藏。
收到隐藏主题中消息的警报后,我们可以取消部署消费者并修复其代码(请注意:切勿修改消息本身;消息代表不可变的事件!)在修复并测试了我们的消费者之后,我们可以重新部署它。当然,在继续使用主要主题之前,我们将需要特别注意先处理隐藏主题中的所有记录。这样,我们将继续保持正确的排序状态。出于这个原因,我们将首先部署隐藏消费者,并且只有在其完成时(这意味着消费者组中的所有实例都完成,如果我们使用了多个消费者),我们才会取消部署它并部署主消费者。
我们还应该考虑以下事实:固定的消费者处理了隐藏消息后,它仍可能会遇到其他错误。在这种情况下,其错误处理行为应像我们之前描述的那样:
如果错误是可恢复的,则使用退避策略重试; 如果错误是不可恢复的,它将隐藏消息并继续下一条消息。
为此,我们可以考虑使用第二个隐藏主题。
可以接受一些数据不一致?
这样的系统构建起来可能会变得相当复杂。它们可能很难构建、测试和维护。因此,某些组织可能会想要确定出数据不一致的可能性,并判断他们是否可以承受这种风险。
在许多情况下,这些组织可能会采用数据协调机制,以使他们的数据最终(是相对较长的“最终”)变得一致。为此也存在许多策略(超出了本文的范围)。
总结
处理重试似乎很复杂,那是因为它就是这么麻烦——和一切正常时 Kafka 相对优雅的风格相比之下尤其明显。我们构建的任何合适的解决方案(无论是重试主题、隐藏主题还是其他解决方案)都将比我们想要的更复杂。
不幸的是,如果我们希望在微服务之间建立弹性的异步通信流,那么我们就不能忽略它。
本文介绍了一种流行的解决方案、它的缺点以及在设计替代解决方案时应考虑的一些事项。到最后,想要构建正确的解决方案,我们就应该牢记一些事情,例如:
了解 Kafka 通过主题、分区和分区键提供的功能。 考虑到可恢复错误与不可恢复错误之间的差异。 设计模式的用法,例如有界上下文和聚合。 无论现在还是将来,都要搞清楚我们组织的用例特性。我们只是在移动独立的记录吗?……在这种情况下,我们可能不关心排序;还是说我们正在传播表示数据更改的事件?……在这种情况下,排序至关重要。 仔细考虑我们是否愿意承受任何水平的数据不一致。
参考资料
https://dzone.com/articles/creating-apache-kafka-topics-dynamically-as-part-o https://quarkus.io/blog/kafka-failure-strategy/ https://eng.uber.com/reliable-reprocessing/ https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-topics.html https://www.red-gate.com/simple-talk/sql/bi/reconciling-data-across-systems-using-reconciliation-hub/
往期推荐
Spring For All社区3.0开始测试啦!
学习的路上不孤单,快来注册分享与交流吧!
点击阅读原文直达新版社区