全部博文(51)
分类: 大数据
2017-12-04 10:46:42
只要配置zookeeper.connect为要加入的集群,再启动Kafka进程,就可以让新的机器加入到Kafka集群。但是新的机器只针对新的Topic才会起作用,在之前就已经存在的Topic的分区,不会自动的分配到新增加的物理机中。为了使新增加的机器可以分担系统压力,必须进行消息数据迁移。Kafka提供了kafka-reassign-partitions.sh进行数据迁移。
#实例
#准备工作
broker id 1,2 把reqinfo 中数据迁移到broker 0中
[@s1540.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# sh bin/kafka-topics.sh --describe --topic reqinfo --zookeeper 10.16.58.131:2181Topic:reqinfo PartitionCount:2 ReplicationFactor:1 Configs:
Topic: reqinfo Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: reqinfo Partition: 1 Leader: 2 Replicas: 2 Isr: 2
1) 验证kafka节点是否正常加入集群。
zkCli.sh > ls /kafka/brokers/ids/
2)检查新增机器的zk信息中主机名是否准确,每个broker都要检查
zkCli.sh > get /kafka/brokers/ids/1
[@s1538.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# cat topics-to-move.json
{
"topics": [
{"topic": "reqinfo"}
],
"version": 1
}
注:
单个topic 的json格式
多个格式:多个用逗号隔开
[@s1538.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# cat topics-to-move.json
{
"topics": [
{"topic": "foo1"},
{"topic": "foo2"}
],
"version":1
}
调用--generate生成迁移计划,将reqinfo扩充到broker-id为 0上(这里可根据自己的实际情况定义)
[@s1538.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# ./bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "0" --generate --zookeeper 10.16.58.131:2181
Current partition replica assignment
{"version":1,"partitions":[{"topic":"reqinfo","partition":1,"replicas":[2]},{"topic":"reqinfo","partition":0,"replicas":[1]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"reqinfo","partition":1,"replicas":[0]},{"topic":"reqinfo","partition":0,"replicas":[0]}]}
生成类似于下方的结果
Current partition replica assignment
{"version":1,
"partitions":[....]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[.....]
}
说明:
Current partition replica assignment表示当前的消息存储状况。
Proposed partition reassignment configuration表示迁移后的消息存储状况。
将迁移后的json存入一个文件reassignment.json,供--execute命令使用
--generate: 根据给予的Topic列表和Broker列表生成迁移计划。generate并不会真正进行消息迁移,而是将消息迁移计划计算出来,供execute命令使用。
--execute: 根据给予的消息迁移计划进行迁移。
--verify: 检查消息是否已经迁移完成
[@s1538.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# cat reassignment.json
{"version":1,"partitions":[{"topic":"reqinfo","partition":1,"replicas":[0]},{"topic":"reqinfo","partition":0,"replicas":[0]}]}
[@s1538.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# ./bin/kafka-reassign-partitions.sh --reassignment-json-file reassignment.json --execute --zookeeper 10.16.58.131:2181
Current partition replica assignment
{"version":1,"partitions":[{"topic":"reqinfo","partition":0,"replicas":[1]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
[@s1538.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# ./bin/kafka-reassign-partitions.sh --reassignment-json-file reassignment.json --verify --zookeeper 10.16.58.131:2181
Status of partition reassignment:
Reassignment of partition [reqinfo,1] completed successfully 成功
Reassignment of partition [reqinfo,0] is still in progress 还在迁移中
[@s1538.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# ./bin/kafka-reassign-partitions.sh --reassignment-json-file reassignment.json --verify --zookeeper 10.16.58.131:2181
Status of partition reassignment:
Reassignment of partition [reqinfo,1] completed successfully
Reassignment of partition [reqinfo,0] completed successfully 结束
#效果
没有迁移之前的效果
[@s1540.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# sh bin/kafka-topics.sh --describe --topic reqinfo --zookeeper 10.16.58.131:2181Topic:reqinfo PartitionCount:2 ReplicationFactor:1 Configs:
Topic: reqinfo Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: reqinfo Partition: 1 Leader: 2 Replicas: 2 Isr: 2
迁移之后
[@s1540.bx.sys.d /opt/apps/kafka_2.11-0.10.2.1]# sh bin/kafka-topics.sh --describe --topic reqinfo --zookeeper 10.16.58.131:2181Topic:reqinfo PartitionCount:2 ReplicationFactor:1 Configs:
Topic: reqinfo Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: reqinfo Partition: 1 Leader: 0 Replicas: 0 Isr: 0
注意:
迁移之后的结果分区保持不变,leader和replicas都发生变化,replicas根据指定的broker数来决定