Chinaunix首页 | 论坛 | 博客
  • 博客访问: 6047
  • 博文数量: 1
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 20
  • 用 户 组: 普通用户
  • 注册时间: 2017-10-16 16:06
个人简介

人闲话不多

文章分类

全部博文(1)

文章存档

2017年(1)

我的朋友
最近访客

分类: Java

2017-10-16 16:19:14

package com.study.biz.kafka.service;


import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


import javax.annotation.PostConstruct;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;


import com.study.biz.constant.ProcessStatusEnum;
import com.study.biz.constant.TopicEnum;
import com.study.integration.redis.RedisService;
import com.study.integration.tools.CustomPartitioner;
import com.study.integration.tools.KafkaProducerMessage;
import com.study.integration.tools.KafkaProducerSingleton;
import com.study.integration.tools.LoggerTools;
import com.study.integration.tools.StringTools;


@Service("KafkaService")
public class KafkaServiceImpl implements KafkaService {


// 接收消息配置
private Properties consumeProps = null;


// 发送消息配置
private Properties produceProps = null;


@Value("${bootstrap.servers.config}")
private String BOOTSTRAP_SERVERS_CONFIG;


@Value("${group.id}")
private String GROUP_ID;


@Value("${zookeeper.session.timeout.ms}")
private String SESSION_TIMEOUT_MS_CONFIG;


@Value("${auto.commit.enable}")
private String ENABLE_AUTO_COMMIT;


@Value("${max.poll.records}")
private String MAX_POLL_RECORDS;


@Value("${reconnect.backoff.ms}")
private String RECONNECT_BACKOFF_MS;


@Value("${retry.backoff.ms}")
private String RETRY_BACKOFF_MS;


@Autowired
private MessageHandleService messageHandleService;


@Autowired
private RedisService redisService;


private final List consumers = new ArrayList<>();


private ExecutorService consumerExecutorPool;


private ExecutorService produceExecutorPool;


private final static long DEFAULT_MAX_RETRIES = 4;


private final static Long[] RETRY_INTERVAL_ARR = { 5L, 20L, 40L, 60L };


@PostConstruct
public void init() {
consumeProps = new Properties();
consumeProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
consumeProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
consumeProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG);
consumeProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumeProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumeProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT);
consumeProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
consumeProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS);
consumeProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
consumerExecutorPool = Executors.newFixedThreadPool(100);


produceProps = new Properties();
produceProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);// broker
produceProps.put(ProducerConfig.ACKS_CONFIG, "1");
produceProps.put(ProducerConfig.LINGER_MS_CONFIG, 100);
produceProps.put(ProducerConfig.RETRIES_CONFIG, 3);
produceProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// key
produceProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value
produceProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getCanonicalName());// 自定义分区函数
KafkaProducerSingleton.setProperties(produceProps);


produceExecutorPool = Executors.newFixedThreadPool(15);


}


/*
* 接收kafka消息
*/
@Override
public void consumeMessage() {
  TopicEnum[] topicEnums = TopicEnum.values();
  for (TopicEnum topicEnum : topicEnums) {
   String topic = topicEnum.getValue().trim();
   ConsumerProcesser consumerProcesser = new ConsumerProcesser(topic);
   consumers.add(consumerProcesser);
   consumerExecutorPool.submit(consumerProcesser);
  }



Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LoggerTools.trace(this.getClass(), "Runtime.getRuntime", "addShutdownHook,开始执行了....");
try {
for (ConsumerProcesser consumer : consumers) {
consumer.shutdown();
}
consumerExecutorPool.shutdown();
consumerExecutorPool.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LoggerTools.error(this.getClass(), "Runtime.getRuntime", "addShutdownHook,抛出异常", e);
}
}
});
}


