kafka重复生产,如何确保kafka生产者,生产数据不丢失

  kafka重复生产,如何确保kafka生产者,生产数据不丢失

  1.背景最近Kafka日志集群被折腾的够呛。由于公司部署的应用越来越多,日志采集程序将采集到的日志发送到Kafka集群时会有很大的延迟,总TPS一直上不去。为了不影响业务团队的日志故障排除,Kafka集群通过先解决问题再排除故障的方式进行了扩展。但扩容后尴尬的是,新增的5台机器中,有两台机器的消费发送响应时间明显高于其他机器。为了确保消息服务的稳定性,群集被暂时收缩,这台机器被从群集中删除。具体操作是简单粗暴的使用kill pid命令,但是意外发生了。

  发现Java客户端报告了以下错误:

  Go客户端报告的错误如下:

  基本上可以认为是部分分区没有上线领导,无法成功发送消息。

  2.问题分析。那为什么会出现这个问题呢?当Kafka中的一个节点离线时,它会自动触发故障转移。分区领导不会改选吗?请从这个问题开始我们今天的探究之旅。

  首先,我们可以看看当前有问题的分区的路由信息。从第一张图可以看出,主题dw_test_kafka_0816000的101分区消息发送失败。我们来看看它在Zookeeper中的地位。具体命令如下:/zkCli.sh -server 127.0.0.1:2181

  get-s/Kafka _ cluster _ 01/brokers/topics/dw _ test _ Kafka _ 0816000/partitions/101/state这个命令可以看到对应分区的信念信息,如下图所示:

  这里显示首领的状态是-1,而isr列表中只有一个副本,在broker-1上,但是broker id为1的机器此时处于离线状态,那么为什么不触发分区首领的重选呢?

  其实我相信你稍微想一想就能找到线索。isr字段的值为1,表示该分区的副本数为1,表示该分区只在一个代理上存储数据。一旦代理离线,集群中就没有该分区的数据了。这时候是不能进行故障转移的,因为一旦故障转移,分区的数据就会丢失,影响非常严重。

  那为什么这个主题的副本数设置为1?那是因为当时集群的压力太大,节点间复制的数据量巨大,网卡基本都是满负荷运行,而且是日志集群,更大程度上接受了数据的丢失。所以当时为了避免簇间数据的大量拷贝,这个主题的拷贝数设置为1。

  但是集群节点的停机和维护是必不可少的,不可能每次停机和维护都会造成一段时间的数据写入失败。要解决这个问题,我们需要在关机前移动主题的分区,将主题的分区从需要关机的集群中移除。

  主题迁移的具体做法,请参考我上一篇文章《卡夫卡主题迁移实践》第三部分。

  3.kafka节点离线分区的故障转移机制。Kafka单副本主题在集群中某个节点离线后,将无法完成分区的故障切换机制。为了更深入的了解底层的一些实现细节,我想进一步探讨一下Kafka node的离线的一些故障转移机制。

  温馨提示:接下来主要从源代码角度探讨实现原理,加深对这个过程的理解。如果不感兴趣,可以直接进入本文第四部分:总结。

  Kafka所依赖的Zookeeper服务器存储了当前集群内存的broker信息。具体路径是/{ namespace }/brokers/brokers/ids。具体图示如下:

  而且ids下的每个节点都记录了Broker的一些信息,比如对外提供服务的协议和端口。值得注意的是,这些节点是临时节点,如下图所示:

  这样,一旦对应的代理离线,对应的节点就会被删除。Kafka集群中的控制器角色启动时,会监听该节点下节点的变化并做出响应。最后,它会调用KafkaController的onBrokerFailure方法。具体代码如下:

  这种方法的实现比较复杂,这里不做过多的分散,把重点放在寻找分区的故障转移机制上,也就是接下来我们会具体分析KafkaController的onReplicasBecomeOffline方法,主要探讨分区的故障转移机制。

  3.1 onReplicasBecomeOffline故障转移由于这种方法实现起来比较复杂,所以接下来将对其进行分发和详细解释。

  第一步:对需要设置为离线状态的分区进行分组,分组的依据是是否删除。未触发删除的集合由NewOfflineReplicasNotDeletion表示,需要删除的集合由newofflineReplicasForDeletion表示。

  步骤2:选择没有引线的分区,并使用没有引线的分区。代码如下图所示:

  分区中没有领导者的标准是分区中领导者的副本所在的代理没有离线并且没有被删除。第三步:将没有Leader的分区状态改为OfflinePartition(脱机状态),其中状态更新放在kafka控制器的内存中。具体的内存结构是map[主题分区,分区状态]。

  步骤4: Kafka分区状态机驱动(触发)分区状态从OfflinePartition和NewPartition转换到OnlinePartition。状态转换主要包括两个重要步骤:

  调用PartitionStateMachine的doHandleStateChanges方法驱动分区状态机的转换。然后调用ControllerBrokerRequestBatch的sendRequestsToBrokers方法实现元信息在其他代理上的同步。由于篇幅问题,本文不系统介绍Kafka分区状态机的实现细节。首先,我们将关注从OfflinePartition的离线状态到OnlinePartition的转换过程。

  首先我们来解释一下OfflinePartition转换成OnlinePartition过程中各个参数的含义:

  Seq[TopicPartition]分区当前处于OfflinePartition和NewPartition状态,没有已删除的分区。PartitionState targetState状态驱动的目标状态:OnlinePartition。PartitionLeader选举策略Partitionleader选举策略,这里说的是离线Partition Leader选举策略,是分区离线状态下的领袖选举策略。这里判断划分是否有效的依据主要是基于状态机设置的驱动条件。例如,只有三种分区状态OnlinePartition、NewPartition和OfflinePartition可以转换为OnlinePartition。

  接下来重点讲一下改成OnlinePartition的具体实现逻辑。具体代码如下:

  实施分为三个步骤:

  首先,分别选择当前状态为NewPartition的set和(OfflinePartition或OnlinePartition)分区。对于处于NewPartition状态的分区,进行分区的初始化,这通常是对于处于OfflinePartition或OnlinePartition状态的分区扩展或新创建的执行分区的重选,因为这些集合中的分区当前是没有领导者的分区,这些分区暂时不能接受读写请求。接下来,我们重点来看一下offline状态改为OnlinePartition时分区首领选举的实现。具体方法是:PartitionStateMachine的electLeaderForPartitions方法,代码如下:

  这个方法的实现结构比较简单,返回值是两组,一组选举成功,一组选举不成功。同时,如果选举过程中出现可恢复的异常,它将再次尝试。

  的具体重试逻辑是由doElectLeaderForPartitions方法实现的,非常复杂。

  3.2分区选举机制分区选举是通过PartitionStateMachine的doElectLeaderForPartitions方法实现的,接下来会一步步讲解。

  第一步:首先,从Zookeeper中获取要选择的分区的元信息。代码如下:

  Kafka中主题的路由信息存储在Zookeeper中,具体路径为:/{ namespace }/brokers/topics/{ topic name } }/partitions/{ partition }/state。具体存储内容如下:

  步骤2:将查询到的主题分区元信息组装到Map Topic Partition的Map结构中,LeadersRandControllerepoch。代码如下:

  第三步:将分区中控制器的epoch与当前Kafka控制器的Epoch进行比较,选择无效集和有效集。具体代码如下:

  如果当前控制器的ControllerEpoch小于分区状态下的controllerEpoch,则说明有新的Broker取代了当前控制器成为集群的新控制器,所以这次无法选择Leader并打印日志。

  第四步:根据领袖选举策略进行领袖选举。代码如下:

  由于我们这次是从OfflinePartition状态更改为OnlinePartition状态,所以传入分支是LeaderForOffline。这个方法我们后面会详细介绍。选举后返回的值是两个集合,其中partitionsWithoutLeaders表示没有成功选举领导者的分区,partitionsWithLeaders表示成功选举领导者的分区。

  第五步:打印未成功选举首领的分区的相应日志,并将其添加到失败队列集合中,如下图所示:

  第5步:将选举结果更新到zookeeper,如下图所示:

  步骤6:将最新的分区选举结果同步到其他代理节点。

  更新分区状态Leader_AND_ISR的请求被其他代理接受后,将根据分区的LEADER和replica的信息成为分区的LEADER节点或slave节点。这个块的实现细节将在本专栏的后续文章中特别提到。

  线下分区领导策略到底是如何进行选举的?接下来,我们探讨实现细节。

  3.3离线分区领导策略选举策略实现离线分区领导策略的选举策略的代码可以在分区状态机的leader中找到。我们仍然采取循序渐进的方法。

  第一步:主要用下面的代码初始化几个集合

  简单介绍一下以上变量:

  Partitionswithliveinsyncreplicas分区的副本所在的代理根本不活动。Partitionswithliveinsyncreplicas分区的副本集所在的代理部分或完全处于活动状态。无论问题副本是否打开,如果您不在isr集合中,您都可以参与领导者活动。可以在主题级别设置unclear . leader . election . enable,默认为false。第二步:执行地区领导人选举。具体实现代码如下:

  首先,解释以下变量的含义:

  分配分区(其中brokerId为broker id)设置的副本集liveReplicas当前联机副本集的具体选举算法如下:

  离线选举算法很简单:如果不清楚。Leader.election.enable=false,从幸存的ISR集中选择分区中的第一个领导者;如果没有幸存的ISR复制和不清楚。leader.election.enable=true,选择联机副本;否则,返回NONE,表示没有成功选择合适的领导者。

  然后返回本次选举结果,完成本次选举。

  4.摘要本文从一个实际的生产失败开始。分析后得出结论,当集群中单个副本主题离线时,部分队列无法写入。解决方法是先移动主题分区,即把需要停止的代理的分区移动到其他代理。此过程不会影响消息发送和消息消费。

  版权归作者所有:原创作品来自博主小二上九8,转载请联系作者取得转载授权,否则将追究法律责任。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: