Chinaunix首页 | 论坛 | 博客
  • 博客访问: 48398
  • 博文数量: 12
  • 博客积分: 365
  • 博客等级: 一等列兵
  • 技术积分: 135
  • 用 户 组: 普通用户
  • 注册时间: 2012-01-24 15:22
文章分类

全部博文(12)

文章存档

2012年(12)

我的朋友

分类: 云计算

2012-03-30 15:22:45

引用:1.
      2.
RandomWriter(随机写)例子利用 Map/Reduce把 数据随机的写到dfs中。每个map输入单个文件名,然后随机写BytesWritable的键和值到DFS顺序文件。map没有产生任何输出,所以reduce没有执行。产生的数据是可以配置的。配置变量如下



名字 默认值 描述 
test.randomwriter.maps_per_host
 10 Number of maps/host 
test.randomwrite.bytes_per_map
 1073741824 Number of bytes written/map 
test.randomwrite.min_key
 10 minimum size of the key in bytes 
test.randomwrite.max_key
 1000 maximum size of the key in bytes 
test.randomwrite.min_value
 0 minimum size of the value 
test.randomwrite.max_value
 20000 maximum size of the value 


test.randomwriter.maps_per_host表示每个slave节点上运行map的次数。默认情况下,即只有一个数据节点,那么就有10个map,每个map的数据量为1G,因此要将10G数据写入到hdfs中。不过我配置的试验环境中只有2个slave节点,因此有两个map。

test.randomwrite.bytes_per_map我原本以为是随机写输出的测试文件的大小,默认为1G=1*1024*1024*1024,但是我将这个数据改成1*1024*1024以后,输出的测试文件还是1G,这让我很不解。(?)

代码实例

其中test.randomwrite.bytes_per_map=1*1024*1024,test.randomwriter.maps_per_host=1。

