Chinaunix首页 | 论坛 | 博客
  • 博客访问: 435389
  • 博文数量: 138
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 716
  • 用 户 组: 普通用户
  • 注册时间: 2015-03-03 21:48
文章分类

全部博文(138)

文章存档

2019年(1)

2017年(5)

2016年(99)

2015年(33)

我的朋友

分类: LINUX

2016-03-31 14:25:28

前言:

       转载至http://www.sxt.cn/u/756/blog/4644

        昨天有朋友聊天说,我写的前面三篇太简单了,没有太多深入的东西。好吧,这说明我的目的达到了。我写这个系列的原因就是为了面向应用,进一步细化为两点:

        1. 以例子说话,由简入深,一步步了解如何在Storm上开发应用,不会读起来吃力;

        2. 对于一些原理性的东西,不去过于深究,只要记住Storm是这样实现的,开发的时候加以利用或规避。

在明白了这些基础的东西以后,如果对于原理性的东西Storm是如何实现的感兴趣,可以再去看源代码也不迟。毕竟这部分对开发应用的帮助并不直接。我认为,不必每个用Storm的人都必须了解Storm底层是如何实现的,当然,我会尝试在适当的位置插入相关原理性解释的链接,有兴趣可以直接去看看。就此原因,我把标题改成“Storm应用系列”。

 



Component

Storm中,Spout和Bolt都是其Component。所以,Storm定义了一个名叫IComponent的总接口
 

全家普如下:
 

绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的,在以后的文章会具体讲解。

BaseComponent 是Storm提供的“偷懒”的类。为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法。但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。 



Spout

在前面基本例子中,我们实现了一个RandomSpout,来看看其类图

  • Spout的最顶层抽象是ISpout接口。

open方法是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。

close方法在该spout关闭前执行,但是并不能得到保证其一定被执行。spout是作为task运行在worker内,在cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。

activatedeactivate :一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。


nextTuple 用来发射数据。

ack(Object) 

传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。

fail(Object)

同ack,只不过是tuple处理失败时执行。


我们的RandomSpout 由于继承了BaseRichSpout,所以不用实现close、activate、deactivate、ack、fail和getComponentConfiguration方法,只关心最基本核心的部分。


结论:

通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。



Bolt

ExclaimBasicBolt的类图: 
 
这里可以看到一个奇怪的问题:
为什么IBasicBolt并没有继承IBolt?
我们带着问题往下看。

IBolt定义了三个方法:
 
  • IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker上去。worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文
  • execute接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果
  • cleanup 同ISpout的close方法,在关闭前调用。同样不保证其一定执行。
红色部分是Bolt实现时一定要注意的地方。而Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。
如果你确实要反馈失败,可以抛出FailedException。

我们来再写一个Bolt继承BaseRichBolt替代ExclaimBasicBolt。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ExclaimRichBolt extends BaseRichBolt {
 
    private OutputCollector collector;
     
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }
 
    @Override
    public void execute(Tuple tuple) {
        this.collector.emit(tuple, new Values(tuple.getString(0)+"!"));
        this.collector.ack(tuple);
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("after_excl"));
    }
 
}
修改topology
1
2
//builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
builder.setBolt("exclaim", new ExclaimRichBolt(), 2).shuffleGrouping("spout");
运行下,结果一致。

结论:
通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做掉了prepare方法和collector.emit.ack(inputTuple);
阅读(928) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~