Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3547600
  • 博文数量: 864
  • 博客积分: 14125
  • 博客等级: 上将
  • 技术积分: 10634
  • 用 户 组: 普通用户
  • 注册时间: 2007-07-27 16:53
个人简介

https://github.com/zytc2009/BigTeam_learning

文章分类

全部博文(864)

文章存档

2023年(1)

2021年(1)

2019年(3)

2018年(1)

2017年(10)

2015年(3)

2014年(8)

2013年(3)

2012年(69)

2011年(103)

2010年(357)

2009年(283)

2008年(22)

分类: 大数据

2017-09-12 20:08:51

第4天Kafka

1.Kafka 终端和后端存储之间的中间件
  由Producer,Broker,Consumer,还有zookeeper组成
  Producer:向broker发送消息,顺序追加消息
  Broker:从producer端接收消息,将消息发送给订阅的consumer,并可靠保存一段时间
  Consumer:取消息并处理,顺序读

2.消息组成
  topic:切分成1个或多个partition存储在Broker中,是个逻辑上的消息概念
  key,
  value,
  timestamp

  在一个Partition中同一个Producer发送的消息是顺序的,分配标示是消息关键字hash取模实现,如果数据要求存到同一个Partition,则选用同样的关键字就行

3.
  flume 短时间存在,用完及没
  kafka 会缓存一段时间,默认一周
  hdfs 永久缓存

  相似组件对比:
  Kafka       hdfs
  
  Broker      datanode
  partition   block
  topic       file/dir
  
4.搭建环境:
  下载解压Kafka
  启动 Zookeeper(集团内部可以共享)

    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 

  启动Kafaka
    
    bin/kafka-server-start.sh -daemon config/server.properties 

  创建 topic,名字为 test

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1  --partitions 1 --topic test 
 
  启动生产者生产信息,键盘输如测试信息并回车

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

  启动1个终端消费数据,然后观看消费端信息

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 

5.Java测试
    //这个例子不能在windows和虚拟机间通讯

    //生产者代码
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
    Producer producer = new KafkaProducer(props);
    producer.send(new ProducerRecord("test", "hello"));//test是topic名称

    //消费者代码
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testGroup");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
    KafkaConsumer consumer= new KafkaConsumer(props);
    consumer.subscribe(Arrays.asList(topic));//订阅Topic

    ConsumerRecords records = consumer.poll(10000);
    //处理得到的数据
    for (ConsumerRecord record : records) {
          System.out.println("topic: " + record.topic() + " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value());
    }
阅读(1308) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~