第4天Kafka
1.Kafka 终端和后端存储之间的中间件
由Producer,Broker,Consumer,还有zookeeper组成
Producer:向broker发送消息,顺序追加消息
Broker:从producer端接收消息,将消息发送给订阅的consumer,并可靠保存一段时间
Consumer:取消息并处理,顺序读
2.消息组成
topic:切分成1个或多个partition存储在Broker中,是个逻辑上的消息概念
key,
value,
timestamp
在一个Partition中同一个Producer发送的消息是顺序的,分配标示是消息关键字hash取模实现,如果数据要求存到同一个Partition,则选用同样的关键字就行
3.
flume 短时间存在,用完及没
kafka 会缓存一段时间,默认一周
hdfs 永久缓存
相似组件对比:
Kafka hdfs
Broker datanode
partition block
topic file/dir
4.搭建环境:
下载解压Kafka
启动 Zookeeper(集团内部可以共享)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动Kafaka
bin/kafka-server-start.sh -daemon config/server.properties
创建 topic,名字为 test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
启动生产者生产信息,键盘输如测试信息并回车
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动1个终端消费数据,然后观看消费端信息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
5.Java测试
//这个例子不能在windows和虚拟机间通讯
//生产者代码
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(props);
producer.send(new ProducerRecord("test", "hello"));//test是topic名称
//消费者代码
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "testGroup");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer= new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(topic));//订阅Topic
ConsumerRecords records = consumer.poll(10000);
//处理得到的数据
for (ConsumerRecord record : records) {
System.out.println("topic: " + record.topic() + " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value());
}
阅读(1308) | 评论(0) | 转发(0) |