Chinaunix首页 | 论坛 | 博客
  • 博客访问: 161393
  • 博文数量: 51
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 471
  • 用 户 组: 普通用户
  • 注册时间: 2015-05-11 10:24
文章分类

全部博文(51)

文章存档

2018年(3)

2017年(22)

2016年(9)

2015年(17)

我的朋友

分类: 大数据

2017-11-09 10:08:39

一,架构
Plumber是一个分布式数据采集系统,可以将分布在多台机器上的数据汇聚到Kafka,再进一步落地到HDFS中
  • Plumber采用Master/Slave的架构, 仅提供任务的监控使用,不提供配置数据修改等管理功能。
    • Plumber Agent作为Slave,分为Source和Sink两部分。Source负责将分布在不同服务器上的数据汇聚到Kafka,Sink负责将Kafka中的数据写入HDFS
    • Plumber Manager作为master,负责收集各Agent的任务信息,监控Agent状态,并提供告警
  • Plumber Agent在启动/停止的时候向Manager进行注册/注销来上报自己的任务信息以及状态信息
  • Plumber Agent在运行过程中,维护采集状态,并作为心跳数据,定期发送到Kafak中。
  • Plumber Manager接收Agent的注册、心跳数据,并根据这些数据来掌握各Agent的任务分配以及执行情况,最终记录到时间序列数据库(influx)中。
  • Plumber Manager通过Restfule API来对外提供接口,Plumber可以提供Web UI以及一些管理工具
  • Plumber Manager允许后续的数据处理模块通过Restful API对数据处理情况进行上报,与采集情况进行对比。

Plumber的设计可以与Flume进行类比。

  • Plumber实际上就是只有一级传输的Flume
    • 固定使用Kafka Channel作为Channel
    • 可以使用Flume HDFS sink作为Sink
    • Source可以按需选择
  • 扩展了Flume的Monitor服务,并定义了Plumber的Counter。将Flume组件应用进来时,需要进行改造,以维护Counter并通过Monitor进行上报

Plumber的设计和开发思路是基于Flume的,但是实际上只要可以执行注册/注销,并按照格式上报心跳,任何组件都可以作为Plumber的Souce/Sink使用。

目前Plumber使用Flume作为Source,使用Kafka2HDFS作为Sink。Source作为Plumber Agent Source的一个实现例子。
二,监控与心跳
Plumber的数据采集监控主要目标:

  • 能检测到各Agent目前是否存活
  • 能检测到各Agent的数据处理压力,即采集的速度是否跟得上数据生产的速度
  • 能检测到各Agent是否遇到文件无法处理的情况(格式不正确等原因不能完全读取)
  • 能对Source、Sink的采集数据量进行精确到Record级别的准确性对比

监控设计思路

  • 监控数据由Agent来维护,并作为心跳数据定期上报到Kafka
  • Manager消费Kafka中的心跳信息,处理并使用时间序列数据库进行存储

准确性数据

目前考虑的准确性主要是分时段对比,每个小时一条汇总数据

  • source和sink分别维护一个分时段的counter
  • monitor对counter进行格式化后,放在心跳消息中进行上报
  • 需要考虑进程重启的情况,尽量使重启不影响counter的准确性

Metrics数据

  • metrics记录从进程开始到当前时间点,Agent一共处理了多少数据,同样放到心跳信息里
  • metrics定性即可,主要用于了解Agent采集压力

心跳

心跳数据通过Kafka进行收集,这样做有以下几个好处:

  1. 隔离Manager和Agent,避免因为Manager升级/故障而丢失数据
  2. 避免Agent过多Manager收集不过来的情况
  3. 可以监控Agent到Kafka的链路是否畅通

上报方式

每一次心跳消息中,Agent上报当前节点的采集状态(每个文件采集了多少record,多少byte等)

优点
  • 每一次心跳都是一个独立的状态,部分心跳数据丢失
  • Agent重启可以不影响数据的准确性
  • Master无需维护状态信息
缺点
  • 客户端实现复杂度上升,需要缓存状态数据
  • 需要设计好上报过滤规则,过期的状态不再上报

心跳数据结构

心跳使用Kafka KeyedMessage发送到Kafka。

key的数据结构

key的数据要保证同一个agent被发送到Kafka的同一个topic的同一个partition里面去。Key使用ip:port的格式,example:
127.0.0.1:10086

value的数据结构

未维护或者不适用的字段上报-1

考虑序列化压力不大, value采用Json格式,便于直接消费检查。

{
    "timestamp" : 1470123010 , //时间戳,精度到毫秒
    "type" : "source" , //类型,source/sink
    "data" : [ //心跳数据
        { 
            "topic" : "app-test2" , //处理的topic
            "recordCounter" : 1238432, //启动开始到现在处理的条数
            "items":[
                {
                    "timeMap" : 1470123000,  //时间段,通常截取到了小时,精确到毫秒
                    "fileNum" : 5 , //文件数量, 如果不适用,此字段可以上报-1
                    "fileSize" : 65535,  //文件实际大小, 如果不适用,此字段可以上报-1
                    "bytes" : 6423,  //已经处理的字节, 如果不适用,此字段可以上报-1
                    "records" : 230 //已经处理的record数量
                },
                {
                    "timeMap" : 1470123000, //时间段,通常截取到了小时,精确到毫秒
                    "fileNum" : 5 , //文件数量, 如果不适用,此字段可以上报-1
                    "fileSize" : 65535,  //文件实际大小, 如果不适用,此字段可以上报-1
                    "bytes" : 6423,  //已经处理的字节, 如果不适用,此字段可以上报-1
                    "records" : 230 //已经处理的record数量
                }
            ]
        } //第一条数据
    ] 
}
topic

心跳默认使用的Kafka topic名称为 plumber


Manager 设计思路

Plumber Manager考虑对内作为Plumber的数据处理中心,通过Restful API对外相应查询请求。一些需要注意的地方:

  • 通常每个Agent每分钟发送一次心跳。
  • Manager会将心跳拆开,心跳中每个Agent-Topic-Time的数据作为一条数据记录存储到时间序列数据库中。因此实际产生的数据条数可能比心跳数据多很多
  • Manager尽可能的将数据缓存在内存中,定期刷新到时间序列数据库中,以减少数据处理压力和响应时延
  • Plumber不与业务产生联系。如果需要对具体业务的数据采集情况进行监控,可以利用Plumber的API,另行设计。这样的好处是保持Plumber的封装性和通用性,设计简单。

  • Manager从Kafka读取Agent的心跳数据

  • Manager提供API接收Agent的注册消息

  • Manager提供API接收第三方处理程序的上报信息

  • Manager通过InfluxDB存储数据

  • Manager通过API来对外提供信息

阅读(1711) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~