1./** 
2. * Licensed to the Apache Software Foundation (ASF) under one 
3. * or more contributor license agreements.  See the NOTICE file 
4. * distributed with this work for additional information 
5. * regarding copyright ownership.  The ASF licenses this file 
6. * to you under the Apache License, Version 2.0 (the 
7. * "License"); you may not use this file except in compliance 
8. * with the License.  You may obtain a copy of the License at 
9. * 
10. *    
11. * 
12. * Unless required by applicable law or agreed to in writing, software 
13. * distributed under the License is distributed on an "AS IS" BASIS, 
14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
15. * See the License for the specific language governing permissions and 
16. * limitations under the License. 
17. */  
18.  
19.package org.apache.hadoop.examples;  
20.  
21.import java.io.IOException;  
22.import java.util.Date;  
23.import java.util.Random;  
24.  
25.import org.apache.hadoop.conf.Configuration;  
26.import org.apache.hadoop.conf.Configured;  
27.import org.apache.hadoop.fs.Path;  
28.import org.apache.hadoop.io.BytesWritable;  
29.import org.apache.hadoop.io.Text;  
30.import org.apache.hadoop.io.Writable;  
31.import org.apache.hadoop.io.WritableComparable;  
32.import org.apache.hadoop.mapred.ClusterStatus;  
33.import org.apache.hadoop.mapred.FileOutputFormat;  
34.import org.apache.hadoop.mapred.FileSplit;  
35.import org.apache.hadoop.mapred.InputFormat;  
36.import org.apache.hadoop.mapred.InputSplit;  
37.import org.apache.hadoop.mapred.JobClient;  
38.import org.apache.hadoop.mapred.JobConf;  
39.import org.apache.hadoop.mapred.MapReduceBase;  
40.import org.apache.hadoop.mapred.Mapper;  
41.import org.apache.hadoop.mapred.OutputCollector;  
42.import org.apache.hadoop.mapred.RecordReader;  
43.import org.apache.hadoop.mapred.Reporter;  
44.import org.apache.hadoop.mapred.SequenceFileOutputFormat;  
45.import org.apache.hadoop.mapred.lib.IdentityReducer;  
46.import org.apache.hadoop.util.GenericOptionsParser;  
47.import org.apache.hadoop.util.Tool;  
48.import org.apache.hadoop.util.ToolRunner;  
49.  
50./** 51. * This program uses map/reduce to just run a distributed job where there is 
52. * no interaction between the tasks and each task write a large unsorted 
53. * random binary sequence file of BytesWritable. 
54. * In order for this program to generate data for terasort with 10-byte keys 
55. * and 90-byte values, have the following config: 
56. * &nbsp;</div><div>57. * <?xml version="1.0"?>&nbsp;</div><div>58. * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>&nbsp;</div><div>59. * <configuration>&nbsp;</div><div>60. * &nbsp; <property>&nbsp;</div><div>61. * &nbsp; &nbsp; <name>test.randomwrite.min_key</name>&nbsp;</div><div>62. * &nbsp; &nbsp; <value>10</value>&nbsp;</div><div>63. * &nbsp; </property>&nbsp;</div><div>64. * &nbsp; <property>&nbsp;</div><div>65. * &nbsp; &nbsp; <name>test.randomwrite.max_key</name>&nbsp;</div><div>66. * &nbsp; &nbsp; <value>10</value>&nbsp;</div><div>67. * &nbsp; </property>&nbsp;</div><div>68. * &nbsp; <property>&nbsp;</div><div>69. * &nbsp; &nbsp; <name>test.randomwrite.min_value</name>&nbsp;</div><div>70. * &nbsp; &nbsp; <value>90</value>&nbsp;</div><div>71. * &nbsp; </property>&nbsp;</div><div>72. * &nbsp; <property>&nbsp;</div><div>73. * &nbsp; &nbsp; <name>test.randomwrite.max_value</name>&nbsp;</div><div>74. * &nbsp; &nbsp; <value>90</value>&nbsp;</div><div>75. * &nbsp; </property>&nbsp;</div><div>76. * &nbsp; <property>&nbsp;</div><div>77. * &nbsp; &nbsp; <name>test.randomwrite.total_bytes</name>&nbsp;</div><div>78. * &nbsp; &nbsp; <value>1099511627776</value>&nbsp;</div><div>79. * &nbsp; </property>&nbsp;</div><div>80. * </configuration> 
81. *  
82. * Equivalently, {@link RandomWriter} also supports all the above options 
83. * and ones supported by {@link GenericOptionsParser} via the command-line. 
84. */  
85.public class RandomWriter extends Configured implements Tool {  86.    
87.  /** 88.   * User counters 
89.   */  
90.  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }  91.    
92.  /** 93.   * A custom input format that creates virtual inputs of a single string 
94.   * for each map. 
95.   */  
96.  static class RandomInputFormat implements InputFormat {  97.  
98.    /**  99.     * Generate the requested number of file splits, with the filename 
100.     * set to the filename of the output file. 
101.     */  
102.    public InputSplit[] getSplits(JobConf job,   103.                                  int numSplits) throws IOException {  104.      InputSplit[] result = new InputSplit[numSplits];  105.      Path outDir = FileOutputFormat.getOutputPath(job);  
106.      for(int i=0; i < result.length; ++i) {  107.        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,   108.                                  (String[])null);  
109.      }  
110.      return result;  111.    }  
112.  
113.    /** 114.     * Return a single record (filename, "") where the filename is taken from 
115.     * the file split. 
116.     */  
117.    static class RandomRecordReader implements RecordReader {  118.      Path name;  
119.      public RandomRecordReader(Path p) {  120.        name = p;  
121.      }  
122.      public boolean next(Text key, Text value) {  123.        if (name != null) {  124.          key.set(name.getName());  
125.          name = null;  
126.          return true;  127.        }  
128.        return false;  129.      }  
130.      public Text createKey() {  131.        return new Text();  132.      }  
133.      public Text createValue() {  134.        return new Text();  135.      }  
136.      public long getPos() {  137.        return 0;  138.      }  
139.      public void close() {}  140.      public float getProgress() {  141.        return 0.0f;  142.      }  
143.    }  
144.  
145.    public RecordReader getRecordReader(InputSplit split,  146.                                        JobConf job,   
147.                                        Reporter reporter) throws IOException {  
148.      return new RandomRecordReader(((FileSplit) split).getPath());  149.    }  
150.  }  
151.  
152.  static class Map extends MapReduceBase  153.    implements Mapper
154.                      BytesWritable, BytesWritable> {  
155.      
156.    private long numBytesToWrite;  157.    private int minKeySize;  158.    private int keySizeRange;  159.    private int minValueSize;  160.    private int valueSizeRange;  161.    private Random random = new Random();  162.    private BytesWritable randomKey = new BytesWritable();  163.    private BytesWritable randomValue = new BytesWritable();  164.      
165.    private void randomizeBytes(byte[] data, int offset, int length) {  166.      for(int i=offset + length - 1; i >= offset; --i) {  167.        data[i] = (byte) random.nextInt(256);  
168.      }  
169.    }  
170.      
171.    /** 172.     * Given an output filename, write a bunch of random records to it. 
173.     */  
174.    public void map(WritableComparable key,   175.                    Writable value,  
176.                    OutputCollector output,   
177.                    Reporter reporter) throws IOException {  
178.      int itemCount = 0;  179.      while (numBytesToWrite > 0) {  180.        int keyLength = minKeySize +   181.          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);  
182.        randomKey.setSize(keyLength);  
183.        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());  
184.        int valueLength = minValueSize +  185.          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);  
186.        randomValue.setSize(valueLength);  
187.        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());  
188.        output.collect(randomKey, randomValue);  
189.        numBytesToWrite -= keyLength + valueLength;  
190.        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);  
191.        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);  
192.        if (++itemCount % 200 == 0) {  193.          reporter.setStatus("wrote record " + itemCount + ". " +   194.                             numBytesToWrite + " bytes left.");  195.        }  
196.      }  
197.      reporter.setStatus("done with " + itemCount + " records.");  198.    }  
199.      
200.    /** 201.     * Save the values out of the configuaration that we need to write 
202.     * the data. 
203.     */  
204.    @Override  
205.    public void configure(JobConf job) {  206.      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",  207.                                    1*1024*1024);  
208.      minKeySize = job.getInt("test.randomwrite.min_key", 10);  209.      keySizeRange =   
210.        job.getInt("test.randomwrite.max_key", 1000) - minKeySize;  211.      minValueSize = job.getInt("test.randomwrite.min_value", 0);  212.      valueSizeRange =   
213.        job.getInt("test.randomwrite.max_value", 20000) - minValueSize;  214.    }  
215.      
216.  }  
217.    
218.  /** 219.   * This is the main routine for launching a distributed random write job. 
220.   * It runs 10 maps/node and each node writes 1 gig of data to a DFS file. 
221.   * The reduce doesn't do anything. 
222.   *  
223.   * @throws IOException  
224.   */  
225.  public int run(String[] args) throws Exception {      226.    if (args.length == 0) {  227.      System.out.println("Usage: writer ");  228.      ToolRunner.printGenericCommandUsage(System.out);  
229.      return -1;  230.    }  
231.      
232.    Path outDir = new Path(args[0]);  233.    JobConf job = new JobConf(getConf());  234.      
235.    job.setJarByClass(RandomWriter.class);  236.    job.setJobName("random-writer");  237.    FileOutputFormat.setOutputPath(job, outDir);  
238.      
239.    job.setOutputKeyClass(BytesWritable.class);  240.    job.setOutputValueClass(BytesWritable.class);  241.      
242.    job.setInputFormat(RandomInputFormat.class);  243.    job.setMapperClass(Map.class);          244.    job.setReducerClass(IdentityReducer.class);  245.    job.setOutputFormat(SequenceFileOutputFormat.class);  246.      
247.    JobClient client = new JobClient(job);  248.    ClusterStatus cluster = client.getClusterStatus();  
249.    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 1);  250.    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",  251.                                             1*1024*1024);  
252.    if (numBytesToWritePerMap == 0) {  253.      System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");  254.      return -2;  255.    }  
256.    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",   257.         numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());  
258.    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);  259.    if (numMaps == 0 && totalBytesToWrite > 0) {  260.      numMaps = 1;  
261.      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);  262.    }  
263.      
264.    job.setNumMapTasks(numMaps);  
265.    System.out.println("Running " + numMaps + " maps.");  266.      
267.    // reducer NONE   268.    job.setNumReduceTasks(0);  
269.      
270.    Date startTime = new Date();  271.    System.out.println("Job started: " + startTime);  272.    JobClient.runJob(job);  
273.    Date endTime = new Date();  274.    System.out.println("Job ended: " + endTime);  275.    System.out.println("The job took " +   276.                       (endTime.getTime() - startTime.getTime()) /1000 +   
277.                       " seconds.");  278.      
279.    return 0;  280.  }  
281.    
282.  public static void main(String[] args) throws Exception {  283.    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);  284.    System.exit(res);  
285.  }  
286.  
287.}  
输出信息: 
1.11/10/17 13:27:46 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively  
2.Running 2 maps.  
3.Job started: Mon Oct 17 13:27:47 CST 2011  
4.11/10/17 13:27:47 INFO mapred.JobClient: Running job: job_201110171322_0001  
5.11/10/17 13:27:48 INFO mapred.JobClient:  map 0% reduce 0%  
6.11/10/17 13:29:58 INFO mapred.JobClient:  map 50% reduce 0%  
7.11/10/17 13:30:05 INFO mapred.JobClient:  map 100% reduce 0%  
8.11/10/17 13:30:07 INFO mapred.JobClient: Job complete: job_201110171322_0001  
9.11/10/17 13:30:07 INFO mapred.JobClient: Counters: 8  
10.11/10/17 13:30:07 INFO mapred.JobClient:   Job Counters   
11.11/10/17 13:30:07 INFO mapred.JobClient:     Launched map tasks=3  
12.11/10/17 13:30:07 INFO mapred.JobClient:   org.apache.hadoop.examples.RandomWriter$Counters  
13.11/10/17 13:30:07 INFO mapred.JobClient:     BYTES_WRITTEN=2147504078  
14.11/10/17 13:30:07 INFO mapred.JobClient:     RECORDS_WRITTEN=204528  
15.11/10/17 13:30:07 INFO mapred.JobClient:   FileSystemCounters  
16.11/10/17 13:30:07 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=2154580318  
17.11/10/17 13:30:07 INFO mapred.JobClient:   Map-Reduce Framework  
18.11/10/17 13:30:07 INFO mapred.JobClient:     Map input records=2  
19.11/10/17 13:30:07 INFO mapred.JobClient:     Spilled Records=0  
20.11/10/17 13:30:07 INFO mapred.JobClient:     Map input bytes=0  
21.11/10/17 13:30:07 INFO mapred.JobClient:     Map output records=204528  
22.Job ended: Mon Oct 17 13:30:07 CST 2011  
23.The job took 140 seconds.  
在hdfs上产生了两个问价,在/home/hadoop/rand目录下,分别是part-00000(1Gb,r3)和part-00001(1Gb,r3)


本篇文章来源于 Linux公社网站()  原文链接:
阅读(2458) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~