Chinaunix首页 | 论坛 | 博客
  • 博客访问: 145531
  • 博文数量: 54
  • 博客积分: 2682
  • 博客等级: 少校
  • 技术积分: 580
  • 用 户 组: 普通用户
  • 注册时间: 2009-10-24 20:56
文章分类
文章存档

2012年(2)

2011年(10)

2010年(28)

2009年(14)

我的朋友

分类: Java

2009-10-25 11:15:15

断断续续看hadoop0.1的源码也估计有5个月了,觉得这个东西非常有趣,也为自己能看懂一个Mapreduce的开源实现而感到欣喜,这段时间学习了很多j2se的知识,也略微学到了hadoop作者设计软件的思路。在09年7月的时候自己还动手做了个小系统,大概的意思就是在hadoop里插入代码,然后可视化其中的机理。现在我还是一直在研究这个,谁叫我靠这个毕业呢 =。=
 
接下来的很多篇连续的文章里,我给自己的目标是详详细细的记录每个细节,从使用什么ide,到怎么导入hadoop0.1工程,怎么读懂源码,到怎么修改内核源码,并且重新编译最后使用自己的源码并使用。源码我基本上80%都仔细看过了,绝大部分都作了注释。只是一下子发上来估计朋友们会弄不清头绪,我尽量写清晰点。
 
热忱欢迎网友来交流,给出意见!也不知道我能坚持多久,估计有些支持的话会更好。。
 
mapreduce是一个新的分布式计算模型,按我这个土人的理解,分布式计算模型就是一个软件库,它的本事就是能把好多机器集合起来一起做一件事情,让它做得比一台机子更快,它跟以前的mpi有很多相似的地方,比如都牵扯到多台机器(废话。。),但是mapreduce有主从之分;都牵扯到多个分布式进程(废话。。),但是mpi的进程之间通信非常显著和必要,而mapreduce的分布式进程之间通信几乎为0;两者还有一些区别,比如mpi更适合cpu密集型的运算,是一个通用的分布式计算框架,而mapreduce是特地设计用来计算大规模数据集的,说得直白一点,数据量要是不上个几百G上T,还体现不出mapreduce的优势呢!还有,mapreduce的程序是写一次可以应付n个节点的,而不管这个n是多少,我的程序依然不变。不太知道mpi是不是这样,就不在这里妖言惑众了。
另外,mapreduce最开始是由google提出并用c++实现的,没有源码发布。hadoop是dong cutting用java写的开源的实现,这老兄以前是大名鼎鼎的lucene的作者。膜拜一下先。
 
下一篇讲如何获取hadoop的源码,hadoop源码的组织结构是什么,各个包是干嘛的,还有怎么导入源码到eclipse中,甚至怎么修改源码,编译新内核。以及一些mapreduce的基础知识,包括它的两个组成部分,map和reduce到底是在干嘛。有时间也会写怎么使用hadoop的。。以及为什么选择0.1版本。
 
第一篇就从一个mapreduce的经典例子讲起。wordcount.java。说简单点,当输入是
##file1.txt
hello hello hello world
world
##file2.txt
world hello other
那么输出是
hello 4
world 3
other 1
现在就知道wordcount其实就是计算词出现的次数的。如果你用这个数据量当输入,那么你会失望透顶,因为运算速度巨慢,但是数据量是上G的时候,你还是会喜出望外的。
 
上源码!
 
 

/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.hadoop.examples;

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.MapReduceBase;

/**
 * This is an example Hadoop Map/Reduce application. It reads the text input
 * files, breaks each line into words and counts them. The output is a locally
 * sorted list of words and the count of how often they occurred.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar wordcount [-m maps]
 * [-r reduces] in-dir out-dir
 *
 * @author Owen O'Malley
 */

public class WordCount {
    
/*这个类有三个部分组成:
     * 1. Mapper Class
     * 2. Reducer Class
     * 3. main
     * 一个典型的Hadoop程序需要至少提供Mapper和Reducer类,可以选择性的提供InputFormat等类
     * 若手头已经有hadoop0.1版本,并且配置好了,那么可以用以下命令行来运行wordcount程序
     * bin/hadoop jar build/hadoop-examples.jar wordcount inputdir outputdir
     *
     */


    
/**
     * Counts the words in each line. For each line of input, break the line
     * into words and emit them as (word, 1).
     */

    
// MapReduceBase close configure(JobConf )

    
// 从这个例子中可以看出利用了两条技巧

    
// 一是用具体实现类,比如MapClass是Mapper的实现类,

    
// hadoop框架会调用这个具体的实现类,利用的是多态,因为在hadoop眼中,MapClass只是一个实现了Mapper接口的一个类而已

    
// hadoop只管对Mapper进行操作就行

    
// 而是用高层抽象类,比如OutputCollector,在wordcount例子中,这个OutputCollector实际上

    
// 是一个接口,在具体应用中,hadoop会自行构造一个具体实现类,然后传入进来,

    
// 以真正的告诉wordcount你的输出要往哪放

    
// 有了以上两点机制,才使得整个hadoop系统具有应变性和扩展性

    
//最后在MapRunner里被整成Mapper接口

