Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3277229
  • 博文数量: 346
  • 博客积分: 10189
  • 博客等级: 上将
  • 技术积分: 3125
  • 用 户 组: 普通用户
  • 注册时间: 2008-08-05 19:46
文章分类

全部博文(346)

文章存档

2013年(35)

2011年(35)

2010年(76)

2009年(48)

2008年(152)

分类: 大数据

2013-08-28 11:22:46

转自:http://blog.csdn.net/willidie/article/details/10432145

目录

背景知识

序列化中间结果

尝试合并中间结果

如何使用自定义配置文件?

程序部署中的注意事项

如何使用日志系统?如何debug


背景知识

首先我们需要了解这些背景知识:

Basic:

II 

III  

IV 

VI 

VII 

Advanced:

II 

III TimeCacheMap实现分析

IV 使用storm实现实时热门话题计算


阅读了以上文章后,storm-starter项目是个不错的开始,下载这个项目的源码,学习它的代码。里面包含了storm各种典型应用demo。

有一些基本的概念,官方wiki没有专门说明,但是比较重要。这里首先介绍下,以便后文引用。

一个“task”指的是负责运行spout和bolt代码逻辑的单独一个线程;

一个“worker”指的是负责向多个task投递消息的网络监听程序(独立的进程),每“worker”代理的“task”由其自身在初始化时候创建;

一个“topology”在storm集群中会创建多个“worker”(分布在多个节点上)帮助其完成其定义的计算逻辑;


序列化中间结果

在storm中,任何由spout和bolt发射出去的元组都是动态类型,即可以是任何自定义类型。Storm将元组视为动态类型,而非类似hadoop的静态类型,有深层考虑,具体请参看。

发射序列化后的自定义类型对象与发射ASCII字符串相比,有很多优势:

I 支持复杂类型的中间结果(比如含有容器类的中间结果);

II 不再需要在发射和接收元组时进行繁冗的字符串操作;

III 代码可读性大大增强;

storm内置使用kryo来序列化自定义类型的对象。它比jdk自身的序列化高效30倍以上,其带来的序列化开销非常地小。以这样微小的代价,换来以上三个好处,非常值得。


尝试合并中间结果

storm不能单独作为一个个体提供实时的计算服务。向前,它需要一个消息队列的支持,向后则需要一个key-value存储系统的支持(广义上说任何存储系统都可以理解为key-value模式,比如对关系数据库来说,key是主键,value是记录;对文件来说,key是文件名,value是文件内容等等)。此外,由于storm是增量计算,每一条从MQ中取出的消息都是独立的。它基本可以理解为由两部分组成,“身份”信息和值信息。

即:{ {身份} {值} }。

其中“身份”信息可映射到一组存储系统中需要更新的key,而值信息则是做具体更新操作时需要的数据样本。

其实,不仅是原始消息,storm任何bolt吐出的中间结果都由这两部分组成。

理解了以上内容,我们来看一个典型的storm程序需要做哪些事情:

1、 从消息队列中取数据

2、 对原始消息处理得到中间结果A1

对中间结果A1进行处理得到中间结果A2

       ……

对中间结果An-1进行处理得到An

3、 根据An的“身份”信息和值信息更新存储系统中的计算结果

从中我们可以看出程序读写存储系统的次数依赖于An的消息数量,而An的数量则依赖于An-1的数量,如果能够合并消息数量,后端bolt处理的压力就会极大的减小,这是一个“金字塔”型的收缩结构。


能够合并的中间并需具备相同的“身份”信息。然而,并不是所有的中间结果都是能合并的。

一个中间结果An是否能够合并,决定于下一个task在对An的处理过程中,无论An是否是合并过的结果,都不影响此task处理An过程的正确性。

在“荧光”项目的开发中,我们设计出了一种CombiningBolt可以对开发者透明地进行中间结果合并,只要开发者自己的中间结果类继承了如下的接口:




