本文在 Windows 平台上实现 Kafka 生产者与消费者的 Java 版本示例,前提是已经安装好 Kafka 环境,并已启动 ZooKeeper 服务和 Kafka 服务。
1)消费者代码实现
(特别注意的是:文件命名为 KafkaConsumer.java 与消费者类名相同)
-
import java.util.HashMap;
-
import java.util.List;
-
import java.util.Map;
-
import java.util.Properties;
-
-
import kafka.consumer.ConsumerConfig;
-
import kafka.consumer.ConsumerIterator;
-
import kafka.consumer.KafkaStream;
-
import kafka.javaapi.consumer.ConsumerConnector;
-
import kafka.serializer.StringDecoder;
-
import kafka.utils.VerifiableProperties;
-
-
//import com.lin.demo.producer.KafkaProducer;
-
-
-
public class KafkaConsumer{
-
-
private final ConsumerConnector consumer;
-
public final static String TOPIC = "linlin";
-
-
private KafkaConsumer() {
-
Properties props = new Properties();
-
// zookeeper 配置
-
props.put("zookeeper.connect", "127.0.0.1:2181");
-
-
// group 代表一个消费组
-
props.put("group.id", "lingroup");
-
-
// zk连接超时
-
props.put("zookeeper.session.timeout.ms", "4000");
-
props.put("zookeeper.sync.time.ms", "200");
-
props.put("rebalance.max.retries", "5");
-
props.put("rebalance.backoff.ms", "1200");
-
-
-
props.put("auto.commit.interval.ms", "1000");
-
props.put("auto.offset.reset", "smallest");
-
// 序列化类
-
props.put("serializer.class", "kafka.serializer.StringEncoder");
-
-
ConsumerConfig config = new ConsumerConfig(props);
-
-
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
-
}
-
-
void consume() {
-
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-
topicCountMap.put(TOPIC, new Integer(1));
-
-
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
-
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
-
-
Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
-
KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
-
ConsumerIterator<String, String> it = stream.iterator();
-
while (it.hasNext())
-
System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + it.next().message() + "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
-
}
-
-
public static void main(String[] args) {
-
new KafkaConsumer().consume();
-
}
-
}
2)生产者代码实现(特别注意的是:文件命名为 KafkaProducerT.java,与生产者类名相同)
-
//package com.lin.demo.producer;
-
-
import java.util.Properties;
-
-
import kafka.javaapi.producer.Producer;
-
import kafka.producer.KeyedMessage;
-
import kafka.producer.ProducerConfig;
-
-
public class KafkaProducerT{
-
private final Producer<String, String> producer;
-
public final static String TOPIC = "linlin";
-
-
private KafkaProducerT() {
-
Properties props = new Properties();
-
// 此处配置的是kafka的端口
-
props.put("metadata.broker.list", "127.0.0.1:9092");
-
props.put("zk.connect", "127.0.0.1:2181");
-
-
// 配置value的序列化类
-
props.put("serializer.class", "kafka.serializer.StringEncoder");
-
// 配置key的序列化类
-
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
-
-
props.put("request.required.acks", "-1");
-
-
producer = new Producer<String, String>(new ProducerConfig(props));
-
}
-
-
void produce() {
-
int messageNo = 1000;
-
final int COUNT = 10000;
-
-
while (messageNo < COUNT) {
-
String key = String.valueOf(messageNo);
-
String data = "hello kafka message " + key;
-
producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
-
System.out.println(data);
-
messageNo++;
-
}
-
}
-
-
public static void main(String[] args) {
-
new KafkaProducerT().produce();
-
//System.out.println("你好 KafkaProducer");
-
}
-
}
3)编译并运行
将生产者(KafkaProducerT.java)与消费者(KafkaConsumer.java)代码放在 C:\kafka_2.12-1.0.0 目录下,这样放置程序文件便于通过 ./libs/* 引用 Kafka 自带的 jar 包。
a)编译和运行消费者
C:\kafka_2.12-1.0.0>javac KafkaConsumer.java -cp ./libs/*
C:\kafka_2.12-1.0.0>java -cp .;./libs/* KafkaConsumer
b)编译和运行生产者
C:\kafka_2.12-1.0.0>javac KafkaProducerT.java -cp ./libs/*
C:\kafka_2.12-1.0.0>java -cp .;./libs/* KafkaProducerT
阅读(1614) | 评论(0) | 转发(0) |