Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3291371
  • 博文数量: 346
  • 博客积分: 10189
  • 博客等级: 上将
  • 技术积分: 3125
  • 用 户 组: 普通用户
  • 注册时间: 2008-08-05 19:46
文章分类

全部博文(346)

文章存档

2013年(35)

2011年(35)

2010年(76)

2009年(48)

2008年(152)

分类: 高性能计算

2013-08-26 16:21:03

转载自[] 
本文链接: 

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。


设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。

kakfa的consumer使用拉的方式工作。

安装kafka
下载:~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。

使用kafka

  1. import java.util.Arrays;  
  2. import java.util.List;  
  3. import java.util.Properties;  
  4. import kafka.javaapi.producer.SyncProducer;  
  5. import kafka.javaapi.message.ByteBufferMessageSet;  
  6. import kafka.message.Message;  
  7. import kafka.producer.SyncProducerConfig;  
  8.   
  9. ...  
  10.   
  11. Properties props = new Properties();  
  12. props.put(“zk.connect”, “127.0.0.1:2181”);  
  13. props.put("serializer.class""kafka.serializer.StringEncoder");  
  14. ProducerConfig config = new ProducerConfig(props);  
  15. Producer producer = new Producer(config);  
  16.   
  17. Send a single message  
  18.   
  19. // The message is sent to a randomly selected partition registered in ZK  
  20. ProducerData data = new ProducerData("test-topic""test-message");  
  21. producer.send(data);      
  22.   
  23. producer.close();  

这样就是一个标准的producer。

consumer的代码

  1. // specify some consumer properties  
  2. Properties props = new Properties();  
  3. props.put("zk.connect""localhost:2181");  
  4. props.put("zk.connectiontimeout.ms""1000000");  
  5. props.put("groupid""test_group");  
  6.   
  7. // Create the connection to the cluster  
  8. ConsumerConfig consumerConfig = new ConsumerConfig(props);  
  9. ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
  10.   
  11. // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
  12. Map>> topicMessageStreams =   
  13.     consumerConnector.createMessageStreams(ImmutableMap.of("test"4));  
  14. List> streams = topicMessageStreams.get("test");  
  15.   
  16. // create list of 4 threads to consume from each of the partitions   
  17. ExecutorService executor = Executors.newFixedThreadPool(4);  
  18.   
  19. // consume the messages in the threads  
  20. for(final KafkaMessageStream stream: streams) {  
  21.   executor.submit(new Runnable() {  
  22.     public void run() {  
  23.       for(Message message: stream) {  
  24.         // process message  
  25.       }   
  26.     }  
  27.   });  
  28. }  

原创文章如转载,请注明:转载自[]
阅读(2122) | 评论(0) | 转发(0) |
0

上一篇:消息系统Kafka介绍

下一篇:kafka 介绍

给主人留下些什么吧!~~