topolopy创建包含两部分 spout和bolt
1、spout
消息源spout是Storm里面一个topology里面的消息生产者。简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次
Spout主要继承自BaseRichSpout提供了如下接口:
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
//主要用来进行初始化,如和mq建立连接,
void close();
//关闭时使用,断开mq连接
void nextTuple();
//发送tuple到下一个bolt节点
void ack(Object msgId);
//消息被成功处理后会调用
void fail(Object msgId);
//消息处理失败时会调用,对于可靠性消息,需要重新放入消息队列进行处理
void declareOutputFields(OutputFieldsDeclarer declarer)
//消息源可以是多条消息源,可以用此函数将消息组成一条消息,进行发送,这里相当于只声明要发送的字段信息,发送值的个数是必须和声明字段个数一致,否则发送数据时会报错
2、blot
Topology中所有的处理都由Bolt完成。即所有的消息处理逻辑被封装在bolts里面。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。
Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。即需要经过很多blots。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)
Blot需要实现IRichBolt接口,该接口主要提供了如下方法
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
//进行初始化操作,如连接redis,数据库等
void execute(Tuple input);
//对数据进行分析处理计算,然后发送到下一bolt进行处理
void cleanup();
//关闭时使用,如断开redis连接
void declareOutputFields(OutputFieldsDeclarer declarer);
//同spout 声明要发送数据的字段信息
3、创建topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("recvmsg", new RecvMsgSpout(), 1);
builder.setBolt("parse", new ParseBolt(), 2).shuffleGrouping("recvmsg");
builder.setBolt("count", new CountBolt(), 2).shuffleGrouping("parse");
builder.setBolt("report", new ReportBolt(), 2).globalGrouping("count");
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
//分布式运行模式
conf.setNumWorkers(4);
StormSubmitter.submitTopologyWithProgressBar(args[0],
conf, builder.createTopology());
} else {
//本地运行模式
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("recvmsg", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
4、topology运行
storm的运行有两种模式: 本地模式和分布式模式.
1) 本地模式:
storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。
2) 分布式模式:
storm由一堆机器组成。当你提交topology给master的时候, 你同时也把topology的代码提交了。master负责分发你的代码并且负责给你的topolgoy分配工作进程。如果一个工作进程挂掉了, master节点会把认为重新分配到其它节点。
注意:
1、 由于分布式运行需要提交topology到storm,所以,topology依赖的jar包都要打进去但不能把依赖的storm jar包打进去,否则在运行时和storm 集群的jar冲突。 如果 使用了mvn管理项目可以 在pom.xml中配置打包时不包含storm jar。
org.apache.storm
storm-core
0.9.3
provided
2、 提交topology
需要到storm的主节点提交
Storm jar /xxx/xxx/stormlib/xxxx-analyse-1.0-jar-with-dependencies.jar com.xxxx.report.xxxx.topology.xxxxTopology xxxxTopology
阅读(929) | 评论(0) | 转发(0) |