    public static class MapClass extends MapReduceBase implements Mapper {
        
//内部静态类,附件里有个文档是java内部类的。很详细,我没看完。

        
//hadoop源码的阅读需要比较好的java基础,一些高级主题也要比较熟悉

        
//就比如内部类,Class机制.附件里有个文档可以看看。


        private final static IntWritable one = new IntWritable(1);
        
//IntWritable是把一个int数据包了一层的类,以后会有很多这样的类出现,都是为了

        
//利于网络传输,hadoop有自己的非常巧妙的网络类传输机制。如果手上已经有源码了,那么找到

        
//org.apache.hadoop.io包,里面有很多类,都是把java的基础数据类型包装一层,成为支持网络

        
//传输的类了。

        
        private UTF8 word = new UTF8();
        
//基本上可以看作是String,对UTF8编码有特殊代码处理


        public void map(WritableComparable key, Writable value,
                OutputCollector output, Reporter reporter) throws IOException {
            
/*
             * 举例说明这个函数的作用,输入可能是这样:
             * map("190", "hello hello hello world", output, reporter) 这个表示文章的第190个字节处有一行数据是"hello hello hello world"
             * 所以这里的key="190" value="hello hello hello world"
             * 经过map函数后,就变成了
             */

            
            
// 这个mapper里干脆就没用到key

            String line = ((UTF8) value).toString();
            StringTokenizer itr = new StringTokenizer(line);
            
//java.util.StringTokenizer 把字符串按空格等字符分割开来

            
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
//

                
//collect,在OutputCollector,MapTask类里。 是根据key的hash值确定输出的partition位置的

            }
        }
    }

    
/**
     * A reducer class that just emits the sum of the input values.
     */

    public static class Reduce extends MapReduceBase implements Reducer {
        
/*
         * 举例说明Reduce类的作用
         * 比如输入是
         * 那么是这么调用的 reduce("hello", Iterator , output, reporter)
         * 输出是
         * 这里的Iterator是java.util.Iterator接口。c++里也有类似
         *
         */

        
        
//这个跟LongSumReducer

        public void reduce(WritableComparable key, Iterator values,
                OutputCollector output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += ((IntWritable) values.next()).get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    static void printUsage() {
        System.out
                .println("wordcount [-m ] [-r ] ");
        System.exit(1);
    }

    
/**
     * The main driver for word count map/reduce program. Invoke this method to
     * submit the map/reduce job.
     *
     * @throws IOException
     * When there is communication problems with the job tracker.
     */

    public static void main(String[] args) throws IOException {
        Configuration defaults = new Configuration();
        
//内核中Configuration就是一个java.io.Properties 是一组对,存储hadoop程序的配置

        
//比如要存储Namenode的地址,那么就是<"fs.default.name", "node01:9090">

        
//大致有几百个项配置选项


        JobConf conf = new JobConf(defaults, WordCount.class);
        
// 这个构造函数里做了一件重要的事,就是setJar(findContainerJar(WordCount.class))

        
// 这个地方的setJar要记住是提交节点的本地文件系统的jar包路径,如果要把JobConf这个对象提交到别的节点上去,那么在那个节点上是肯定需要setJar的

        conf.setJobName("wordcount");

        
// 这两者表示的是出来的东西是也就是<单词,个数>

        
// the keys are words (strings)

        conf.setOutputKeyClass(UTF8.class);
// mapred.output.key.class

        
// setOutputKeyClass(JobConf) ->

        
// setClass(Configuration)

        
// the values are counts (ints)

        conf.setOutputValueClass(IntWritable.class);
// mapred.output.value.class


        conf.setMapperClass(MapClass.class);
// setClass(Configuration)

        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        List other_args = new ArrayList();
        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-m".equals(args[i])) {
                    conf.setNumMapTasks(Integer.parseInt(args[++i]));
// setInt(Configuration)

                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
// setInt(Configuration)

                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of "
                        + args[i]);
                printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                printUsage();
// exits

            }
        }
        
// Make sure there are exactly 2 parameters left.

        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            printUsage();
        }
        conf.setInputDir(new File((String) other_args.get(0)));
// 这个会作为InputFormat.getSplit的输入

        conf.setOutputDir(new File((String) other_args.get(1)));
// set(Configuration)


        
// Uncomment to run locally in a single process

        
// conf.set("mapred.job.tracker", "local");


        JobClient.runJob(conf);
// JobConf

    }

}


这个类只是初步展示一下hadoop程序的结构,我刚开始接触的时候晕了很长时间。好了,但愿注释很详细,不过没懂也没关系,下一章开始详细讲述
 
文件: Java内部类.rar
大小: 10KB
下载: 下载
我觉得Java内部类应用很广泛的,找了个18页的学习资料。分享下。
阅读(1267) | 评论(2) | 转发(0) |
0

上一篇:没有了

下一篇:ThreadLocal我的理解,觉得蛮好懂的

给主人留下些什么吧!~~

chinaunix网友2011-01-04 10:54:55

如何获取hadoop的源码,hadoop源码的组织结构是什么,各个包是干嘛的,还有怎么导入源码到eclipse中,甚至怎么修改源码 为啥没有继续写了啊 很期待啊

chinaunix网友2010-12-02 09:20:19

very good!