Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1106337
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 服务器与存储

2021-02-26 10:46:43

  最近有个项目需求,有个数据转发服务,需要从kafka消息队列消费数据,然后转发到rabbitmq。要求转发数据不重复,且不能丢失数据。这里主要考虑转发的目的地址网络异常时,数据不能送达情况,当网络恢复时,要接着异常前发送的记录继续发送。
  要来实现此功能,需要先清楚kafka消费原理:
  kakfa用offset来记录某消费者消费到的位置,由于kafka是个分布式结构,数据被存放在多个partition上,那么要为每个partition单独记录一个offset,该offset保存在一个叫__consumer_offsets的Topic里,与此同时,kafka规定在同一消费者组里,同一时刻一个partition只能有一个消费者,这样的规定优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单,另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据是有序被消费。
  kafka消费者,在消费当前消息后,如果进行不提交,在当前线程中下次poll是并不是当前消息,而是下条消息。例如当前消费的消息offset是28,如果不进行commit,下次获得的消息offset并不是28,而是29,即当前offset+1。只有消费再平衡时才能获得offset为28的消息
   根据kafka消费原理,要实现对于消费者处理消息失败,能正确处理重复消息,需要以下步骤:
 A) 在处理失败时,记录当前记录的offset 和parition
 B) 用seek函数定位到记录的offset和partion
 C)用poll函数,即可返回上次的offset消息
  1. public void run(){

  2.     Map<Long,Long> offsetMap=new HashMap<>(); //用来保存每个offset,首次处理失败时间
  3.     final long retainTimeMill=retainTime*3600*1000; //转发失败时消息保留时间
  4.     int lastPartition=0; //记录最后一次成功处理消息的分区
  5.     long lastOffset=0;//记录最后一次成功处理消息的offset
  6.     boolean seek=false;
  7.     while (true){
  8.      try {
  9.          if(seek){ //判读是否需要定位到,最后一次成功处理消息的offset
  10.              consumer.seek(new TopicPartition(topic,lastPartition),lastOffset);
  11.          }
  12.          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  13.          for (ConsumerRecord<String,String> record:records){
  14.              String key=record.key();
  15.              String value = record.value();
  16.              lastOffset=record.offset();
  17.              lastPartition=record.partition();
  18.              iForwardService.forward(ConstUtils.DATA_REPORT_API,value, taskName); //转发失败,会抛出异常,走到catch中
  19.              seek=false; //转发成功
  20.              logger.info(" DeviceDataListenTask forward success taskName:{} offset:{} ", taskName, record.offset());
  21.          }//for
  22.          if(!records.isEmpty()){
  23.              consumer.commitSync(); //提交offset
  24.          }

  25.      }catch (Exception ex){
  26.          logger.error("DeviceDataListenTask run lastOffset:{} taskName:{} while exception,{}", lastOffset ,taskName,ex.getMessage());
  27.          if(lastOffset>0){//记录处理offset
  28.              seek=true; //转发失败
  29.              Long exsitRetainTime=offsetMap.get(lastOffset);
  30.              if(exsitRetainTime==null){
  31.                  offsetMap.put(lastOffset,System.currentTimeMillis()); //记录每个offset,首次处理失败时间
  32.              }else {
  33.                  //如果异常时间超过配置时间提交数据
  34.                  if((System.currentTimeMillis()-exsitRetainTime)>=retainTimeMill){
  35.                      consumer.commitSync();
  36.                      offsetMap.remove(lastOffset);
  37.                      seek=false;
  38.                  }
  39.              }
  40.          }

  41.      }
  42.    }
  43.    
  44. }

阅读(25501) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~