public class ConsumerProcesser implements Runnable {


private String topic;


private KafkaConsumer consumer;


public ConsumerProcesser(String topic) {
this.topic = topic;
consumer = new KafkaConsumer<>(consumeProps);
}


@Override
public void run() {
LoggerTools.trace(this.getClass(), "订阅topic:" + topic + "成功");
reciveMessage(topic);
}


private void reciveMessage(String consumerTopic) {
consumer.subscribe(Arrays.asList(consumerTopic), new SaveOffsetsOnRebalance(consumer));
consumer.poll(0);


boolean exists = redisService.exists(consumerTopic);
if (exists) {
for (TopicPartition partition : consumer.assignment()) {
Long offsetFromRedis = getOffsetFromRedis(partition.topic(), partition.partition());
consumer.seek(partition, offsetFromRedis);
}
} else {
consumer.seekToEnd(consumer.assignment());
}
do {
try {
recordLoop(consumerTopic);
} catch (Exception e) {
LoggerTools.error(this.getClass(), "reciveMessage", "处理异常", e);
}
} while (1 != 2);
}


private void recordLoop(String consumerTopic) throws Exception {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {


String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
Long offsetFromRedis = getOffsetFromRedis(topic, record.partition());
if (String.valueOf(offset).equals(String.valueOf(offsetFromRedis)) && offset != 0L) {
continue;
}


ProcessStatusEnum processStatusEnum = messageHandleService.handleMessage(record);


// 消息处理成功
if (ProcessStatusEnum.FAILED_ERROR != processStatusEnum) {
storeOffsetInRedis(topic, partition, offset);
continue;
}


handleMessage(topic, record, partition, offset, consumerTopic);


// TopicPartition topicPartition = new TopicPartition(topic,
// partition);
//
// //临时方案,否则数据失败后连续重试
// consumer.seek(topicPartition, offset);


}
}


public void shutdown() {
if (consumer != null) {
consumer.wakeup();
}
}


}


// 抽出方法降低层数复杂度
private void handleMessage(String topic, ConsumerRecord record, int partition, long offset, String consumerTopic) throws Exception {


ProcessStatusEnum processStatusEnum = null;
long retries = DEFAULT_MAX_RETRIES;


for (int i = 0; i < retries; i++) {
if (retries != -1) {
LoggerTools.recordError(this.getClass(), "recordLoop",
String.format("[开始重试] %s times, for consuming topic:%s, sleeping %s seconds", (i + 1), topic, RETRY_INTERVAL_ARR[i]));
Thread.sleep(RETRY_INTERVAL_ARR[i] * 1000);
processStatusEnum = messageHandleService.handleMessage(record);


if (ProcessStatusEnum.FAILED_ERROR != processStatusEnum) {
retries = -1;
storeOffsetInRedis(topic, partition, offset);
break;
}
}
}
// 消息处理失败
LoggerTools.recordError(this.getClass(), "reciveMessage", "message process met error:offset:" + record.offset() + " consumerTopic: " + consumerTopic + " partition: "
+ record.partition());


}


public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {


private KafkaConsumer consumer;


public SaveOffsetsOnRebalance(KafkaConsumer consumer) {
this.consumer = consumer;
}


@Override
public void onPartitionsRevoked(Collection partitions) {
for (TopicPartition partition : partitions) {
LoggerTools.trace(getClass(), "onPartitionsRevoked", "onPartitionsRevoked : " + partition.topic() + " : " + partition.partition());
}
}


@Override
public void onPartitionsAssigned(Collection partitions) {
for (TopicPartition partition : partitions) {
int partitionNumber = partition.partition();
Long offsetFromRedis = getOffsetFromRedis(partition.topic(), partitionNumber);
consumer.seek(partition, offsetFromRedis);
LoggerTools.trace(getClass(), "onPartitionsAssigned", "onPartitionsAssigned : topic : " + partition.topic() + " offsetFromRedis: " + offsetFromRedis
+ " partition: " + partitionNumber);
}
}
}


private boolean storeOffsetInRedis(String topic, int partition, long offset) {
// 判断set集合中的key是否存在,如存在则删除
if (redisService.hexists(topic, String.valueOf(partition))) {
Long hdel = redisService.hdel(topic, String.valueOf(partition));
// 删除key失败
if (hdel == 0) {
LoggerTools.recordError(this.getClass(), "storeOffsetInRedis", "删除redis中的offset key 失败");
return false;
}
}


Long hsetnx = redisService.hsetnx(topic, String.valueOf(partition), String.valueOf(offset));
if (hsetnx == 0) {
return false;
}


return true;
}


private Long getOffsetFromRedis(String topic, int partition) {
String partitionNumber = String.valueOf(partition);
String offset = redisService.hget(topic, partitionNumber);
if (!StringTools.hasText(offset)) {
LoggerTools.trace(this.getClass(), "getOffsetFromRedis", topic + "获取offset值不存在,返回0L");
return 0L;
}
return Long.valueOf(offset);
}


@Override
public void sendMessage(String topic, String key, String message, boolean isAsync) {
KafkaProducerMessage kafkaProducerMessage = new KafkaProducerMessage(topic, key, message, true);
produceExecutorPool.submit(kafkaProducerMessage);
}


}

阅读(790) | 评论(0) | 转发(0) |
0

上一篇:没有了

下一篇:没有了

给主人留下些什么吧!~~