转载自[]
本文链接:
kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。
kakfa的consumer使用拉的方式工作。
安装kafka
下载:~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。
使用kafka
-
import java.util.Arrays;
-
import java.util.List;
-
import java.util.Properties;
-
import kafka.javaapi.producer.SyncProducer;
-
import kafka.javaapi.message.ByteBufferMessageSet;
-
import kafka.message.Message;
-
import kafka.producer.SyncProducerConfig;
-
-
...
-
-
Properties props = new Properties();
-
props.put(“zk.connect”, “127.0.0.1:2181”);
-
props.put("serializer.class", "kafka.serializer.StringEncoder");
-
ProducerConfig config = new ProducerConfig(props);
-
Producer producer = new Producer(config);
-
-
Send a single message
-
-
-
ProducerData data = new ProducerData("test-topic", "test-message");
-
producer.send(data);
-
-
producer.close();
这样就是一个标准的producer。
consumer的代码
-
-
Properties props = new Properties();
-
props.put("zk.connect", "localhost:2181");
-
props.put("zk.connectiontimeout.ms", "1000000");
-
props.put("groupid", "test_group");
-
-
-
ConsumerConfig consumerConfig = new ConsumerConfig(props);
-
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
-
-
-
Map>> topicMessageStreams =
-
consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
-
List> streams = topicMessageStreams.get("test");
-
-
-
ExecutorService executor = Executors.newFixedThreadPool(4);
-
-
-
for(final KafkaMessageStream stream: streams) {
-
executor.submit(new Runnable() {
-
public void run() {
-
for(Message message: stream) {
-
-
}
-
}
-
});
-
}
原创文章如转载,请注明:转载自[]
阅读(2122) | 评论(0) | 转发(0) |