Category: Big Data

2017-09-12 21:21:18

A minimal Kafka 0.10 consumer that subscribes to the test topic and prints each record:

package com.xx.bigdata.kafkaTool;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        // Consumer configuration: broker list, consumer group, auto-commit, and String deserializers.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "zk1:9092,zk2:9092,zk3:9092");
        properties.put("group.id", "testConsumer");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // poll() blocks for up to 100 ms waiting for new records,
            // so no extra sleep is needed in the loop.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }
}
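If the same settings are shared with other tools, they can also be kept in `java.util.Properties` file syntax and loaded at startup instead of being hard-coded. A minimal sketch (the values are the ones from the listing above; reading from an inline string stands in for a real `consumer.properties` file):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ConsumerConfigExample {
    // The same settings as the listing above, in properties-file syntax.
    static final String CONFIG =
            "bootstrap.servers=zk1:9092,zk2:9092,zk3:9092\n" +
            "group.id=testConsumer\n" +
            "enable.auto.commit=true\n" +
            "auto.commit.interval.ms=1000\n" +
            "session.timeout.ms=30000\n" +
            "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n" +
            "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n";

    // Parse the text into a Properties object that can be passed to new KafkaConsumer<>(...).
    static Properties load() throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(CONFIG));
        return props;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(load().getProperty("group.id")); // prints testConsumer
    }
}
```

With a real file, replace the `StringReader` with a `FileReader` pointing at the properties file.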

References:
http://blog.csdn.net/gpwner/article/details/74517360
http://www.cnblogs.com/gaopeng527/p/4950232.html