Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1115267
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 大数据

2016-08-20 11:44:25

topolopy创建包含两部分 spout和bolt
1、spout
    消息源spout是Storm里面一个topology里面的消息生产者。简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次
Spout主要继承自BaseRichSpout提供了如下接口:
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
//主要用来进行初始化,如和mq建立连接,
void close();
//关闭时使用,断开mq连接
 void nextTuple();
//发送tuple到下一个bolt节点
void ack(Object msgId);
//消息被成功处理后会调用
void fail(Object msgId);
//消息处理失败时会调用,对于可靠性消息,需要重新放入消息队列进行处理
void declareOutputFields(OutputFieldsDeclarer declarer)
//消息源可以是多条消息源,可以用此函数将消息组成一条消息,进行发送,这里相当于只声明要发送的字段信息,发送值的个数是必须和声明字段个数一致,否则发送数据时会报错
2、blot
     Topology中所有的处理都由Bolt完成。即所有的消息处理逻辑被封装在bolts里面。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。
     Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。即需要经过很多blots。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)
 Blot需要实现IRichBolt接口,该接口主要提供了如下方法
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
//进行初始化操作,如连接redis,数据库等
void execute(Tuple input);
//对数据进行分析处理计算,然后发送到下一bolt进行处理
void cleanup();
//关闭时使用,如断开redis连接
void declareOutputFields(OutputFieldsDeclarer declarer);
//同spout 声明要发送数据的字段信息


3、创建topology 
TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("recvmsg", new RecvMsgSpout(), 1);
    builder.setBolt("parse", new ParseBolt(), 2).shuffleGrouping("recvmsg");
    builder.setBolt("count", new CountBolt(), 2).shuffleGrouping("parse");
    builder.setBolt("report", new ReportBolt(), 2).globalGrouping("count");
    Config conf = new Config();
    conf.setDebug(true);
     if (args != null && args.length > 0) {
      //分布式运行模式
   conf.setNumWorkers(4);
        StormSubmitter.submitTopologyWithProgressBar(args[0],
 conf,   builder.createTopology());
    } else {
        //本地运行模式 
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("recvmsg", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }

4、topology运行
storm的运行有两种模式: 本地模式和分布式模式.
1) 本地模式:
storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。
2) 分布式模式:
storm由一堆机器组成。当你提交topology给master的时候, 你同时也把topology的代码提交了。master负责分发你的代码并且负责给你的topolgoy分配工作进程。如果一个工作进程挂掉了, master节点会把认为重新分配到其它节点。


注意:
1、 由于分布式运行需要提交topology到storm,所以,topology依赖的jar包都要打进去但不能把依赖的storm jar包打进去,否则在运行时和storm 集群的jar冲突。 如果 使用了mvn管理项目可以 在pom.xml中配置打包时不包含storm jar。
    
       org.apache.storm
       storm-core
      0.9.3
      provided
    


2、 提交topology
需要到storm的主节点提交
Storm jar /xxx/xxx/stormlib/xxxx-analyse-1.0-jar-with-dependencies.jar com.xxxx.report.xxxx.topology.xxxxTopology  xxxxTopology

阅读(948) | 评论(0) | 转发(0) |
0

上一篇:SSH免密钥登录

下一篇:nagios介绍

给主人留下些什么吧!~~