分类: 大数据
2014-04-04 13:27:34
1. Kafka 介绍。
2. Kafka 安装部署介绍。
3. Kafka console一般操作命令。
4. Kafka java code 编程之producer & consumer。
一、 Kafka 介绍
Kafka 来源,是linkedin 用于日志处理的分布式消息队列,linkedin 的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享)以及对系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)。
Kafka存储策略
1. Kafka以topic来进行消息管理,每个topic包含多个partitions,每个partition对应一个逻辑log,由多个segment组成。
2. 每个segment中存储多条消息,消息id由其逻辑位置决定,该id可直接定位到消息存储位置,避免id到位置的额外映射。
3.每个part(patition)在内存中对应一个index,记录每个segment中的第一条消息偏移offset。
4. 发布者发到某个topic的消息会被均匀的分布到多个part上,broker收到发布消息往对应part的最后一个segment上添加消息。当某个segment上的消息数达到配置或发布时间的阀值,会被flush到磁盘。
5.无论消息是否被消费,只要在配置的时间段内,消息都将被留。
Consumer group
1. 允许consumer group 对一个topic进行消费,不同的group之间独立订阅。但同一个group对消息订阅,是有序且有唯一的。
2. 为了减小一个consumer group中不同consumer之间的分布式协调开销,指定partition为最小的并行消费单位,即一个group内的consumer只能消费不同的partition。
Zookeeper协调控制
1. 管理broker 与consumer的动态加入与离开。
2. 触发负载均衡,当broker 或consumer加入或离开时,会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。
3. 维护消费关系及每个partition的消费信息。
消息交付保证
1. Kafka 对消息的重复、丢失、错误以及顺序没有严格要求。
2. 对每个partition只会被consumer group内的一个consumer消费,故kafka保证每个partition内的消息会被顺序的订阅。
3. Kafka 为每条消息计算CRC校验,用于错误检测,crc校验不通过的消息会被直接丢弃。
kafka分布式
Log partitions 被分布在多个servers上,根据配置的数量,每个partition被复制到指定数量的servers上用于容错。
每个partition拥有一个leader,其他servers做为追随者。Leader 负责该partition的所有读写操作。如果该leader宕机,其他followers会被自动选举出一个leader,自动接管。
同时,其他followers也在同时做为它们管理partition的leader,这样就真正的达到负载均衡的目的。即多个server同时提供服务。
二、Kafka 安装部署介绍
内置Zookeeper,配置zookeeper.properties和server.properties。(网上和官网上大多是在一个node上部署多个cluster),现分享在多台servers的搭建配置,还是多简单。
zookeeper 配置:
zookeeper.properties中
dataDir=/home/talend/becktest/kafka_2.10-0.8.1/zookeeper //此目录下,必须建myid文件,并配上id数字
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
server.1=100.16.75.25:2888:3888
server.2=100.16.75.26:2888:3888
server.3=100.16.75.27:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2
kafka server 配置:server.properties
broker.id=1
port=9092
host.name=100.16.75.25
log.dirs=/talend/becktest/kafka_2.10-0.8.1/kafka-logs
zookeeper.connect=100.16.75.25:2181,100.16.75.26:2181,100.16.75.27:2181 //zookeeper 集群
三、Kafka console一般操作命令
创建topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
显示所有topic:bin/kafka-topics.sh --list --zookeeper localhost:2181 --topic
发消息:bin/kafka-console-producer.sh --broker-list ip:9092 --topic test
消费:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytest
查看topic描述:bin/kafka-topics.sh --describe --zookeeper 100.16.75.25:2181 --topic my-reicated-topic
四、Kafka java code 编程之producer & consumer
Java实现的一个简单demo,用于produce和 consume消息(此处参考他人代码,重在学习分享)。
public class LogProducer {
private Producer
public LogProducer() throws Exception{
Properties properties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
ProducerConfig config = new ProducerConfig(properties);
inner = new Producer
}
public void send(String topicName,String message) {
if(topicName == null || message == null){
return;
}
KeyedMessage
inner.send(km);
}
public void send(String topicName,Collection
if(topicName == null || messages == null){
return;
}
if(messages.isEmpty()){
return;
}
List
for(String entry : messages){
KeyedMessage
kms.add(km);
}
inner.send(kms);
}
public void close(){
inner.close();
}
/**
* @param args
*/
public static void main(String[] args) {
LogProducer producer = null;
try{
producer = new LogProducer();
int i=0;
while(i<5){
producer.send("mytest", "this is a sample" + i);
i++;
Thread.sleep(2000);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(producer != null){
producer.close();
}
}
}
}
consumer:
public class LogConsumer {
private ConsumerConfig config;
private String topic;
private int partitionsNum;
private MessageExecutor executor;
private ConsumerConnector connector;
private ExecutorService threadPool;
public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{
Properties properties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
config = new ConsumerConfig(properties);
this.topic = topic;
this.partitionsNum = partitionsNum;
this.executor = executor;
}
public void start() throws Exception{
connector = Consumer.createJavaConsumerConnector(config);
Map
topics.put(topic, partitionsNum);
Map
List
threadPool = Executors.newFixedThreadPool(partitionsNum);
for(KafkaStream
threadPool.execute(new MessageRunner(partition));
}
}
public void close(){
try{
threadPool.shutdownNow();
}catch(Exception e){
//
}finally{
connector.shutdown();
}
}
class MessageRunner implements Runnable{
private KafkaStream
MessageRunner(KafkaStream
this.partition = partition;
}
public void run(){
ConsumerIterator
while(it.hasNext()){
MessageAndMetadata
System.out.println("partiton:" + item.partition());
System.out.println("offset:" + item.offset());
executor.execute(new String(item.message()));//UTF-8
}
}
}
interface MessageExecutor {
public void execute(String message);
}
/**
* @param args
*/
public static void main(String[] args) {
LogConsumer consumer = null;
try{
MessageExecutor executor = new MessageExecutor() {
public void execute(String message) {
System.out.println(message);
}
};
consumer = new LogConsumer("mytest", 2, executor);
consumer.start();
}catch(Exception e){
e.printStackTrace();
}finally{
// if(consumer != null){
// consumer.close();
// }
}
}
}