一,架构
Plumber是一个分布式数据采集系统,可以将分布在多台机器上的数据汇聚到Kafka,再进一步落地到HDFS中
-
Plumber采用Master/Slave的架构, 仅提供任务的监控使用,不提供配置数据修改等管理功能。
-
Plumber Agent作为Slave,分为Source和Sink两部分。Source负责将分布在不同服务器上的数据汇聚到Kafka,Sink负责将Kafka中的数据写入HDFS
-
Plumber Manager作为master,负责收集各Agent的任务信息,监控Agent状态,并提供告警
-
Plumber Agent在启动/停止的时候向Manager进行注册/注销来上报自己的任务信息以及状态信息
-
Plumber Agent在运行过程中,维护采集状态,并作为心跳数据,定期发送到Kafak中。
-
Plumber Manager接收Agent的注册、心跳数据,并根据这些数据来掌握各Agent的任务分配以及执行情况,最终记录到时间序列数据库(influx)中。
-
Plumber Manager通过Restfule API来对外提供接口,Plumber可以提供Web UI以及一些管理工具
-
Plumber Manager允许后续的数据处理模块通过Restful API对数据处理情况进行上报,与采集情况进行对比。
Plumber的设计可以与Flume进行类比。
-
Plumber实际上就是只有一级传输的Flume
-
固定使用Kafka Channel作为Channel
-
可以使用Flume HDFS sink作为Sink
-
Source可以按需选择
-
扩展了Flume的Monitor服务,并定义了Plumber的Counter。将Flume组件应用进来时,需要进行改造,以维护Counter并通过Monitor进行上报
Plumber的设计和开发思路是基于Flume的,但是实际上只要可以执行注册/注销,并按照格式上报心跳,任何组件都可以作为Plumber的Souce/Sink使用。
目前Plumber使用Flume作为Source,使用Kafka2HDFS作为Sink。Source作为Plumber Agent Source的一个实现例子。
二,监控与心跳
Plumber的数据采集监控主要目标:
-
能检测到各Agent目前是否存活
-
能检测到各Agent的数据处理压力,即采集的速度是否跟得上数据生产的速度
-
能检测到各Agent是否遇到文件无法处理的情况(格式不正确等原因不能完全读取)
-
能对Source、Sink的采集数据量进行精确到Record级别的准确性对比
监控设计思路
-
监控数据由Agent来维护,并作为心跳数据定期上报到Kafka
-
Manager消费Kafka中的心跳信息,处理并使用时间序列数据库进行存储
准确性数据
目前考虑的准确性主要是分时段对比,每个小时一条汇总数据
-
source和sink分别维护一个分时段的counter
-
monitor对counter进行格式化后,放在心跳消息中进行上报
-
需要考虑进程重启的情况,尽量使重启不影响counter的准确性
Metrics数据
-
metrics记录从进程开始到当前时间点,Agent一共处理了多少数据,同样放到心跳信息里
-
metrics定性即可,主要用于了解Agent采集压力
心跳
心跳数据通过Kafka进行收集,这样做有以下几个好处:
-
隔离Manager和Agent,避免因为Manager升级/故障而丢失数据
-
避免Agent过多Manager收集不过来的情况
-
可以监控Agent到Kafka的链路是否畅通
上报方式
每一次心跳消息中,Agent上报当前节点的采集状态(每个文件采集了多少record,多少byte等)
优点
-
每一次心跳都是一个独立的状态,部分心跳数据丢失
-
Agent重启可以不影响数据的准确性
-
Master无需维护状态信息
缺点
-
客户端实现复杂度上升,需要缓存状态数据
-
需要设计好上报过滤规则,过期的状态不再上报
心跳数据结构
心跳使用Kafka KeyedMessage发送到Kafka。
key的数据结构
key的数据要保证同一个agent被发送到Kafka的同一个topic的同一个partition里面去。Key使用ip:port的格式,example:
127.0.0.1:10086
value的数据结构 未维护或者不适用的字段上报-1
考虑序列化压力不大, value采用Json格式,便于直接消费检查。
{
"timestamp" : 1470123010 , //时间戳,精度到毫秒
"type" : "source" , //类型,source/sink
"data" : [ //心跳数据
{
"topic" : "app-test2" , //处理的topic
"recordCounter" : 1238432, //启动开始到现在处理的条数
"items":[
{
"timeMap" : 1470123000, //时间段,通常截取到了小时,精确到毫秒
"fileNum" : 5 , //文件数量, 如果不适用,此字段可以上报-1
"fileSize" : 65535, //文件实际大小, 如果不适用,此字段可以上报-1
"bytes" : 6423, //已经处理的字节, 如果不适用,此字段可以上报-1
"records" : 230 //已经处理的record数量
},
{
"timeMap" : 1470123000, //时间段,通常截取到了小时,精确到毫秒
"fileNum" : 5 , //文件数量, 如果不适用,此字段可以上报-1
"fileSize" : 65535, //文件实际大小, 如果不适用,此字段可以上报-1
"bytes" : 6423, //已经处理的字节, 如果不适用,此字段可以上报-1
"records" : 230 //已经处理的record数量
}
]
} //第一条数据
]
}
topic
心跳默认使用的Kafka topic名称为 plumber
Manager 设计思路
Plumber Manager考虑对内作为Plumber的数据处理中心,通过Restful API对外相应查询请求。一些需要注意的地方:
-
通常每个Agent每分钟发送一次心跳。
-
Manager会将心跳拆开,心跳中每个Agent-Topic-Time的数据作为一条数据记录存储到时间序列数据库中。因此实际产生的数据条数可能比心跳数据多很多
-
Manager尽可能的将数据缓存在内存中,定期刷新到时间序列数据库中,以减少数据处理压力和响应时延
-
Plumber不与业务产生联系。如果需要对具体业务的数据采集情况进行监控,可以利用Plumber的API,另行设计。这样的好处是保持Plumber的封装性和通用性,设计简单。
-
Manager从Kafka读取Agent的心跳数据
-
Manager提供API接收Agent的注册消息
-
Manager提供API接收第三方处理程序的上报信息
-
Manager通过InfluxDB存储数据
-
Manager通过API来对外提供信息
阅读(1837) | 评论(0) | 转发(0) |