全部博文(788)
分类: 敏捷开发
2017-10-14 22:28:10
使用kafka-clients操作Kafka始终不成功,原因不清楚。下面贴出相关代码及配置,请懂的朋友指点一下,谢谢!
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency>
JDK版本为1.8,Kafka版本为0.10.2.0(Scala 2.12构建,即kafka_2.12-0.10.2.0),服务器使用CentOS-7构建。
TestBase.java
/**
 * Base class holding the fixture values shared by the Kafka client tests:
 * a logger, the broker address and the topic name.
 */
public class TestBase {

    /** Logger obtained for the runtime class of the concrete test instance. */
    protected Logger log = LoggerFactory.getLogger(getClass());

    /** Broker address in {@code host:port} form, used by subclasses as the bootstrap server. */
    protected String kafka_server = "192.168.60.160:9092";

    /** Name of the topic the tests read from and write to. */
    protected String topic = "zlikun_topic";
}
ProducerTest.java
public class ProducerTest extends TestBase { protected Properties props = new Properties(); @Before public void init() { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ; } @Test public void test() throws InterruptedException { KafkaProducerproducer = new KafkaProducer<>(props); // 发送消息 for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord (topic, Integer.toString(i), Integer.toString(i)), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.printf("offset = %d ,partition = %d \n", recordMetadata.offset() ,recordMetadata.partition()); } else { log.error("send error !" ,e); } } }); } TimeUnit.SECONDS.sleep(3); producer.close(); } }
ConsumerTest.java
public class ConsumerTest extends TestBase { private Properties props = new Properties(); @Before public void init() { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server); props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ; props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } @Test public void test() { Consumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); // consumer.assign(Arrays.asList(new TopicPartition(topic, 1))); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
# 测试topic为手动创建
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic
控制台输出信息
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! 
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error ! org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time