从事实时计算多年,熟悉jstorm/spark/flink/kafka/rocketMq, 热衷于开源,希望在这里和前辈们一起学习与分享,得到长足的进步!邮箱:hustfxj@gmail.com 我的githup地址是:https://github.com/hustfxj。欢迎和大家一起交流探讨问题。
分类: 高性能计算
2014-08-07 10:08:05
本文从外部消息在worker进程内部的转化,传递及处理过程入手,一步步分析在worker-data中的数据项存在的原因和意义。试图从代码实现的角度来回答,如果是从头开始实现worker的话,该如何来定义消息接口,如何实现各自接口上的消息处理。
Topology由Spout,Bolt组成,其逻辑关系大体如下图所示。
无论是Spout或Bolt的处理逻辑都需要在进程或线程内执行,那么它们与进程及线程间的映射关系又是如何呢。有关这个问题,Understanding the Parallelism of a Storm Topology 一文作了很好的总结,现重复一下其要点。
worker,executor, task三者之间的关系可以用下图表示
小结一下,Worker=Process, Executor=Thread, Task=Spout or Bolt.
每一个executor使用的是actor pattern,high level的处理逻辑如下图所示
在源码走读之四一文中总结了worker进程内的各种类型的thread,也即executor,这个等同于定义了进程内部和进程间的接口类型。那么这些接口上的消息在具体流传和处理过程中需要定义哪些数据结构,针对这些数据结构,又要做哪些必要的处理呢?
换句话说,就是为什么在worker.clj中有哪些数据和函数存在,不这样做,可以不?
先图示一下,外部消息处理的大概流程。
注:圈起来的数字表示消息转换和处理的序列。
监听端口准备就绪,接收线程在收到外部的消息后,面临的问题就是如何确定由哪个task来处理该消息。接收到的tuple中含有task-id,根 据task-id可以知道运行该task的executor,executor中有receive-message-queue即(incoming queue)来存放外部的tuple. 定义的数据结构需要反映这个转换过程task-id->executor->receive-queue-map.
那么在worker-data中哪些数据项与这个过程相关呢
transfer-local-fn将数据从接收线程发送到spout或bolt所在的executor线程。
接下来数据会被传递到executor,于是又牵涉到executor的数据结构问题。executor-data由函数mk-executor-data创建,其内容与worker-data比较起来相对较少。
executor收到tuple之后,第一步需要进行反序列化,storm中使用kyro来进行序列化和反序列化,这也是为什么在executor中有该数据项的原因。
executor中与步骤2相关的数据项
步骤2处理结束,会产生相应的tuple发送到外部。这个过程需要多解释一下,首先tuple不是直接发送给worker的transfer- thread(负责向其它进程发送消息),而是发送给send-handler线程,每一个executor在创建的时候最起码会有两个线程被创建,一个 用于运行bolt或spout的处理逻辑,另一个用以负责缓存bolt或spout产生的对外发送的tuple。
一旦snd-hander中的tuple数量达到阀值,这些被缓存的tuple会一次性发送给worker级别的transfer-thread.
executor中与步骤3相关的数据项
在步骤3中生成outgoing的tuple,tuple生成的时候需要回答两个基本问题
处理逻辑很简单,先将数据缓存,然后在达到阀值之后,一起传送给transfer-thread.
start-batch-transfer->worker-handler
(defn start-batch-transfer->worker-handler! [worker executor-data] (let [worker-transfer-fn (:transfer-fn worker) cached-emit (MutableObject. (ArrayList.)) storm-conf (:storm-conf executor-data) serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data)) ] (disruptor/consume-loop* (:batch-transfer-queue executor-data) (disruptor/handler [o seq-id batch-end?] (let [^ArrayList alist (.getObject cached-emit)] (.add alist o) (when batch-end? (worker-transfer-fn serializer alist) (.setObject cached-emit (ArrayList.)) ))) :kill-fn (:report-error-and-die executor-data))))
worker-transfer-fn是worker中的transfer-fn,由mk-transfer-fn生成。
(defn mk-transfer-fn [worker] (let [local-tasks (-> worker :task-ids set) local-transfer (:transfer-local-fn worker) ^DisruptorQueue transfer-queue (:transfer-queue worker)] (fn [^KryoTupleSerializer serializer tuple-batch] (let [local (ArrayList.) remote (ArrayList.)] (fast-list-iter [[task tuple :as pair] tuple-batch] (if (local-tasks task) (.add local pair) (.add remote pair) )) (local-transfer local) ;; not using map because the lazy seq shows up in perf profiles (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])] (disruptor/publish transfer-queue serialized-pairs) )))))
处理函数mk-transfer-tuples-handler,主要进行序列化,将序列化后的数据发送给目的地址。
(defn mk-transfer-tuples-handler [worker] (let [^DisruptorQueue transfer-queue (:transfer-queue worker) drainer (ArrayList.) node+port->socket (:cached-node+port->socket worker) task->node+port (:cached-task->node+port worker) endpoint-socket-lock (:endpoint-socket-lock worker) ] (disruptor/clojure-handler (fn [packets _ batch-end?] (.addAll drainer packets) (when batch-end? (read-locked endpoint-socket-lock (let [node+port->socket @node+port->socket task->node+port @task->node+port] ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead) ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering) (fast-list-iter [[task ser-tuple] drainer] ;; TODO: consider write a batch of tuples here to every target worker ;; group by node+port, do multipart send (let [node-port (get task->node+port task)] (when node-port (.send ^IConnection (get node+port->socket node-port) task ser-tuple)) )))) (.clear drainer))))))
tuple发送的时候需要用到connection,但目前只知道task-id,所以在worker中需要保存task-id到node+port的映射,node+port与outgoing connections之间的映射。
worker中与步骤5相关的数据项:
上述五个步骤并没有涵盖worker-data所有的数据项,那么其它的数据项归一归类,大体如下
timer相关,timer相关的数据项包括timer及其对应的处理句柄
zk相关
配置相关
Assignment相关
进程关闭相关
其它的其它
设计的时候,一定是先画出一个大概的蓝图,然后逐步的细化并加以实现。具体来说,步骤如下