分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。
分类: 项目管理
2019-08-22 18:03:32
最近要把原来做的那套集中式日志监控系统进行迁移,原来的实现方案是: Log Agent => Log Server => ElasticSearch => Kibana,其中Log Agent和Log Server之间走的是Thrift RPC,自己实现了一个简单的负载均衡(WRB)。
原来的方案其实运行的挺好的,异步化Agent对应用性能基本没有影响。支持我们这个每天几千万PV的应用一点压力都没有。不过有个缺点就是如果错误日志暴增,Log Server这块处理不过来,会导致消息丢失。当然我们量级没有达到这个程度,而且也是可以通过引入队列缓冲一下处理。不过现在综合考虑,其实直接使用消息队列会更简单。PRC,负载均衡,负载缓冲都内建实现了。另一种方式是直接读取日志,类似于logstash或者flume的方式。不过考虑到灵活性还是决定使用消息队列的方式,反正我们已经部署了Zookeeper。调研了一下,Kafka是最适合做这个数据中转和缓冲的。于是,打算把方案改成: Log Agent => Kafka => ElasticSearch => Kibana。
TIPS
如果跟ES对应,Broker相当于Node,Topic相当于Index,Message相对于Document,而Partition相当于shard。LogSegment相对于ES的Segment。
我们在使用kafka的过程中有时候可以需要查看我们生产的消息的各种信息,这些消息是存储在kafka的日志文件中的。由于日志文件的特殊格式,我们是无法直接查看日志文件中的信息内容。Kafka提供了一个命令,可以将二进制分段日志文件转储为字符类型的文件:
注意:这里 --print-data-log 是表示查看消息内容的,不加此项只能看到Header,看不到payload。
也可以用来查看index文件:
timeindex文件也是OK的:
消费者平衡(Consumer Rebalance)是指的是消费者重新加入消费组,并重新分配分区给消费者的过程。在以下情况下会引起消费者平衡操作:
消费者自动平衡操作提供了消费者的高可用和高可扩展性,这样当我们增加或者减少消费者或者分区数的时候,不需要关心底层消费者和分区的分配关系。但是需要注意的是,在rebalancing过程中,由于需要给消费者重新分配分区,所以会出现在一个短暂时间内消费者不能拉取消息的状况。
NOTES
这里要特别注意最后一种情况,就是所谓的慢消费者(Slow Consumers)。如果没有在session.timeout.ms时间内收到心跳请求,协调者可以将慢消费者从组中移除。通常,如果消息处理比session.timeout.ms慢,就会成为慢消费者。导致两次poll()方法的调用间隔比session.timeout.ms时间长。由于心跳只在 poll()调用时才会发送(在0.10.1.0版本中, 客户端心跳在后台异步发送了),这就会导致协调者标记慢消费者死亡。
如果没有在session.timeout.ms时间内收到心跳请求,协调者标记消费者死亡并且断开和它的连接。 同时,通过向组内其他消费者的HeartbeatResponse中发送IllegalGeneration错误代码 触发rebalance操作。
在手动commit offset的模式下,要特别注意这个问题,否则会出现commit不上的情况。导致一直在重复消费。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm或Spark Streaming这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。
Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务。若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失。而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高。因此,Kafka从0.8开始提供High Availability机制。主要表现在Data Replication和Leader Election两方面。
Kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties 中配置:
default.replication.factor = 1
|
该 Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。默认情况下,Kafka的replication数量为1。 每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,follower批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。
需要注意的是,replication factor并不会影响consumer的吞吐率测试,因为consumer只会从每个partition的leader读数据,而与replicaiton factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。
引入Replication之后,同一个Partition可能会有多个副本(Replica),而这时需要在这些副本之间选出一个Leader,Producer和Consumer只与这个Leader副本交互,其它Replica作为Follower从Leader中复制数据。注意,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),并不提供任何读写服务,系统更加简单且高效。
思考 为什么follower副本不提供读写,只做冷备?
follwer副本不提供写服务这个比较好理解,因为如果follower也提供写服务的话,那么就需要在所有的副本之间相互同步。n个副本就需要 nxn 条通路来同步数据,如果采用异步同步的话,数据的一致性和有序性是很难保证的;而采用同步方式进行数据同步的话,那么写入延迟其实是放大n倍的,反而适得其反。
那么为什么不让follower副本提供读服务,减少leader副本的读压力呢?这个除了因为同步延迟带来的数据不一致之外,不同于其他的存储服务(如ES,MySQL),Kafka的读取本质上是一个有序的消息消费,消费进度是依赖于一个叫做offset的偏移量,这个偏移量是要保存起来的。如果多个副本进行读负载均衡,那么这个偏移量就不好确定了。
TIPS
Kafka的leader副本类似于ES的primary shard,follower副本相对于ES的replica。ES也是一个index有多个shard(相对于Kafka一个topic有多个partition),shard又分为primary shard和replicition shard,其中primary shard用于提供读写服务(sharding方式跟MySQL非常类似:shard = hash(routing) % number_of_primary_shards。但是ES引入了协调节点(coordinating node) 的角色,实现对客户端透明。),而replication shard只提供读服务(这里跟Kafka一样,ES会等待relication shard返回成功才最终返回给client)。
有传统MySQL分库分表经验的同学一定会觉得这个过程是非常相似的,就是一个 sharding + replication 的数据架构,只是通过client(SDK)或者coordinator对你透明了而已。
Propagate消息
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了 ISR (in-sync replicas) 中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加 HW( High-Watermark) 并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。
Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。
Kafka Replication的数据流如下图所示:
关于这方面的内容比较多而且复杂,这里就不展开了,这篇文章写的很好,有兴趣的同学可以学习 Kafka设计解析(二):Kafka High Availability (上)。
下面这张图非常简单明了的显示kafka的所有游标(https://rongxinblog.wordpress.com/2016/07/29/kafka-high-watermark/):
下面简单的说明一下:
0、ISRIn-Sync Replicas list,顾名思义,就是跟leader “保存同步” 的Replicas。“保持同步”的含义有些复杂,在0.9版本,broker的参数replica.lag.time.max.ms用来指定ISR的定义,如果leader在这么长时间没收到follower的拉取请求,或者在这么长时间内,follower没有fetch到leader的log end offset,就会被leader从ISR中移除。ISR是个很重要的指标,controller选取partition的leader replica时会使用它,leader需要维护ISR列表,因此leader选取ISR后会把结果记到Zookeeper上。
在需要选举leader的场景下,leader和ISR是由controller决定的。在选出leader以后,ISR是leader决定。如果谁是leader和ISR只存在于ZK上,那么每个broker都需要在Zookeeper上监听它host的每个partition的leader和ISR的变化,这样效率比较低。如果不放在Zookeeper上,那么当controller fail以后,需要从所有broker上重新获得这些信息,考虑到这个过程中可能出现的问题,也不靠谱。所以leader和ISR的信息存在于Zookeeper上,但是在变更leader时,controller会先在Zookeeper上做出变更,然后再发送LeaderAndIsrRequest给相关的broker。这样可以在一个LeaderAndIsrRequest里包括这个broker上有变动的所有partition,即batch一批变更新信息给broker,更有效率。另外,在leader变更ISR时,会先在Zookeeper上做出变更,然后再修改本地内存中的ISR。
1、Last Commited OffsetConsumer最后提交的位置,这个位置会保存在一个特殊的topic:_consumer_offsets 中。
2、Current PositionConsumer当前读取的位置,但是还没有提交给broker。提交之后就变成Last Commit Offset。
3、High Watermark(HW)这个offset是所有ISR的LEO的最小位置(minimum LEO across all the ISR of this partition),consumer不能读取超过HW的消息,因为这意味着读取到未完全同步(因此没有完全备份)的消息。换句话说就是:HW是所有ISR中的节点都已经复制完的消息.也是消费者所能获取到的消息的最大offset(注意,并不是所有replica都一定有这些消息,而只是ISR里的那些才肯定会有)。
随着follower的拉取进度的即时变化,HW是随时在变化的。follower总是向leader请求自己已有messages的下一个offset开始的数据,因此当follower发出了一个fetch request,要求offset为A以上的数据,leader就知道了这个follower的log end offset至少为A。此时就可以统计下ISR里的所有replica的LEO是否已经大于了HW,如果是的话,就提高它。同时,leader在fetch本地消息给follower时,也会在返回给follower的reponse里附带自己的HW。这样follower也就知道了leader处的HW(但是在实现中,follower获取的只是读leader本地log时的HW,并不能保证是最新的HW)。但是leader和follower的HW是不同步的,follower处记的HW可能会落后于leader。
Hight Watermark Checkpoint
由于HW是随时变化的,如果即时更新到Zookeeper,会带来效率的问题。而HW是如此重要,因此需要持久化,ReplicaManager就启动了单独的线程定期把所有的partition的HW的值记到文件中,即做highwatermark-checkpoint。
4、Log End Offset(LEO)这个很好理解,就是当前的最新日志写入(或者同步)位置。
Kafka支持JVM语言(java、scala),同是也提供了高性能的C/C++客户端,和基于librdkafka封装的各种语言客户端。如,Python客户端: confluent-kafka-python 。Python客户端还有纯python实现的:kafka-python。
下面是Python例子(以confluent-kafka-python为例):
Producer:
Consumer:
跟普通的消息队列使用基本是一样的。
kafka读取消息其实是基于offset来进行的,如果offset出错,就可能出现重复读取消息或者跳过未读消息。在0.8.2之前,kafka是将offset保存在ZooKeeper中,但是我们知道zk的写操作是很昂贵的,而且不能线性拓展,频繁的写入zk会导致性能瓶颈。所以在0.8.2引入了Offset Management,将这个offset保存在一个 compacted kafka topic(_consumer_offsets),Consumer通过发送OffsetCommitRequest请求到指定broker(偏移量管理者)提交偏移量。这个请求中包含一系列分区以及在这些分区中的消费位置(偏移量)。偏移量管理者会追加键值(key-value)形式的消息到一个指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition组成的,而value是偏移量。同时为了提供xing能,内存中也会维护一份最近的记录,这样在指定key的情况下能快速的给出OffsetFetchRequests而不用扫描全部偏移量topic日志。如果偏移量管理者因某种原因失败,新的broker将会成为偏移量管理者并且通过扫描偏移量topic来重新生成偏移量缓存。
0.9版本之前的Kafka提供了kafka-consumer-offset-checker.sh脚本,可以用来查看某个消费组对一个或者多个topic的消费者消费偏移量情况,该脚本调用的是kafka.tools.Consumer.OffsetChecker。0.9版本之后已不再建议使用该脚本了,而是建议使用kafka-consumer-groups.sh脚本,该脚本调用的是kafka.admin.ConsumerGroupCommand。这个脚本其实是对消费组进行管理,不只是查看消费组的偏移量。这里只介绍最新的kafka-consumer-groups.sh脚本使用。
用ConsumerGroupCommand工具,我们可以使用list,describe,或delete消费者组。
例如,要列出所有主题中的所有消费组信息,使用list参数:
要查看某个消费组当前的消费偏移量则使用describe参数:
NOTES
该脚本只支持删除不包括任何消费组的消费组,而且只能删除消费组为老版本消费者对应的消费组(即分组元数据存储在zookeeper的才有效),因为这个脚本删除操作的本质就是删除ZK中对应消费组的节点及其子节点而已。
上面介绍了通过脚本工具方式查询Kafka消费偏移量。事实上,我们也可以通过API的方式查询消费偏移量。
Kafka消费者API提供了两个方法用于查询消费者消费偏移量的操作:
除了查看消费偏移量,有些时候我们需要人为的指定offset,比如跳过某些消息,或者redo某些消息。在0.8.2之前,offset是存放在ZK中,只要用ZKCli操作ZK就可以了。但是在0.8.2之后,offset默认是存放在kafka的__consumer_offsets队列中,只能通过API修改了:
Class KafkaConsumer Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offset the server maintains are also available ( seekToBeginning(TopicPartition…) and seekToEnd(TopicPartition…) respectively).
参考文档: Kafka Consumer Offset Management
Kafka消费者API提供了重置消费偏移量的方法: