kafka reset offset,kafka查看offset命令

  kafka reset offset,kafka查看offset命令

  1\.值选择和定义# auto.offset.reset有以下三个可选值:

  最新(默认)最早的一个有一个共同的定义:

  对于同一个消费群,如果已经有提交的冲抵,则从提交的冲抵中消费。

  也就是说,只要这个消费群体消费过,无论auto.offset.reset指定什么值,效果都是一样的。每次启动时,最新的现有偏移开始,然后稍后消耗。

  不同之处在于:

  最新(默认):对于同一个消费者组,如果还没有提交抵销,那么只有消费者连接到主题后才会被消费。新生成的数据是指,如果这个话题有历史消息,现在启动了一个新的消费者组,且auto.offset.reset=latest,此时不能消费已有的历史消息,所以保持消费者组运行。如果此时有新消息进来,新消息将被消耗。一旦有消费,就会提交抵销。

  这时候如果消费群意外下线,话题还是有消息进来,然后消费群又重新上线,还是能从下线时的偏移量开始消费,这就是通常的定义。

  最早:对于同一个消费群,如果还没有提交冲抵,就从开始消费。也就是说,如果题目有历史信息,现在又开始了新的消费群体,而auto。offset.reset=early,它会从一开始就被消耗掉,和latest不一样。

  一旦消费群体消费了话题,此时就会有消费群体的抵消。在这种情况下,即使自动。offset.reset=early被指定,重新启动消费者组将具有与最新的相同的效果,即此时将采用一个通用定义。

  无:对于同一个使用者组,如果尚未提交抵销,将引发异常。通常,在生产环境中不使用此参数。

  2\.#创建新主题#。/Kafka-topics . sh-bootstrap-server 127 . 0 . 0 . 1:9092-topic testoffsetreset topic-partitions 1-replication-factor 1-create 3 \。向新主题#发送消息以便于测试,以及

  公共类TestProducer {

  公共静态void main(String[] args)引发InterruptedException {

  Properties属性=新属性();

  properties.put(ProducerConfig。BOOTSTRAP_SERVERS_CONFIG, 192 . 168 . 123 . 124:9092 );

  properties.put(ProducerConfig。KEY_SERIALIZER_CLASS_CONFIG,string SERIALIZER . CLASS . getname());

  properties.put(ProducerConfig。VALUE_SERIALIZER_CLASS_CONFIG,string SERIALIZER . CLASS . getname());

  KafkaProducer String,String producer=new KafkaProducer(属性);

  string topic= TestOffsetResetTopic ;

  for(int I=0;i i ) {

  string value= message _ I _ local datetime . now();

  System.out.println(发送值: 值);

  producer . send(new producer record(主题,值),(元数据,异常)- {

  if (exception==null) {

  String str=MessageFormat.format(发送成功!主题:{0},分区:{1},偏移量:{2} ,metadata.topic(),metadata.partition(),metadata.offset())。

  system . out . println(str);

  }

  });

  thread . sleep(500);

  }

  producer . close();

  }

  }消息发送成功:

  发送值:message _ 0 _ 2022-09-16t 18:26:15.943749600

  发送成功!主题:TestOffsetResetTopic,分区:0,偏移量:0

  发送值:message _ 1 _ 2022-09-16t 18:26:17.066608900

  发送成功!主题:TestOffsetResetTopic,分区:0,偏移量:1

  发送值:message _ 2 _ 2022-09-16t 18:26:17.568667200

  发送成功!主题:TestOffsetResetTopic,分区:0,偏移量:2

  发送值:message _ 3 _ 2022-09-16t 18:26:18.069093600

  发送成功!主题:TestOffsetResetTopic,分区:0,偏移量:3

  发送值:message _ 4 _ 2022-09-16t 18:26:18.583288100

  发送成功!主题:TestOffsetResettotopic,分区:0,偏移量:4。现在,TestOffsetResettotopic有五条消息,没有一个消费群体消费过,也就是没有抵消。

  4\.测试最新#已知主题已经存在5条历史消息,此时启动一个消费者

  公共类TestConsumerLatest {

  公共静态void main(String[] args) {

  性能属性=新属性();

  properties.put(消费者配置BOOTSTRAP_SERVERS_CONFIG, 192。168 .123 .124:9092 );

  properties.put(消费者配置KEY_DESERIALIZER_CLASS_CONFIG,字符串反序列化器。班级。getname());

  properties.put(消费者配置VALUE _ DESERIALIZER _ CLASS _ CONFIG,string DESERIALIZER。班级。getname());

  //指定消费者组

  properties.put(消费者配置. GROUP_ID_CONFIG, GROUP 1 );

  //设置自动偏移复位

  properties.put(消费者配置. AUTO_OFFSET_RESET_CONFIG,最新);

  string topic= TestOffsetResetTopic ;

  卡夫卡消费者字符串,字符串消费者=新KafkaConsumer(属性);

  消费者。订阅(收藏。singleton list(topic));

  //消费数据

  while (true) {

  消费者记录字符串,字符串消费者记录=消费者。投票(持续时间。秒(1));

  for (ConsumerRecord字符串,字符串消费者记录:消费者记录){

  系统。出去。println(消费者记录);

  }

  }

  }

  }发现如上面所述,历史已存在的5条消息不会消费到,消费者没有任何动静,现在保持消费者在线

  启动测试制作人再发5条消息,会发现这后面新发的,偏移从5开始的消息就被消费了

  消费者记录(topic=TestOffsetResetTopic,partition=0,leaderEpoch=0,offset=5,CreateTime=1663329725731,序列化键大小=-1,序列化值大小=39,headers=记录头(headers=[],isReadOnly=false),key=null,value=message _ 0 _ 2022-09-16t 20:02:05.523581500

  消费者记录(topic=TestOffsetResetTopic,partition=0,leaderEpoch=0,offset=6,CreateTime=1663329726251,序列化键大小=-1,序列化值大小=39,headers=记录头(headers=[],isReadOnly=false),key=null,value=message _ 1 _ 2022-09-16t 20:02:06.251399400

  消费者记录(topic=TestOffsetResetTopic,partition=0,leaderEpoch=0,offset=7,CreateTime=1663329726764,序列化键大小=-1,序列化值大小=39,headers=记录头(headers=[],isReadOnly=false),key=null,value=message _ 2 _ 2022-09-16t 20:02:06.764186200

  消费者记录(topic=TestOffsetResetTopic,partition=0,leaderEpoch=0,offset=8,CreateTime=1663329727264,序列化键大小=-1,序列化值大小=39,headers=记录头(headers=[],isReadOnly=false),key=null,value=message _ 3 _ 2022-09-16t 20:02:07.264268500

  消费者记录(topic=TestOffsetResetTopic,partition=0,leaderEpoch=0,offset=9,CreateTime=1663329727778,序列化键大小=-1,序列化值大小=39,headers=记录头(headers=[],isReadOnly=false),key=null,value=message _ 4 _ 2022-09-16t 20:02:07.778469700此时该消费者组对于这个主题的偏移已经为9了,现在停掉这个消费者(下线),再启动测试制作人发5条消息,接着再启动TestConsumerLatest,会发现紧接上一次的抵消之后开始,即从10继续消费

  如果测试发现没动静,请多等一会,估计机器性能太差.

  5\.测试最早#新建一个测试消费者,设置自动偏移复位为最早,注意组名为新的第二组,表示对于主题来说是全新的消费者组

  公共类TestConsumerEarliest {

  公共静态void main(String[] args) {

  性能属性=新属性();

  properties.put(消费者配置BOOTSTRAP_SERVERS_CONFIG, 192。168 .123 .124:9092 );

  properties.put(消费者配置KEY_DESERIALIZER_CLASS_CONFIG,字符串反序列化器。班级。getname());

  properties.put(消费者配置VALUE _ DESERIALIZER _ CLASS _ CONFIG,string DESERIALIZER。班级。getname());

  //指定消费者组

  properties.put(消费者配置. GROUP_ID_CONFIG, GROUP 2 );

  //设置自动偏移复位

  properties.put(消费者配置 AUTO_OFFSET_RESET_CONFIG 最早);

  string topic= TestOffsetResetTopic ;

  卡夫卡消费者字符串,字符串消费者=新KafkaConsumer(属性);

  消费者。订阅(收藏。singleton list(topic));

  //消费数据

  while (true) {

  消费者记录字符串,字符串消费者记录=消费者。投票(持续时间。秒(1));

  for (ConsumerRecord字符串,字符串消费者记录:消费者记录){

  系统。出去。println(消费者记录);

  }

  }

  }

  }一运行发现已有的10条消息(最开始5条加上面一次测试又发了5条,一共10条)是可以被消费到的,且消费完后,对于这个主题就已经有了第二组这个组的偏移了,无论之后启停,只要组名不变,都会从最新的偏移往后开始消费

  6\.测试无#新建一个测试消费者,设置自动偏移复位为没有,注意组名为新的第三组,表示对于主题来说是全新的消费者组

  公共类TestConsumerNone {

  公共静态void main(String[] args) {

  性能属性=新属性();

  properties.put(消费者配置BOOTSTRAP_SERVERS_CONFIG, 192。168 .123 .124:9092 );

  properties.put(消费者配置KEY_DESERIALIZER_CLASS_CONFIG,字符串反序列化器。班级。getname());

  properties.put(消费者配置VALUE _ DESERIALIZER _ CLASS _ CONFIG,string DESERIALIZER。班级。getname());

  //指定消费者组

  properties.put(消费者配置. GROUP_ID_CONFIG, GROUP 3 );

  //设置自动偏移复位

  properties.put(消费者配置. AUTO_OFFSET_RESET_CONFIG, none );

  string topic= TestOffsetResetTopic ;

  卡夫卡消费者字符串,字符串消费者=新KafkaConsumer(属性);

  消费者。订阅(收藏。singleton list(topic));

  //消费数据

  while (true) {

  消费者记录字符串,字符串消费者记录=消费者。投票(持续时间。秒(1));

  for (ConsumerRecord字符串,字符串消费者记录:消费者记录){

  系统。出去。println(消费者记录);

  }

  }

  }

  }一运行,程序报错,因为对于主题来说是全新的消费者组,且又指定了自动偏移复位为没有,直接抛异常,程序退出

  线程“主要”组织。阿帕奇。卡夫卡。客户。消费者。nooffsetforpartitionexception中出现异常:分区没有重置策略的未定义偏移量:[TestOffsetResetTopic-0]

  位于org。阿帕奇。卡夫卡。客户。消费者。内部。订阅状态。resetinitializingpositions(订阅状态。Java:706)

  位于org。阿帕奇。卡夫卡。客户。消费者。kafkaconsumer。updatefetchpositions(kafkaconsumer。Java:2434)

  位于org。阿帕奇。卡夫卡。客户。消费者。kafkaconsumer。updateassignmentmetadataifneed(kafkaconsumer。Java:1266)

  位于org。阿帕奇。卡夫卡。客户。消费者。kafkaconsumer。民调(kafkaconsumer。Java:1231)

  位于org。阿帕奇。卡夫卡。客户。消费者。kafkaconsumer。民调(kafkaconsumer。Java:1211)

  在卡克法testconsumernone。main(testconsumernone。Java:31)7 \总结#如果话题已经有历史消息了,又需要消费这些历史消息,则必须要指定一个从未消费过的消费者组,同时指定自动偏移复位为最早,才可以消费到历史数据,之后就有提交偏移。有了偏移,无论是最早还是最新消息,效果都是一样的了。如果话题没有历史消息,或者不需要处理历史消息,那按照默认最新即可。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: