最近有个项目需求,有个数据转发服务,需要从kafka消息队列消费数据,然后转发到rabbitmq。要求转发数据不重复,且不能丢失数据。这里主要考虑转发的目的地址网络异常时,数据不能送达情况,当网络恢复时,要接着异常前发送的记录继续发送。
要来实现此功能,需要先清楚kafka消费原理:
kakfa用offset来记录某消费者消费到的位置,由于kafka是个分布式结构,数据被存放在多个partition上,那么要为每个partition单独记录一个offset,该offset保存在一个叫__consumer_offsets的Topic里,与此同时,kafka规定在同一消费者组里,同一时刻一个partition只能有一个消费者,这样的规定优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单,另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据是有序被消费。
kafka消费者,在消费当前消息后,如果进行不提交,在当前线程中下次poll是并不是当前消息,而是下条消息。例如当前消费的消息offset是28,如果不进行commit,下次获得的消息offset并不是28,而是29,即当前offset+1。只有消费再平衡时才能获得offset为28的消息
根据kafka消费原理,要实现对于消费者处理消息失败,能正确处理重复消息,需要以下步骤:
A) 在处理失败时,记录当前记录的offset 和parition
B) 用seek函数定位到记录的offset和partion
C)用poll函数,即可返回上次的offset消息
-
public void run(){
-
-
Map<Long,Long> offsetMap=new HashMap<>(); //用来保存每个offset,首次处理失败时间
-
final long retainTimeMill=retainTime*3600*1000; //转发失败时消息保留时间
-
int lastPartition=0; //记录最后一次成功处理消息的分区
-
long lastOffset=0;//记录最后一次成功处理消息的offset
-
boolean seek=false;
-
while (true){
-
try {
-
if(seek){ //判读是否需要定位到,最后一次成功处理消息的offset
-
consumer.seek(new TopicPartition(topic,lastPartition),lastOffset);
-
}
-
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-
for (ConsumerRecord<String,String> record:records){
-
String key=record.key();
-
String value = record.value();
-
lastOffset=record.offset();
-
lastPartition=record.partition();
-
iForwardService.forward(ConstUtils.DATA_REPORT_API,value, taskName); //转发失败,会抛出异常,走到catch中
-
seek=false; //转发成功
-
logger.info(" DeviceDataListenTask forward success taskName:{} offset:{} ", taskName, record.offset());
-
}//for
-
if(!records.isEmpty()){
-
consumer.commitSync(); //提交offset
-
}
-
-
}catch (Exception ex){
-
logger.error("DeviceDataListenTask run lastOffset:{} taskName:{} while exception,{}", lastOffset ,taskName,ex.getMessage());
-
if(lastOffset>0){//记录处理offset
-
seek=true; //转发失败
-
Long exsitRetainTime=offsetMap.get(lastOffset);
-
if(exsitRetainTime==null){
-
offsetMap.put(lastOffset,System.currentTimeMillis()); //记录每个offset,首次处理失败时间
-
}else {
-
//如果异常时间超过配置时间提交数据
-
if((System.currentTimeMillis()-exsitRetainTime)>=retainTimeMill){
-
consumer.commitSync();
-
offsetMap.remove(lastOffset);
-
seek=false;
-
}
-
}
-
}
-
-
}
-
}
-
-
}
阅读(25540) | 评论(0) | 转发(0) |