package com.study.biz.kafka.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.study.biz.constant.ProcessStatusEnum;
import com.study.biz.constant.TopicEnum;
import com.study.integration.redis.RedisService;
import com.study.integration.tools.CustomPartitioner;
import com.study.integration.tools.KafkaProducerMessage;
import com.study.integration.tools.KafkaProducerSingleton;
import com.study.integration.tools.LoggerTools;
import com.study.integration.tools.StringTools;
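/**
* Kafka service implementation: consumes each topic on its own thread, tracks
* the last processed offset per topic/partition in a Redis hash, and sends
* messages through a shared singleton producer backed by a thread pool.
*/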
@Service("KafkaService")
public class KafkaServiceImpl implements KafkaService {
// Consumer configuration
private Properties consumeProps = null;
// Producer configuration
private Properties produceProps = null;
@Value("${bootstrap.servers.config}")
private String BOOTSTRAP_SERVERS_CONFIG;
@Value("${group.id}")
private String GROUP_ID;
@Value("${zookeeper.session.timeout.ms}")
private String SESSION_TIMEOUT_MS_CONFIG;
@Value("${auto.commit.enable}")
private String ENABLE_AUTO_COMMIT;
@Value("${max.poll.records}")
private String MAX_POLL_RECORDS;
@Value("${reconnect.backoff.ms}")
private String RECONNECT_BACKOFF_MS;
@Value("${retry.backoff.ms}")
private String RETRY_BACKOFF_MS;
@Autowired
private MessageHandleService messageHandleService;
@Autowired
private RedisService redisService;
private final List<ConsumerProcesser> consumers = new ArrayList<>();
private ExecutorService consumerExecutorPool;
private ExecutorService produceExecutorPool;
private static final long DEFAULT_MAX_RETRIES = 4;
// Backoff intervals in seconds, one per retry attempt
private static final Long[] RETRY_INTERVAL_ARR = { 5L, 20L, 40L, 60L };
@PostConstruct
public void init() {
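// Consumer properties, populated from the injected configuration values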
consumeProps = new Properties();
consumeProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
consumeProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
consumeProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG);
consumeProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumeProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumeProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT);
consumeProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
consumeProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS);
consumeProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
consumerExecutorPool = Executors.newFixedThreadPool(100);
produceProps = new Properties();
produceProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);// broker list
produceProps.put(ProducerConfig.ACKS_CONFIG, "1");
produceProps.put(ProducerConfig.LINGER_MS_CONFIG, 100);
produceProps.put(ProducerConfig.RETRIES_CONFIG, 3);
produceProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// key serializer
produceProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value serializer
produceProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getCanonicalName());// custom partitioner
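// Hand the finished producer configuration to the shared singleton producer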
KafkaProducerSingleton.setProperties(produceProps);
produceExecutorPool = Executors.newFixedThreadPool(15);
}
/**
* Consume Kafka messages: start one consumer thread per topic and register a
* shutdown hook that stops them cleanly.
*/
@Override
public void consumeMessage() {
TopicEnum[] topicEnums = TopicEnum.values();
for (TopicEnum topicEnum : topicEnums) {
String topic = topicEnum.getValue().trim();
ConsumerProcesser consumerProcesser = new ConsumerProcesser(topic);
consumers.add(consumerProcesser);
consumerExecutorPool.submit(consumerProcesser);
}
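// JVM shutdown hook: stop every consumer loop, then drain the consumer pool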
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LoggerTools.trace(this.getClass(), "Runtime.getRuntime", "shutdown hook started executing....");
try {
for (ConsumerProcesser consumer : consumers) {
consumer.shutdown();
}
consumerExecutorPool.shutdown();
consumerExecutorPool.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LoggerTools.error(this.getClass(), "Runtime.getRuntime", "shutdown hook threw an exception", e);
Thread.currentThread().interrupt(); // restore the interrupt flag
}
}
});
}
public class ConsumerProcesser implements Runnable {
private final String topic;
// KafkaConsumer is not thread-safe, so each processor thread owns its own instance
private final KafkaConsumer<String, String> consumer;
// Set by shutdown() so the poll loop below can exit cleanly
private final AtomicBoolean closed = new AtomicBoolean(false);
public ConsumerProcesser(String topic) {
this.topic = topic;
consumer = new KafkaConsumer<>(consumeProps);
}
@Override
public void run() {
LoggerTools.trace(this.getClass(), "订阅topic:" + topic + "成功");
receiveMessage(topic);
}
private void receiveMessage(String consumerTopic) {
consumer.subscribe(Arrays.asList(consumerTopic), new SaveOffsetsOnRebalance(consumer));
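// An initial poll triggers partition assignment so that the seeks below take effect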
consumer.poll(0);
boolean exists = redisService.exists(consumerTopic);
if (exists) {
// Resume each assigned partition from the offset recorded in Redis
for (TopicPartition partition : consumer.assignment()) {
Long offsetFromRedis = getOffsetFromRedis(partition.topic(), partition.partition());
consumer.seek(partition, offsetFromRedis);
}
} else {
// No offsets recorded for this topic yet: start from the latest position
consumer.seekToEnd(consumer.assignment());
}
while (!closed.get()) {
try {
recordLoop(consumerTopic);
} catch (WakeupException e) {
// Raised by consumer.wakeup() during shutdown; the loop condition ends the loop
} catch (Exception e) {
LoggerTools.error(this.getClass(), "receiveMessage", "error while processing records", e);
}
}
consumer.close();
}
private void recordLoop(String consumerTopic) throws Exception {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
Long offsetFromRedis = getOffsetFromRedis(topic, record.partition());
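// Redis stores the offset of the last processed record, so a seek replays that
// record once; skip it here to avoid handling the same message twice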
if (offset == offsetFromRedis && offset != 0L) {
continue;
}
ProcessStatusEnum processStatusEnum = messageHandleService.handleMessage(record);
// Message processed successfully: record the offset and move to the next record
if (ProcessStatusEnum.FAILED_ERROR != processStatusEnum) {
storeOffsetInRedis(topic, partition, offset);
continue;
}
handleMessage(topic, record, partition, offset, consumerTopic);
// TopicPartition topicPartition = new TopicPartition(topic, partition);
//
// // Temporary workaround; otherwise a failed record is retried continuously
// consumer.seek(topicPartition, offset);
}
}
public void shutdown() {
closed.set(true);
if (consumer != null) {
consumer.wakeup();
}
}
}
// Extracted into a separate method to reduce nesting depth
private void handleMessage(String topic, ConsumerRecord<String, String> record, int partition, long offset, String consumerTopic) throws Exception {
for (int i = 0; i < DEFAULT_MAX_RETRIES; i++) {
LoggerTools.recordError(this.getClass(), "handleMessage",
String.format("[retry] attempt %s for consuming topic:%s, sleeping %s seconds", (i + 1), topic, RETRY_INTERVAL_ARR[i]));
Thread.sleep(RETRY_INTERVAL_ARR[i] * 1000);
ProcessStatusEnum processStatusEnum = messageHandleService.handleMessage(record);
if (ProcessStatusEnum.FAILED_ERROR != processStatusEnum) {
// Retry succeeded: persist the offset and skip the failure log below
storeOffsetInRedis(topic, partition, offset);
return;
}
}
// Message processing failed after every retry
LoggerTools.recordError(this.getClass(), "handleMessage", "message process met error: offset: " + record.offset() + " consumerTopic: " + consumerTopic
+ " partition: " + record.partition());
}
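/**
* Rebalance listener that restores each newly assigned partition's position
* from the offset recorded in Redis (0 when none is recorded yet).
*/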
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
private final KafkaConsumer<String, String> consumer;
public SaveOffsetsOnRebalance(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
LoggerTools.trace(getClass(), "onPartitionsRevoked", "onPartitionsRevoked : " + partition.topic() + " : " + partition.partition());
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
int partitionNumber = partition.partition();
Long offsetFromRedis = getOffsetFromRedis(partition.topic(), partitionNumber);
consumer.seek(partition, offsetFromRedis);
LoggerTools.trace(getClass(), "onPartitionsAssigned", "onPartitionsAssigned : topic : " + partition.topic() + " offsetFromRedis: " + offsetFromRedis
+ " partition: " + partitionNumber);
}
}
}
private boolean storeOffsetInRedis(String topic, int partition, long offset) {
// hsetnx only writes absent fields, so delete any existing entry first
if (redisService.hexists(topic, String.valueOf(partition))) {
Long hdel = redisService.hdel(topic, String.valueOf(partition));
// Deleting the existing offset field failed
if (hdel == 0) {
LoggerTools.recordError(this.getClass(), "storeOffsetInRedis", "failed to delete the offset key in redis");
return false;
}
}
Long hsetnx = redisService.hsetnx(topic, String.valueOf(partition), String.valueOf(offset));
return hsetnx != 0;
}
private Long getOffsetFromRedis(String topic, int partition) {
String partitionNumber = String.valueOf(partition);
String offset = redisService.hget(topic, partitionNumber);
if (!StringTools.hasText(offset)) {
LoggerTools.trace(this.getClass(), "getOffsetFromRedis", "no offset stored for " + topic + ", returning 0L");
return 0L;
}
return Long.valueOf(offset);
}
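/**
* Submit the message to the producer thread pool; the actual send is performed
* by KafkaProducerMessage.
*/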
@Override
public void sendMessage(String topic, String key, String message, boolean isAsync) {
// Honor the caller's isAsync flag rather than hard-coding it
KafkaProducerMessage kafkaProducerMessage = new KafkaProducerMessage(topic, key, message, isAsync);
produceExecutorPool.submit(kafkaProducerMessage);
}
}