Chinaunix首页 | 论坛 | 博客
  • 博客访问: 651098
  • 博文数量: 149
  • 博客积分: 3901
  • 博客等级: 中校
  • 技术积分: 1558
  • 用 户 组: 普通用户
  • 注册时间: 2009-02-16 14:33
文章分类

全部博文(149)

文章存档

2014年(2)

2013年(10)

2012年(32)

2011年(21)

2010年(84)

分类: 服务器与存储

2011-03-28 14:46:15



数据
[@zw-hadoop-master hadoop]$ hadoop fs -cat /tmp/text.txt
1    9
2    8
3    7
4    6
5    5
6    4
7    3
8    2
9    1
0    0
9    9
8    8
7    7
6    6
5    5
4    4
3    3
2    2
1    1


结果输出 (按照第二列值 分区,分组 。以及按照 第二列,第一列 依次排序):

file_0 output
>-------0
0    0

>-------2
2    2
8    2

>-------4
4    4
6    4

>-------6
4    6
6    6

>-------8
2    8
8    8


file_1 output
>-------1
1    1
9    1

>-------3
3    3
7    3

>-------5
5    5
5    5

>-------7
3    7
7    7

>-------9
1    9
9    9






        class MyMap extends Mapper<LongWritable, Text,

                               Text, NullWritable> {
            @Override
            public void map(LongWritable ll, Text row, Context context)
                    throws IOException, InterruptedException {
                context.write( new Text(row.toString()), NullWritable.get());
            }
        }
        
        class MyReducer extends Reducer<Text, NullWritable,

                                  Text, NullWritable> {
            Text t = new Text();

            public void reduce(Text key,

                    Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                t.set(">-------" + key.toString().split("\t")[1]);
                context.write(t, NullWritable.get());
                for (NullWritable value : values) {
                    context.write(key, NullWritable.get());
                }

            }
        }

        // 根据第二列 %2 分区
        class MyPartitioner extends Partitioner<Text, NullWritable> {

            @Override
            public int getPartition(Text key,

                     NullWritable value, int numPartitions) {
                String[] cols = key.toString().split("\t");
                return Integer.parseInt(cols[1]) % numPartitions;
            }
        }

        //以此 第二列,第一列 排序
        class MySort implements RawComparator<Text> {

            @Override
            public int compare(Text o1, Text o2) {
                String suv1 = o1.toString().split("\t")[1];
                String time1 = o1.toString().split("\t")[0];

                String suv2 = o2.toString().split("\t")[1];
                String time2 = o2.toString().split("\t")[0];

                int ii = suv1.compareTo(suv2);
                if (ii != 0)
                    return ii;
                else {
                    return time1.compareTo(time2);
                }
            }

            @Override
            public int compare(byte[] b1, int s1, int l1,

                            byte[] b2, int s2, int l2) {
                Text key1 = new Text();
                Text key2 = new Text();

                DataInputBuffer buffer = new DataInputBuffer();
                try {
                    buffer.reset(b1, s1, l1);
                    key1.readFields(buffer);
                    buffer.reset(b2, s2, l2);
                    key2.readFields(buffer);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                return compare(key1, key2);
            }
        }
        
        // 以第二列 值 分组
        class MyComparator implements RawComparator<Text> {

            @Override
            public int compare(byte[] b1, int s1, int l1,

                        byte[] b2, int s2,int l2) {
                Text key1 = new Text();
                Text key2 = new Text();

                DataInputBuffer buffer = new DataInputBuffer();
                try {
                    buffer.reset(b1, s1, l1);
                    key1.readFields(buffer);
                    buffer.reset(b2, s2, l2);
                    key2.readFields(buffer);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

                String str1 = key1.toString().split("\t")[1];
                String str2 = key2.toString().split("\t")[1];
                return str1.compareTo(str2);
            }

            @Override
            public int compare(Text o1, Text o2) {
                return 0;
            }
        }


    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {


        Configuration config = new Configuration(true);
        config.set("mapred.child.java.opts", "-Xmx1024m -Xms512m");
        config.set("io.sort.mb", "100");

        MapreduceUtil mru = new MapreduceUtil(config);

        String out_path_1 = mru.applicationSpace(null);
        Job job = new Job(config);

        job.setNumReduceTasks(2);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setMapperClass(MyMap.class);

        job.setJarByClass(MapreduceUtil.class);
        job.setReducerClass(MyReducer.class);

        job.setPartitionerClass(MyPartitioner.class);
        

        job.setGroupingComparatorClass(MyComparator.class);
        job.setSortComparatorClass(MySort.class);
        
        FileInputFormat.setInputPaths(job, new Path("/tmp/text.txt"));
        FileOutputFormat.setOutputPath(job, new Path(out_path_1));

        job.waitForCompletion(true);
        System.out.println(out_path_1);

    }








阅读(2409) | 评论(1) | 转发(0) |
0

上一篇:hadoop 使用

下一篇:url 预处理 细节

给主人留下些什么吧!~~

liukaiyi2011-03-29 00:57:41

public void reduce(Text key, Iterable values, Context context) 其实 在 reducer时 Iterable value 和 key 是同时 foreach $> hadoop-0.20.2 org.apache.hadoop.mapred.Task.ValuesIterator.next() ------------------------------------------------------- public VALUE next() { if (!hasNext) { throw new NoSuchElementException("iterate past last value"); } try { readNextValue(); readNextKey(); } catch (IOException ie) { throw new RuntimeException("problem advancing post rec#"+ctr, ie); }