Chinaunix首页 | 论坛 | 博客
  • 博客访问: 321052
  • 博文数量: 168
  • 博客积分: 60
  • 博客等级: 民兵
  • 技术积分: 348
  • 用 户 组: 普通用户
  • 注册时间: 2009-03-22 18:25
文章分类

全部博文(168)

文章存档

2018年(1)

2017年(2)

2016年(26)

2015年(31)

2014年(41)

2013年(65)

2012年(2)

分类: Java

2018-03-28 14:42:57

本文Window平台实现Kafka的生产者消费者Java版本示例,前提是已经安装好Kafka环境并启动zkserver和kafka

1)消费者代码实现(特别注意的是:文件命名为 KafkaConsumer.java  与消费者类名相同

点击(此处)折叠或打开

  1. import java.util.HashMap;
  2. import java.util.List;
  3. import java.util.Map;
  4. import java.util.Properties;
  5.   
  6. import kafka.consumer.ConsumerConfig;
  7. import kafka.consumer.ConsumerIterator;
  8. import kafka.consumer.KafkaStream;
  9. import kafka.javaapi.consumer.ConsumerConnector;
  10. import kafka.serializer.StringDecoder;
  11. import kafka.utils.VerifiableProperties;
  12.   
  13. //import com.lin.demo.producer.KafkaProducer;

  14.   
  15. public class KafkaConsumer{
  16.   
  17.     private final ConsumerConnector consumer;
  18.     public final static String TOPIC = "linlin";
  19.   
  20.     private KafkaConsumer() {
  21.         Properties props = new Properties();
  22.         // zookeeper 配置
  23.         props.put("zookeeper.connect", "127.0.0.1:2181");
  24.   
  25.         // group 代表一个消费组
  26.         props.put("group.id", "lingroup");
  27.   
  28.         // zk连接超时
  29.         props.put("zookeeper.session.timeout.ms", "4000");
  30.         props.put("zookeeper.sync.time.ms", "200");
  31.         props.put("rebalance.max.retries", "5");
  32.         props.put("rebalance.backoff.ms", "1200");
  33.           
  34.       
  35.         props.put("auto.commit.interval.ms", "1000");
  36.         props.put("auto.offset.reset", "smallest");
  37.         // 序列化类
  38.         props.put("serializer.class", "kafka.serializer.StringEncoder");
  39.   
  40.         ConsumerConfig config = new ConsumerConfig(props);
  41.   
  42.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
  43.     }
  44.   
  45.     void consume() {
  46.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  47.         topicCountMap.put(TOPIC, new Integer(1));
  48.   
  49.         StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
  50.         StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
  51.   
  52.         Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
  53.         KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
  54.         ConsumerIterator<String, String> it = stream.iterator();
  55.         while (it.hasNext())
  56.             System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + it.next().message() + "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
  57.     }
  58.   
  59.     public static void main(String[] args) {
  60.         new KafkaConsumer().consume();
  61.     }
  62. }
2)生产者代码实现(特别注意的是:文件命名为 KafkaProducerT.java  与生产者类名相同)

点击(此处)折叠或打开

  1. //package com.lin.demo.producer;
  2.   
  3. import java.util.Properties;
  4.   
  5. import kafka.javaapi.producer.Producer;
  6. import kafka.producer.KeyedMessage;
  7. import kafka.producer.ProducerConfig;
  8.   
  9. public class KafkaProducerT{
  10.     private final Producer<String, String> producer;
  11.     public final static String TOPIC = "linlin";
  12.   
  13.     private KafkaProducerT() {
  14.         Properties props = new Properties();
  15.         // 此处配置的是kafka的端口
  16.         props.put("metadata.broker.list", "127.0.0.1:9092");
  17.         props.put("zk.connect", "127.0.0.1:2181");
  18.   
  19.         // 配置value的序列化类
  20.         props.put("serializer.class", "kafka.serializer.StringEncoder");
  21.         // 配置key的序列化类
  22.         props.put("key.serializer.class", "kafka.serializer.StringEncoder");
  23.   
  24.         props.put("request.required.acks", "-1");
  25.   
  26.         producer = new Producer<String, String>(new ProducerConfig(props));
  27.     }
  28.   
  29.     void produce() {
  30.         int messageNo = 1000;
  31.         final int COUNT = 10000;
  32.   
  33.         while (messageNo < COUNT) {
  34.             String key = String.valueOf(messageNo);
  35.             String data = "hello kafka message " + key;
  36.             producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
  37.             System.out.println(data);
  38.             messageNo++;
  39.         }
  40.     }
  41.   
  42.     public static void main(String[] args) {
  43.         new KafkaProducerT().produce();
  44.         //System.out.println("你好 KafkaProducer");
  45.     }
  46. }
3)编译并运行
将生产者(KafkaProducerT.java)与消费者(KafkaConsumer.java )代码放在C:\kafka_2.12-1.0.0 下,这样放置程序文件方便引用
a)编译和运行消费者
C:\kafka_2.12-1.0.0>javac KafkaConsumer.java -cp ./libs/*
C:\kafka_2.12-1.0.0>java -cp .;./libs/*  KafkaConsumer

b)编译生产者
C:\kafka_2.12-1.0.0>javac KafkaProducerT.java -cp ./libs/*
C:\kafka_2.12-1.0.0>java -cp .;./libs/* KafkaProducerT

阅读(60) | 评论(0) | 转发(0) |
0

上一篇:AIX查看端口对应的进程ID

下一篇:没有了

给主人留下些什么吧!~~
评论热议
请登录后评论。

登录 注册