Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1663122
  • 博文数量: 631
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 3920
  • 用 户 组: 普通用户
  • 注册时间: 2014-08-06 21:58
个人简介

博客是我工作的好帮手,遇到困难就来博客找资料

文章分类

全部博文(631)

文章存档

2022年(2)

2021年(4)

2020年(40)

2019年(4)

2018年(78)

2017年(213)

2016年(41)

2015年(183)

2014年(66)

我的朋友

分类: 系统运维

2018-04-25 14:29:00

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。

Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。


Apache Kafka与传统消息系统相比,有以下不同:


它被设计为一个分布式系统,易于向外扩展;
它同时为发布和订阅提供高吞吐量;
它支持多订阅者,当失败时能自动平衡消费者;
它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。

安装 kafka
下载地址:

wget

解压:
tar -zxvf kafka_2.11-1.0.0.tgz

cd /usr/local/kafka_2.11-1.0.0/
修改 kafka-server 的配置文件

vim /usr/local/kafka/config/server.properties

修改其中的:

broker.id=1

log.dir=/data/kafka/logs-1

功能验证:

1、启动 zk

使用安装包中的脚本启动单节点 Zookeeper 实例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

2、启动Kafka 服务

使用 kafka-server-start.sh 启动 kafka 服务:

bin/kafka-server-start.sh config/server.properties
mark

3、创建 topic

使用 kafka-topics.sh 创建单分区单副本的 topic test:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看 topic 列表:

bin/kafka-topics.sh --list --zookeeper localhost:2181

查询创建的 topic 列表报错:
mark

解决方法:

vim /etc/hosts

将 host 里的

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
修改为:

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         ip6-localhost ip6-localhost.localdomain localhost6 localhost6.localdomain6
方法参考:zookeeper unable to open socket to localhost/0:0:0:0:0:0:0:1:2181

再次查询就不报错了。

4、产生消息

使用 kafka-console-producer.sh 发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
mark

5、消费消息

使用 kafka-console-consumer.sh 接收消息并在终端打印:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
打开个新的命令窗口执行上面命令即可查看信息:
mark

6、查看描述 topics 信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
结果:

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
mark

第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。

“Leader”: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。

“Replicas”: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。

“Isr”: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。

集群配置

Kafka 支持两种模式的集群搭建:可以在单机上运行多个 broker 实例来实现集群,也可在多台机器上搭建集群,下面介绍下如何实现单机多 broker 实例集群,其实很简单,只需要如下配置即可。

单机多broker 集群配置

利用单节点部署多个 broker。 不同的 broker 设置不同的 id,监听端口及日志目录。 例如:

cp config/server.properties config/server-2.properties

cp config/server.properties config/server-3.properties

vim config/server-2.properties

vim config/server-3.properties

修改 :
broker.id=2
listeners = PLAINTEXT://your.host.name:9093
log.dir=/data/kafka/logs-2


broker.id=3
listeners = PLAINTEXT://your.host.name:9094
log.dir=/data/kafka/logs-3

启动Kafka服务:

bin/kafka-server-start.sh config/server-2.properties &

bin/kafka-server-start.sh config/server-3.properties &

至此,单机多broker实例的集群配置完毕。


多机多 broker 集群配置

分别在多个节点按上述方式安装 Kafka,配置启动多个 Zookeeper 实例。

假设三台机器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137

分别配置多个机器上的 Kafka 服务,设置不同的 broker id,zookeeper.connect 设置如下:

vim config/server.properties

里面的 zookeeper.connect

修改为:

zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181

使用 Kafka Connect 来导入/导出数据

从控制台写入数据并将其写回控制台是一个方便的起点,但您可能想要使用其他来源的数据或将数据从 Kafka 导出到其他系统。对于许多系统,您可以使用 Kafka Connect 来导入或导出数据,而不必编写自定义集成代码。


Kafka Connect 是 Kafka 包含的一个工具,可以将数据导入和导出到 Kafka。它是一个可扩展的工具,运行 连接器,实现与外部系统交互的自定义逻辑。在这个快速入门中,我们将看到如何使用简单的连接器运行 Kafka Connect,这些连接器将数据从文件导入到 Kafka topic,并将数据从 Kafka topic 导出到文件。


首先,我们将通过创建一些种子数据开始测试:


echo -e "zhisheng\ntian" > test.txt

mark


接下来,我们将启动两个以独立模式运行的连接器,这意味着它们将在单个本地专用进程中运行。我们提供三个配置文件作为参数。

首先是 Kafka Connect 过程的配置,包含常见的配置,例如要连接的 Kafka 代理以及数据的序列化格式。

其余的配置文件都指定一个要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。

bin/connect-standalone.sh   config/connect-standalone.properties   config/connect-file-source.properties   config/connect-file-sink.properties

Kafka 附带的这些示例配置文件使用您之前启动的默认本地群集配置,并创建两个连接器:

第一个是源连接器,用于读取输入文件中的行,并将每个连接生成为 Kafka topic,

第二个为连接器它从 Kafka topic 读取消息,并在输出文件中产生每行消息。


在启动过程中,您会看到一些日志消息,其中一些指示连接器正在实例化。

Kafka Connect 进程启动后,源连接器应该开始读取 test.txt topic connect-test,并将其生成 topic ,并且接收器连接器应该开始读取 topic 中的消息 connect-test 并将其写入文件 test.sink.txt。

我们可以通过检查输出文件的内容来验证通过整个管道传输的数据:

mark



数据存储在 Kafka topic 中 connect-test,因此我们也可以运行控制台使用者来查看 topic 中的数据


bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

mark

连接器继续处理数据,所以我们可以将数据添加到文件中,并看到它在管道中移动:


echo zhishengtian>> test.txt
echo zhishengtian2>> test.txt
echo zhishengtian3>> test.txt
echo zhishengtian4>> test.txt
mark
mark
使用 Kafka 流来处理数据

Kafka Streams 是用于构建关键任务实时应用程序和微服务的客户端库,输入和/或输出数据存储在 Kafka 集群中。
Kafka Streams 结合了在客户端编写和部署标准 Java 和 Scala 应用程序的简单性以及 Kafka 服务器端集群技术的优势,使这些应用程序具有高度可伸缩性,弹性,容错性,分布式等特性。

转自  



下载

地址:

这里下载的是kafka_2.11-0.11.0.1.tgz

解压

tar -xzf kafka_2.11-0.11.0.1.tgz
cd kafka_2.11-0.11.0.1

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

默认使用的2181端口,可在配置文件修改。

启动kafka server

bin/kafka-server-start.sh config/server.properties

(非本地生产者和消费者访问Kafka,记得修改 config/server.properties中的listeners, 例如
listeners=PLAINTEXT://192.168.33.152:9092)

create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

好像没有输出

启动生产者

 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

输入

This is a message
This is another message

启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果无法启动,根据提示,可能需要加zookeeper

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181 --topic test --from-beginning


1,查看kafka topic列表,使用--list参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

__consumer_offsets
lx_test_topic
test

2,查看kafka特定topic的详情,使用--topic与--describe参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe


Topic:lx_test_topic     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: lx_test_topic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

列出了lx_test_topic的parition数量、replica因子以及每个partition的leader、replica信息

3,查看consumer group列表,使用--list参数

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list

lx_test

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

console-consumer-86845
console-consumer-11967


4,查看特定consumer group 详情,使用--group与--describe参数

同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group lx_test --describe


GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
lx_test                        lx_test_topic             0          465             465             0               kafka-python-1.3.1_/127.0.0.1

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-11967 --describe


GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
Could not fetch offset from zookeeper for group console-consumer-11967 partition [lx_test_topic,0] due to missing offset data in zookeeper.
console-consumer-11967         lx_test_topic             0          unknown         465             unknown         console-consumer-11967_aws-lx-1513787888172-d3a91f05-0


其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)

上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。 


##查看topic分布情况kafka-list-topic.sh

bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)

bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --topic test (查看test的分区情况)


其实kafka-list-topic.sh里面就一句 
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@
实际是通过 kafka-run-class.sh脚本执行的包kafka.admin下面的类


##创建TOPIC kafka-create-topic.sh
bin/kafka-create-topic.sh   --replica 2 --partition 8 --topic test  --zookeeper 192.168.197.170:2181,192.168.197.171:2181

创建名为test的topic, 8个分区分别存放数据,数据备份总共2份


bin/kafka-create-topic.sh   --replica 1 --partition 1 --topic test2  --zookeeper 192.168.197.170:2181,192.168.197.171:2181

结果 topic: test2 partition: 0 leader: 170 replicas: 170 isr: 170

##重新分配分区kafka-reassign-partitions.sh

这个命令可以分区指定到想要的--broker-list上

bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute 

cat topic-to-move.json
{"topics":
     [{"topic": "test2"}],
     "version":1
}

##为Topic增加 partition数目kafka-add-partitions.sh

bin/kafka-add-partitions.sh --topic test --partition 2  --zookeeper  192.168.197.170:2181,192.168.197.171:2181 (为topic test增加2个分区)


##控制台接收消息
bin/kafka-console-consumer.sh --zookeeper  192.168.197.170:2181,192.168.197.171:2181  --from-beginning --topic test


##控制台发送消息
bin/kafka-console-producer.sh --broker-list  192.168.197.170:9092,192.168.197.171: 9092    --topic test 


##手动均衡topic, kafka-preferred-replica-election.sh

bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json


cat preferred-click.json
{
 "partitions":
  [
    {"topic": "click", "partition": 0},
    {"topic": "click", "partition": 1},
    {"topic": "click", "partition": 2},
    {"topic": "click", "partition": 3},
    {"topic": "click", "partition": 4},
    {"topic": "click", "partition": 5},
    {"topic": "click", "partition": 6},
    {"topic": "click", "partition": 7},
    {"topic": "play", "partition": 0},
    {"topic": "play", "partition": 1},
    {"topic": "play", "partition": 2},
    {"topic": "play", "partition": 3},
    {"topic": "play", "partition": 4},
    {"topic": "play", "partition": 5},
    {"topic": "play", "partition": 6},
    {"topic": "play", "partition": 7}
  ]
}

##删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181


1、启动kafka


./bin/kafka-server-start.sh -daemon config/server.properties &


2、创建topic–test


./bin/kafka-topics.sh --create --zookeeper 10.10.67.102:2181, 10.10.67.104:2181, 10.10.67.106:2181 --replication-factor 3 --partitions 3 --topic test


3、列出已创建的topic列表


./bin/kafka-topics.sh --list --zookeeper localhost:2181


4、模拟客户端去发送消息


./bin/kafka-console-producer.sh --broker-list 10.10.67.102:9092, 10.10.67.104:9092, 10.10.67.106:9092 --topic test

5、模拟客户端去接受消息

./bin/kafka-console-consumer.sh --zookeeper 10.10.67.102:2181, 10.10.67.104:2181, 10.10.67.106:2181 --from-beginning --topic test

6、查看指定的主题

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test





阅读(1189) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~