Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1111774
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 大数据

2018-07-09 19:01:15


点击(此处)折叠或打开

  1. import backtype.storm.spout.SpoutOutputCollector;
  2. import backtype.storm.task.TopologyContext;
  3. import backtype.storm.topology.OutputFieldsDeclarer;
  4. import backtype.storm.topology.base.BaseRichSpout;
  5. import backtype.storm.tuple.Fields;
  6. import backtype.storm.tuple.Values;

  7. import java.io.BufferedReader;
  8. import java.io.FileNotFoundException;
  9. import java.io.FileReader;
  10. import java.util.Map;

  11. public class WordReader extends BaseRichSpout {

  12.     private SpoutOutputCollector collector;
  13.     private FileReader fileReader;
  14.     private boolean completed = false;
  15.     public void ack(Object msgId) {
  16.         System.out.println("OK:"+msgId);
  17.     }
  18.     public void close() {}
  19.     public void fail(Object msgId) {
  20.         System.out.println("FAIL:"+msgId);
  21.     }

  22.     /**
  23.      * The only thing that the methods will do It is emit each
  24.      * file line
  25.      */
  26.     public void nextTuple() {
  27.         /**
  28.          * The nextuple it is called forever, so if we have been readed the file
  29.          * we will wait and then return
  30.          */
  31.         if(completed){
  32.             try {
  33.                 Thread.sleep(1000);
  34.             } catch (InterruptedException e) {
  35.                 //Do nothing
  36.             }
  37.             return;
  38.         }
  39.         String str;
  40.         //Open the reader
  41.         BufferedReader reader = new BufferedReader(fileReader);
  42.         try{
  43.             //Read all lines
  44.             while((str = reader.readLine()) != null){
  45.                 /**
  46.                  * By each line emmit a new value with the line as a their
  47.                  */
  48.                  //System.out.println(str);
  49.                 this.collector.emit(new Values(str),str);
  50.             }
  51.         }catch(Exception e){
  52.             throw new RuntimeException("Error reading tuple",e);
  53.         }finally{
  54.             completed = true;
  55.         }
  56.     }

  57.     /**
  58.      * We will create the file and get the collector object
  59.      */
  60.     public void open(Map conf, TopologyContext context,
  61.             SpoutOutputCollector collector) {
  62.         try {
  63.             this.fileReader = new FileReader(conf.get("wordsFile").toString());
  64.         } catch (FileNotFoundException e) {
  65.             throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
  66.         }
  67.         this.collector = collector;
  68.     }

  69.     /**
  70.      * Declare the output field "word"
  71.      */
  72.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  73.         declarer.declare(new Fields("line"));
  74.     }
  75. }

点击(此处)折叠或打开

  1. import backtype.storm.topology.BasicOutputCollector;
  2. import backtype.storm.topology.OutputFieldsDeclarer;
  3. import backtype.storm.topology.base.BaseBasicBolt;
  4. import backtype.storm.tuple.Fields;
  5. import backtype.storm.tuple.Tuple;
  6. import backtype.storm.tuple.Values;

  7. public class WordNormalizer extends BaseBasicBolt {

  8.     public void cleanup() {}

  9.     /**
  10.      * The bolt will receive the line from the
  11.      * words file and process it to Normalize this line
  12.      *
  13.      * The normalize will be put the words in lower case
  14.      * and split the line to get all words in this
  15.      */
  16.     public void execute(Tuple input, BasicOutputCollector collector) {
  17.         String sentence = input.getString(0);
  18.         String[] words = sentence.split(" ");
  19.         for(String word : words){
  20.             word = word.trim();
  21.             if(!word.isEmpty()){
  22.                 word = word.toLowerCase();
  23.                 System.out.println("-------word:"+word);
  24.                 collector.emit(new Values(word));
  25.             }
  26.         }
  27.     }
  28.     

  29.     /**
  30.      * The bolt will only emit the field "word"
  31.      */
  32.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  33.         declarer.declare(new Fields("word"));
  34.     }
  35. }

点击(此处)折叠或打开

  1. import java.util.HashMap;
  2. import java.util.Map;

  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.BasicOutputCollector;
  5. import backtype.storm.topology.OutputFieldsDeclarer;
  6. import backtype.storm.topology.base.BaseBasicBolt;
  7. import backtype.storm.tuple.Tuple;

  8. public class WordCounter extends BaseBasicBolt {

  9.     Integer id;
  10.     String name;
  11.     Map<String, Integer> counters;

  12.     /**
  13.      * At the end of the spout (when the cluster is shutdown
  14.      * We will show the word counters
  15.      */
  16.     @Override
  17.     public void cleanup() {
  18.         System.out.println("-- Word Counter ["+name+"-"+id+"] --");
  19.         for(Map.Entry<String, Integer> entry : counters.entrySet()){
  20.             System.out.println(entry.getKey()+": "+entry.getValue());
  21.         }
  22.     }

  23.     /**
  24.      * On create
  25.      */
  26.     @Override
  27.     public void prepare(Map stormConf, TopologyContext context) {
  28.         this.counters = new HashMap<String, Integer>();
  29.         this.name = context.getThisComponentId();
  30.         this.id = context.getThisTaskId();
  31.     }

  32.     @Override
  33.     public void declareOutputFields(OutputFieldsDeclarer declarer) {}


  34.     @Override
  35.     public void execute(Tuple input, BasicOutputCollector collector) {
  36.         String str = input.getString(0);
  37.         /**
  38.          * If the word dosn't exist in the map we will create
  39.          * this, if not We will add 1
  40.          */
  41.         if(!counters.containsKey(str)){
  42.             counters.put(str, 1);
  43.         }else{
  44.             Integer c = counters.get(str) + 1;
  45.             counters.put(str, c);
  46.         }
  47.     }
  48. }


点击(此处)折叠或打开

  1. import spouts.WordReader;
  2. import backtype.storm.Config;
  3. import backtype.storm.LocalCluster;
  4. import backtype.storm.topology.TopologyBuilder;
  5. import backtype.storm.tuple.Fields;
  6. import bolts.WordCounter;
  7. import bolts.WordNormalizer;


  8. public class TopologyMain {
  9.     public static void main(String[] args) throws InterruptedException {
  10.          
  11.         //Topology definition
  12.         TopologyBuilder builder = new TopologyBuilder();
  13.         builder.setSpout("word-reader",new WordReader());
  14.         builder.setBolt("word-normalizer", new WordNormalizer())
  15.             .shuffleGrouping("word-reader");
  16.         builder.setBolt("word-counter", new WordCounter(),2)
  17.             .fieldsGrouping("word-normalizer", new Fields("word"));
  18.         
  19.         //Configuration
  20.         Config conf = new Config();
  21.         //String fileName=args[0];
  22.         String fileName="D:\\words.txt";
  23.         conf.put("wordsFile", fileName);
  24.         conf.setDebug(false);
  25.         //Topology run
  26.         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
  27.         LocalCluster cluster = new LocalCluster();
  28.         cluster.submitTopology("Getting-Started-Toplogy", conf, builder.createTopology());
  29.         Thread.sleep(50000);

  30.         cluster.shutdown();
  31.     }
  32. }



阅读(1511) | 评论(0) | 转发(0) |
0

上一篇:Storm安装运行

下一篇:hadoop集群安装

给主人留下些什么吧!~~