并且以类似如下的方式定义自己的topology(将CombiningBolt放在两个计算逻辑bolt之间,并且以中间结果的“身份”信息分组,其中CombiningBolt构造函数可以接受一个int型参数,指明合并多少个中间结果后发送合并后的结果):




下图显示的是“荧光”中使用CombiningBolt合并中间结果的效果,可以看到CombiningBolt与最后的StoreResultBolt ack的消息数量正好1:10。这将redis压力降低为未合并前的1/10


CombiningBolt代码如下所示:


[java] view plaincopy
  1. package com.tencent.admonitor.storm;  
  2.   
  3.   
  4. import java.io.Serializable;  
  5. import java.util.ArrayList;  
  6. import java.util.Collections;  
  7. import java.util.Iterator;  
  8. import java.util.List;  
  9. import java.util.Map;  
  10. import java.util.concurrent.ConcurrentLinkedQueue;  
  11.   
  12. import org.apache.log4j.Logger;  
  13.   
  14. import com.tencent.admonitor.Util.Combinable;  
  15.   
  16.   
  17.   
  18. import backtype.storm.task.TopologyContext;  
  19. import backtype.storm.topology.BasicOutputCollector;  
  20. import backtype.storm.topology.OutputFieldsDeclarer;  
  21. import backtype.storm.topology.base.BaseBasicBolt;  
  22. import backtype.storm.tuple.Fields;  
  23. import backtype.storm.tuple.Tuple;  
  24. import backtype.storm.tuple.Values;  
  25. import backtype.storm.utils.TimeCacheMap;  
  26. import backtype.storm.utils.TimeCacheMap.ExpiredCallback;  
  27.   
  28. /** 
  29.  * CombiningBolt is used for combining intermediate objects which implement the Combinable 
  30.  * interface. 
  31.  *  
  32.  * 
  33.  */  
  34.   
  35. public class CombiningBolt extends BaseBasicBolt{  
  36.     private static final Logger LOG = Logger.getLogger(CombiningBolt.class);  
  37.     private TimeCacheMap combiningMap ;  
  38.     private ConcurrentLinkedQueue expiredList;  
  39.     private static int DEFAULT_COMBINING_WINDOW = 10;  
  40.     private static int DEFAULT_NUM_KEYS = 10000;  
  41.     private static int DEFAULT_EXPIRED_SECONDS= 5;  
  42.     private static int NUM_BUCKETS = 30;  
  43.       
  44.     private int combiningWindow;  
  45.     private int numKeys;  
  46.     private int expiredSeconds;  
  47.     public CombiningBolt(){  
  48.         this(DEFAULT_COMBINING_WINDOW,DEFAULT_NUM_KEYS,DEFAULT_EXPIRED_SECONDS);  
  49.     }  
  50.       
  51.     public CombiningBolt(int CombiningWindow, int numKeys, int expiredSeconds){  
  52.         this.combiningWindow = CombiningWindow;  
  53.         this.numKeys = numKeys;  
  54.         this.expiredSeconds = expiredSeconds;  
  55.           
  56.     }  
  57.      @Override  
  58.     public void prepare(Map stormConf, TopologyContext context) {  
  59.          expiredList = new ConcurrentLinkedQueue();  
  60.          ExpiredCallback callBack = new CallBackMove();  
  61.          combiningMap = new TimeCacheMap(expiredSeconds,NUM_BUCKETS,callBack);  
  62.     }  
  63.        
  64.     @Override  
  65.     public void execute(Tuple input, BasicOutputCollector collector) {  
  66.         String ident = input.getString(0);  
  67.         Combinable combinable = (Combinable)input.getValue(1);  
  68.           
  69.         if(!combiningMap.containsKey(ident)){  
  70.             if(combiningMap.size() >= numKeys){  
  71.                 collector.emit(new Values(ident,combinable));  
  72.             }else{  
  73.                 combiningMap.put(ident, combinable);  
  74.             }  
  75.         }else{  
  76.             Combinable old = combiningMap.get(ident);  
  77.             Combinable new_combinable = old.combine(combinable);  
  78.             if(new_combinable.getCombinedCount() >= combiningWindow){  
  79.                 collector.emit(new Values(ident,new_combinable));  
  80.                 combiningMap.remove(ident);  
  81.             }else{  
  82.                 combiningMap.put(ident, new_combinable);  
  83.             }  
  84.         }  
  85.         int length = expiredList.size();  
  86.         while(length > 0){  
  87.             Combinable e = (Combinable)expiredList.poll();  
  88.             if(e != null)  
  89.                 collector.emit(new Values(e.ident(),e));  
  90.             length--;  
  91.         }  
  92.           
  93.     }  
  94.   
  95.     @Override  
  96.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  97.         declarer.declare(new Fields("ident","combinable"));  
  98.           
  99.     }  
  100.       
  101.     private class CallBackMove implements ExpiredCallback,Serializable{  
  102.       
  103.         private static final long serialVersionUID = 1L;  
  104.   
  105.         @Override  
  106.         public void expire(K key, V val) {  
  107.             expiredList.offer(val);  
  108.         }     
  109.     }  
  110.   
  111. }  



  112. 如何使用自定义配置文件?

    开发storm应用时可能需要用到自身的配置文件。storm.yaml只能提供给storm自身的配置系统使用。针对后台最广泛使用的ini配置文件,推荐使用ini4j。它是一个轻量的ini配置文件读取器、具有简单易懂的api。

    请在spout和bolt的prepare方法中读取配置文件。任何资源类的初始化都需要放到prepare方法中(比如数据库连接,和读取文件索引类等)。


    程序部署中的注意事项

    I 打包storm程序请不要把依赖打进去,依赖需要单独部署;

    II 你需要一种有效的机制将程序的依赖包分发到所有的storm节点上(可上传到nimbus,通过rsync做目录同步);

    III 你需要分发的依赖包集合=你的程序所有的依赖包集合 ?  storm依赖包与你的依赖包的交集(否则topology会初始化失败)

    IV 在所有supervisor节点上自定义配置文件的内容和路径需要完全一致,路径不一致topology会初始化失败,内容不一致,程序的行为则是未定义的;


    如何使用日志系统?如何debug?

    在经过一番辛勤劳动之后,你的程序编译成功了。但是,这不代表着它会按照你设定的行为去运行。如何debug成为了一个问题。

    最基本也是最原始的思路就是log。在所有你认为可能出错的地方log(尤其是在prepare方法中,这里通常做的是资源类的初始化,如果它们初始化失败,topology则会初始化失败,这样能帮助你快速定位到错误原因)。

    Storm默认与log4j集成。你可以再$STORM_HOME/log4j/目录下找log4j的配置文件。通过调整它来控制log4j的行为。

    不要急于在真实环境下部署你的程序。想要测试的话,还是首先在Local mode模式下观察吧。如果在local mode下测试的结果是符合预期的,而在真实环境下出错,一般应是非代码的因素造成的。你需要检查节点的资源文件、自定义配置文件、依赖包等等。此外在local mode下,所有日志打印信息是直接输出到屏幕上的,这样方便你看的更加清楚。


    下面介绍下storm日志的结构:

    所有的日志文件都存放在$STORM_HOME/logs/路径下。

    在nimbus节点上:nimbus.log记录的是nimbus启动过程中的输出信息,包括启动时间和各个worker和task初始化过程中打印信息等等。ui.log则记录的storm监控程序启动过程中的输出信息,包括启动时间等等。

    在supervisor节点上:supervisor.log记录的则是supervisor的相关启动信息。worker-XXX(一个supervisor节点通常部署了多个worker)记录的是消息传递、和任务执行过程中的输出信息(也就是你代码中的日志打印部分)。storm的设计目标之一是让任务(task)部署对用户透明。这样造成了:当你需要观察一个task的日志输出信息时,你不知道到哪个节点的哪个日志文件去找这个信息。因此,还是强烈建议在local mode下debug你的程序,然后在真实环境做好日志告警。

    阅读(14321) | 评论(0) | 转发(0) |
    给主人留下些什么吧!~~