Chinaunix首页 | 论坛 | 博客
  • 博客访问: 150643
  • 博文数量: 78
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 724
  • 用 户 组: 普通用户
  • 注册时间: 2014-04-04 11:31
文章分类

全部博文(78)

文章存档

2015年(26)

2014年(52)

我的朋友

分类: 大数据

2014-04-04 13:27:34

1.    Kafka 介绍。

2.    Kafka 安装部署介绍。

3.    Kafka console一般操作命令。

4.    Kafka java code 编程之producer & consumer。

一、 Kafka 介绍

    Kafka 来源,是linkedin 用于日志处理的分布式消息队列,linkedin 的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享)以及对系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)。

Kafka存储策略        

      1.  Kafkatopic来进行消息管理,每个topic包含多个partitions,每个partition对应一个逻辑log,由多个segment组成。

      2.  每个segment中存储多条消息,消息id由其逻辑位置决定,该id可直接定位到消息存储位置,避免id到位置的额外映射。

      3.每个part(patition)在内存中对应一个index,记录每个segment中的第一条消息偏移offset

      4.   发布者发到某个topic的消息会被均匀的分布到多个part上,broker收到发布消息往对应part的最后一个segment上添加消息。当某个segment上的消息数达到配置或发布时间的阀值,会被flush到磁盘。

     5.无论消息是否被消费,只要在配置的时间段内,消息都将被留。

Consumer group

1.       允许consumer group 对一个topic进行消费,不同的group之间独立订阅。但同一个group对消息订阅,是有序且有唯一的。

2.       为了减小一个consumer group中不同consumer之间的分布式协调开销,指定partition为最小的并行消费单位,即一个group内的consumer只能消费不同的partition

Zookeeper协调控制

1.       管理broker consumer的动态加入与离开。

2.       触发负载均衡,当broker consumer加入或离开时,会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。

3.       维护消费关系及每个partition的消费信息。

消息交付保证

1. Kafka 对消息的重复、丢失、错误以及顺序没有严格要求。

2. 对每个partition只会被consumer group内的一个consumer消费,故kafka保证每个partition内的消息会被顺序的订阅。

3. Kafka 为每条消息计算CRC校验,用于错误检测,crc校验不通过的消息会被直接丢弃。

kafka分布式

         Log  partitions 被分布在多个servers上,根据配置的数量,每个partition被复制到指定数量的servers上用于容错。

         每个partition拥有一个leader,其他servers做为追随者。Leader 负责该partition的所有读写操作。如果该leader宕机,其他followers会被自动选举出一个leader,自动接管。

         同时,其他followers也在同时做为它们管理partitionleader,这样就真正的达到负载均衡的目的。即多个server同时提供服务。

二、Kafka 安装部署介绍

内置Zookeeper,配置zookeeper.propertiesserver.properties。(网上和官网上大多是在一个node上部署多个cluster),现分享在多台servers的搭建配置,还是多简单。

zookeeper 配置:

zookeeper.properties中

dataDir=/home/talend/becktest/kafka_2.10-0.8.1/zookeeper          //此目录下,必须建myid文件,并配上id数字
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

server.1=100.16.75.25:2888:3888
server.2=100.16.75.26:2888:3888
server.3=100.16.75.27:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2

 

kafka server 配置:server.properties

broker.id=1

port=9092

host.name=100.16.75.25

log.dirs=/talend/becktest/kafka_2.10-0.8.1/kafka-logs

zookeeper.connect=100.16.75.25:2181,100.16.75.26:2181,100.16.75.27:2181  //zookeeper 集群

三、Kafka console一般操作命令

创建topicbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

显示所有topic:bin/kafka-topics.sh --list --zookeeper localhost:2181 --topic

发消息:bin/kafka-console-producer.sh --broker-list ip:9092 --topic test
消费:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytest

查看topic描述:bin/kafka-topics.sh --describe --zookeeper 100.16.75.25:2181 --topic my-reicated-topic

四、Kafka java code 编程之producer & consumer

Java实现的一个简单demo,用于produce consume消息(此处参考他人代码,重在学习分享)。

public class LogProducer {

 private Producer inner;
 public LogProducer() throws Exception{
  Properties properties = new Properties();
  properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
  ProducerConfig config = new ProducerConfig(properties);
  inner = new Producer(config);
 }

 
 public void send(String topicName,String message) {
  if(topicName == null || message == null){
   return;
  }
  KeyedMessage km = new KeyedMessage(topicName,message);
  inner.send(km);
 }
 
 public void send(String topicName,Collection messages) {
  if(topicName == null || messages == null){
   return;
  }
  if(messages.isEmpty()){
   return;
  }
  List> kms = new ArrayList>();
  for(String entry : messages){
   KeyedMessage km = new KeyedMessage(topicName,entry);
   kms.add(km);
  }
  inner.send(kms);
 }
 
 public void close(){
  inner.close();
 }
 
 /**
  * @param args
  */
 public static void main(String[] args) {
  LogProducer producer = null;
  try{
   producer = new LogProducer();
   int i=0;
   while(i<5){
    producer.send("mytest", "this is a sample" + i);
    i++;
    Thread.sleep(2000);
   }
  }catch(Exception e){
   e.printStackTrace();
  }finally{
   if(producer != null){
    producer.close();
   }
  }

 }

}

consumer:

public class LogConsumer {

 private ConsumerConfig config;
 private String topic;
 private int partitionsNum;
 private MessageExecutor executor;
 private ConsumerConnector connector;
 private ExecutorService threadPool;
 public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{
  Properties properties = new Properties();
  properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
  config = new ConsumerConfig(properties);
  this.topic = topic;
  this.partitionsNum = partitionsNum;
  this.executor = executor;
 }
 
 public void start() throws Exception{
  connector = Consumer.createJavaConsumerConnector(config);
  Map topics = new HashMap();
  topics.put(topic, partitionsNum);
  Map>> streams = connector.createMessageStreams(topics);
  List> partitions = streams.get(topic);
  threadPool = Executors.newFixedThreadPool(partitionsNum);
  for(KafkaStream partition : partitions){
   threadPool.execute(new MessageRunner(partition));
  }
 }

     
 public void close(){
  try{
   threadPool.shutdownNow();
  }catch(Exception e){
   //
  }finally{
   connector.shutdown();
  }
  
 }
 
 class MessageRunner implements Runnable{
  private KafkaStream partition;
  
  MessageRunner(KafkaStream partition) {
   this.partition = partition;
  }
  
  public void run(){
   ConsumerIterator it = partition.iterator();
   while(it.hasNext()){
    MessageAndMetadata item = it.next();
    System.out.println("partiton:" + item.partition());
    System.out.println("offset:" + item.offset());
    executor.execute(new String(item.message()));//UTF-8
   }
  }
 }
 
 interface MessageExecutor {
  
  public void execute(String message);
 }
 
 /**
  * @param args
  */
 public static void main(String[] args) {
  LogConsumer consumer = null;
  try{
   MessageExecutor executor = new MessageExecutor() {
    
    public void execute(String message) {
     System.out.println(message);
     
    }
   };
   consumer = new LogConsumer("mytest", 2, executor);
   consumer.start();
  }catch(Exception e){
   e.printStackTrace();
  }finally{
//   if(consumer != null){
//    consumer.close();
//   }
  }

 }

}

阅读(3220) | 评论(1) | 转发(0) |
0

上一篇:spark学习

下一篇:经典解说

给主人留下些什么吧!~~

7大爷2014-04-08 13:50:20

学习了