引用:1.
2.
RandomWriter(随机写)例子利用Map/Reduce把数据随机地写到DFS中。每个map输入单个文件名,然后随机写BytesWritable的键和值到DFS顺序文件。作业把reduce任务数设置为0,map的输出直接写入DFS,所以reduce不会执行。产生的数据是可以配置的,配置变量如下:
名字 默认值 描述
test.randomwriter.maps_per_host
10 Number of maps/host
test.randomwrite.bytes_per_map
1073741824 Number of bytes written/map
test.randomwrite.min_key
10 minimum size of the key in bytes
test.randomwrite.max_key
1000 maximum size of the key in bytes
test.randomwrite.min_value
0 minimum size of the value
test.randomwrite.max_value
20000 maximum size of the value
test.randomwriter.maps_per_host表示每个slave节点上运行的map任务个数。默认情况下,如果只有一个数据节点,那么就有10个map,每个map的数据量为1G,因此要将10G数据写入到hdfs中。不过我配置的试验环境中有2个slave节点,并且把test.randomwriter.maps_per_host设为1,因此一共有两个map。
test.randomwrite.bytes_per_map我原本以为是随机写输出的测试文件的大小,默认为1G=1*1024*1024*1024,但是我将这个数据改成1*1024*1024以后,输出的测试文件还是1G,这让我很不解。(?)
代码实例
其中test.randomwrite.bytes_per_map=1*1024*1024,test.randomwriter.maps_per_host=1。
1./**
2. * Licensed to the Apache Software Foundation (ASF) under one
3. * or more contributor license agreements. See the NOTICE file
4. * distributed with this work for additional information
5. * regarding copyright ownership. The ASF licenses this file
6. * to you under the Apache License, Version 2.0 (the
7. * "License"); you may not use this file except in compliance
8. * with the License. You may obtain a copy of the License at
9. *
10. *
11. *
12. * Unless required by applicable law or agreed to in writing, software
13. * distributed under the License is distributed on an "AS IS" BASIS,
14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15. * See the License for the specific language governing permissions and
16. * limitations under the License.
17. */
18.
19.package org.apache.hadoop.examples;
20.
21.import java.io.IOException;
22.import java.util.Date;
23.import java.util.Random;
24.
25.import org.apache.hadoop.conf.Configuration;
26.import org.apache.hadoop.conf.Configured;
27.import org.apache.hadoop.fs.Path;
28.import org.apache.hadoop.io.BytesWritable;
29.import org.apache.hadoop.io.Text;
30.import org.apache.hadoop.io.Writable;
31.import org.apache.hadoop.io.WritableComparable;
32.import org.apache.hadoop.mapred.ClusterStatus;
33.import org.apache.hadoop.mapred.FileOutputFormat;
34.import org.apache.hadoop.mapred.FileSplit;
35.import org.apache.hadoop.mapred.InputFormat;
36.import org.apache.hadoop.mapred.InputSplit;
37.import org.apache.hadoop.mapred.JobClient;
38.import org.apache.hadoop.mapred.JobConf;
39.import org.apache.hadoop.mapred.MapReduceBase;
40.import org.apache.hadoop.mapred.Mapper;
41.import org.apache.hadoop.mapred.OutputCollector;
42.import org.apache.hadoop.mapred.RecordReader;
43.import org.apache.hadoop.mapred.Reporter;
44.import org.apache.hadoop.mapred.SequenceFileOutputFormat;
45.import org.apache.hadoop.mapred.lib.IdentityReducer;
46.import org.apache.hadoop.util.GenericOptionsParser;
47.import org.apache.hadoop.util.Tool;
48.import org.apache.hadoop.util.ToolRunner;
49.
50./** 51. * This program uses map/reduce to just run a distributed job where there is
52. * no interaction between the tasks and each task write a large unsorted
53. * random binary sequence file of BytesWritable.
54. * In order for this program to generate data for terasort with 10-byte keys
55. * and 90-byte values, have the following config:
56. *
57. *
58. *
59. *
61. * test.randomwrite.min_key
62. * 10
63. *
65. * test.randomwrite.max_key
66. * 10
67. *
69. * test.randomwrite.min_value
70. * 90
71. *
73. * test.randomwrite.max_value
74. * 90
75. *
77. * test.randomwrite.total_bytes
78. * 1099511627776
79. *
80. *
81. *
82. * Equivalently, {@link RandomWriter} also supports all the above options
83. * and ones supported by {@link GenericOptionsParser} via the command-line.
84. */
85.public class RandomWriter extends Configured implements Tool { 86.
87. /** 88. * User counters
89. */
90. static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN } 91.
92. /** 93. * A custom input format that creates virtual inputs of a single string
94. * for each map.
95. */
96. static class RandomInputFormat implements InputFormat { 97.
98. /** 99. * Generate the requested number of file splits, with the filename
100. * set to the filename of the output file.
101. */
102. public InputSplit[] getSplits(JobConf job, 103. int numSplits) throws IOException { 104. InputSplit[] result = new InputSplit[numSplits]; 105. Path outDir = FileOutputFormat.getOutputPath(job);
106. for(int i=0; i < result.length; ++i) { 107. result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 108. (String[])null);
109. }
110. return result; 111. }
112.
113. /** 114. * Return a single record (filename, "") where the filename is taken from
115. * the file split.
116. */
117. static class RandomRecordReader implements RecordReader { 118. Path name;
119. public RandomRecordReader(Path p) { 120. name = p;
121. }
122. public boolean next(Text key, Text value) { 123. if (name != null) { 124. key.set(name.getName());
125. name = null;
126. return true; 127. }
128. return false; 129. }
130. public Text createKey() { 131. return new Text(); 132. }
133. public Text createValue() { 134. return new Text(); 135. }
136. public long getPos() { 137. return 0; 138. }
139. public void close() {} 140. public float getProgress() { 141. return 0.0f; 142. }
143. }
144.
145. public RecordReader getRecordReader(InputSplit split, 146. JobConf job,
147. Reporter reporter) throws IOException {
148. return new RandomRecordReader(((FileSplit) split).getPath()); 149. }
150. }
151.
152. static class Map extends MapReduceBase 153. implements Mapper
154. BytesWritable, BytesWritable> {
155.
156. private long numBytesToWrite; 157. private int minKeySize; 158. private int keySizeRange; 159. private int minValueSize; 160. private int valueSizeRange; 161. private Random random = new Random(); 162. private BytesWritable randomKey = new BytesWritable(); 163. private BytesWritable randomValue = new BytesWritable(); 164.
165. private void randomizeBytes(byte[] data, int offset, int length) { 166. for(int i=offset + length - 1; i >= offset; --i) { 167. data[i] = (byte) random.nextInt(256);
168. }
169. }
170.
171. /** 172. * Given an output filename, write a bunch of random records to it.
173. */
174. public void map(WritableComparable key, 175. Writable value,
176. OutputCollector output,
177. Reporter reporter) throws IOException {
178. int itemCount = 0; 179. while (numBytesToWrite > 0) { 180. int keyLength = minKeySize + 181. (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
182. randomKey.setSize(keyLength);
183. randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
184. int valueLength = minValueSize + 185. (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
186. randomValue.setSize(valueLength);
187. randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
188. output.collect(randomKey, randomValue);
189. numBytesToWrite -= keyLength + valueLength;
190. reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
191. reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
192. if (++itemCount % 200 == 0) { 193. reporter.setStatus("wrote record " + itemCount + ". " + 194. numBytesToWrite + " bytes left."); 195. }
196. }
197. reporter.setStatus("done with " + itemCount + " records."); 198. }
199.
200. /** 201. * Save the values out of the configuaration that we need to write
202. * the data.
203. */
204. @Override
205. public void configure(JobConf job) { 206. numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map", 207. 1*1024*1024);
208. minKeySize = job.getInt("test.randomwrite.min_key", 10); 209. keySizeRange =
210. job.getInt("test.randomwrite.max_key", 1000) - minKeySize; 211. minValueSize = job.getInt("test.randomwrite.min_value", 0); 212. valueSizeRange =
213. job.getInt("test.randomwrite.max_value", 20000) - minValueSize; 214. }
215.
216. }
217.
218. /** 219. * This is the main routine for launching a distributed random write job.
220. * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
221. * The reduce doesn't do anything.
222. *
223. * @throws IOException
224. */
225. public int run(String[] args) throws Exception { 226. if (args.length == 0) { 227. System.out.println("Usage: writer "); 228. ToolRunner.printGenericCommandUsage(System.out);
229. return -1; 230. }
231.
232. Path outDir = new Path(args[0]); 233. JobConf job = new JobConf(getConf()); 234.
235. job.setJarByClass(RandomWriter.class); 236. job.setJobName("random-writer"); 237. FileOutputFormat.setOutputPath(job, outDir);
238.
239. job.setOutputKeyClass(BytesWritable.class); 240. job.setOutputValueClass(BytesWritable.class); 241.
242. job.setInputFormat(RandomInputFormat.class); 243. job.setMapperClass(Map.class); 244. job.setReducerClass(IdentityReducer.class); 245. job.setOutputFormat(SequenceFileOutputFormat.class); 246.
247. JobClient client = new JobClient(job); 248. ClusterStatus cluster = client.getClusterStatus();
249. int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 1); 250. long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map", 251. 1*1024*1024);
252. if (numBytesToWritePerMap == 0) { 253. System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0"); 254. return -2; 255. }
256. long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 257. numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
258. int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); 259. if (numMaps == 0 && totalBytesToWrite > 0) { 260. numMaps = 1;
261. job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite); 262. }
263.
264. job.setNumMapTasks(numMaps);
265. System.out.println("Running " + numMaps + " maps."); 266.
267. // reducer NONE 268. job.setNumReduceTasks(0);
269.
270. Date startTime = new Date(); 271. System.out.println("Job started: " + startTime); 272. JobClient.runJob(job);
273. Date endTime = new Date(); 274. System.out.println("Job ended: " + endTime); 275. System.out.println("The job took " + 276. (endTime.getTime() - startTime.getTime()) /1000 +
277. " seconds."); 278.
279. return 0; 280. }
281.
282. public static void main(String[] args) throws Exception { 283. int res = ToolRunner.run(new Configuration(), new RandomWriter(), args); 284. System.exit(res);
285. }
286.
287.}
输出信息:
1.11/10/17 13:27:46 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
2.Running 2 maps.
3.Job started: Mon Oct 17 13:27:47 CST 2011
4.11/10/17 13:27:47 INFO mapred.JobClient: Running job: job_201110171322_0001
5.11/10/17 13:27:48 INFO mapred.JobClient: map 0% reduce 0%
6.11/10/17 13:29:58 INFO mapred.JobClient: map 50% reduce 0%
7.11/10/17 13:30:05 INFO mapred.JobClient: map 100% reduce 0%
8.11/10/17 13:30:07 INFO mapred.JobClient: Job complete: job_201110171322_0001
9.11/10/17 13:30:07 INFO mapred.JobClient: Counters: 8
10.11/10/17 13:30:07 INFO mapred.JobClient: Job Counters
11.11/10/17 13:30:07 INFO mapred.JobClient: Launched map tasks=3
12.11/10/17 13:30:07 INFO mapred.JobClient: org.apache.hadoop.examples.RandomWriter$Counters
13.11/10/17 13:30:07 INFO mapred.JobClient: BYTES_WRITTEN=2147504078
14.11/10/17 13:30:07 INFO mapred.JobClient: RECORDS_WRITTEN=204528
15.11/10/17 13:30:07 INFO mapred.JobClient: FileSystemCounters
16.11/10/17 13:30:07 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=2154580318
17.11/10/17 13:30:07 INFO mapred.JobClient: Map-Reduce Framework
18.11/10/17 13:30:07 INFO mapred.JobClient: Map input records=2
19.11/10/17 13:30:07 INFO mapred.JobClient: Spilled Records=0
20.11/10/17 13:30:07 INFO mapred.JobClient: Map input bytes=0
21.11/10/17 13:30:07 INFO mapred.JobClient: Map output records=204528
22.Job ended: Mon Oct 17 13:30:07 CST 2011
23.The job took 140 seconds.
在hdfs上产生了两个文件,在/home/hadoop/rand目录下,分别是part-00000(1GB,r3)和part-00001(1GB,r3)
本篇文章来源于 Linux公社网站() 原文链接:
阅读(2461) | 评论(0) | 转发(0) |