2013年(350)
分类: HADOOP
2013-04-11 16:59:52
前面部署时曾经测试过wordcount程序,现在我们在Eclipse中调试这一功能。HADOOP提供了这些示例的源代码,可以在HADOOP安装根目录下的examples目录中找到,比如WordCount位于:examples/org/apache/hadoop/examples/WordCount.java
我们新建一个文件:在项目名称上单击右键,选择New -> Class创建一个新的Java Class文件,弹出窗口如下:
将示例代码直接复制进来,而后修改文件头部包名即可。新创建的WordCount.java文件内容如下:
package com.jss.hadoop.mapreduce.test;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
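}
WordCount如要运行,需要指定两个参数,即代码中 FileInputFormat.addInputPath 和 FileOutputFormat.setOutputPath 两处所用的输入路径与输出路径(otherArgs[0] 和 otherArgs[1])。针对这种情况,我们既可以改动代码,直接在这两处写好目标路径(同时还需要注释掉检查 otherArgs.length 的那段参数校验代码),然后直接运行调试;也可以为WordCount配置调试运行环境,为其指定运行参数。这里我们选择后一种方式。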
选择菜单:Run -> Run Configurations -> Java Application,点击窗口左上角处的图标:
新建一个配置,将弹出的窗口显示项切换到Arguments选项:
此处需要我们填写Program arguments,即指定程序运行所需参数,根据程序设定,此时需要指定两个参数,一个指定要处理的文件源路径,另一个是处理后文件的输出路径,中间以空格分隔。请根据实际情况指定参数,配置好后,即可点击Run运行。
如果配置正确,执行成功后,HDFS中就会创建jssout文件夹(即本例参数中指定的输出路径),如上图所示,其中保存的文件就是对源路径中的数据处理后的输出结果。
若要操作HDFS中的目录和文件,方法也是一样的。按同样的步骤继续创建文件FileOper.java(过程不再演示),代码如下:
$ more /data/developer/workspace/FirstHadoopProject/src/com/jss/hadoop/hdfs/test/FileOper.java
package com.jss.hadoop.hdfs.test;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class FileOper {
public static void main(String[] args) throws Exception {
    // Demo driver. args[0] is stored as "fs.default.name", i.e. the file system
    // every helper below operates on; further arguments feed the optional calls.
    if (args.length == 0) {
        System.out.println("Must define parameters!");
        return;
    }

    Configuration hdfsConf = new Configuration();
    hdfsConf.set("fs.default.name", args[0]);

    // Print the entries under the root directory.
    listHDFSFiles(hdfsConf);

    // Other operations -- uncomment one at a time and pass the extra arguments it reads:
    // uploadLocal2HDFS(hdfsConf, args[1], args[2]);   // upload a local file
    // createHDFSFile(hdfsConf, args[1], args[2]);     // create a file with given content
    // deleteHDFSFile(hdfsConf, args[1]);              // delete a file
    // readHDFSFile(hdfsConf, args[1]);                // read a file into memory
    // makeHDFSDirectory(hdfsConf, args[1]);           // create a directory
    // removeHDFSDirectory(hdfsConf, args[1]);         // delete a directory
}
public static void listHDFSFiles(Configuration conf) throws IOException {
    // Prints the path of every entry directly under "/" of the configured
    // file system, one per line. The FileSystem handle is left open.
    FileSystem hdfs = FileSystem.get(conf);
    Path root = new Path("/");
    FileStatus[] entries = hdfs.listStatus(root);
    for (int i = 0; i < entries.length; i++) {
        FileStatus entry = entries[i];
        System.out.println(entry.getPath());
    }
}
/**
 * Copies a local file into the file system selected by {@code conf}.
 *
 * @param conf configuration whose "fs.default.name" selects the target file system
 * @param s    source path on the local file system
 * @param d    destination path in the target file system
 * @throws IOException if the copy fails
 */
public static void uploadLocal2HDFS(Configuration conf, String s, String d)
        throws IOException {
    FileSystem fs = FileSystem.get(conf);
    try {
        fs.copyFromLocalFile(new Path(s), new Path(d));
    } finally {
        // Close even when the copy throws so the handle is not leaked.
        fs.close();
    }
    System.out.println("Upload to " + conf.get("fs.default.name"));
}
/**
 * Creates a file at {@code createFilePath} and writes {@code content} to it as UTF-8.
 *
 * @param conf           configuration selecting the target file system
 * @param createFilePath path of the file to create
 * @param content        text to write
 * @throws IOException if the file cannot be created or written
 */
public static void createHDFSFile(Configuration conf, String createFilePath,
        String content) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    try {
        FSDataOutputStream fsos = fs.create(new Path(createFilePath));
        try {
            fsos.write(content.getBytes("UTF-8"));
        } finally {
            // Always release the output stream, even if the write fails.
            fsos.close();
        }
    } finally {
        fs.close();
    }
    System.out.println("Succeeded created file " + createFilePath);
}
/**
 * Deletes {@code dst} from the configured file system.
 *
 * @param conf configuration selecting the target file system
 * @param dst  path to delete; deletion is recursive, so a directory is removed with its contents
 * @return the result of {@code FileSystem.delete} (true when something was deleted)
 * @throws IOException if the delete fails
 */
public static boolean deleteHDFSFile(Configuration conf, String dst)
        throws IOException {
    FileSystem fs = FileSystem.get(conf);
    try {
        return fs.delete(new Path(dst), true);
    } finally {
        // Close even when delete throws so the handle is not leaked.
        fs.close();
    }
}
/**
 * Reads the whole file at {@code dst} into memory.
 *
 * @param conf configuration selecting the source file system
 * @param dst  path of the file to read
 * @return the file's bytes
 * @throws FileNotFoundException if {@code dst} does not exist
 * @throws IOException           if the file is too large for a single byte array, or on read failure
 * @throws Exception             declared for backward compatibility with existing callers
 */
public static byte[] readHDFSFile(Configuration conf, String dst)
        throws Exception {
    FileSystem fs = FileSystem.get(conf);
    try {
        Path path = new Path(dst);
        if (!fs.exists(path)) {
            throw new FileNotFoundException("the file is not found: " + dst);
        }
        // Size the buffer from the file status; a Java array cannot exceed Integer.MAX_VALUE.
        long len = fs.getFileStatus(path).getLen();
        if (len > Integer.MAX_VALUE) {
            throw new IOException("File too large to read into memory: " + dst
                    + " (" + len + " bytes)");
        }
        byte[] buffer = new byte[(int) len];
        FSDataInputStream is = fs.open(path);
        try {
            // Positioned read of the entire file starting at offset 0.
            is.readFully(0, buffer);
        } finally {
            is.close();
        }
        return buffer;
    } finally {
        // Close on every path, including the not-found and error paths.
        fs.close();
    }
}
/**
 * Creates directory {@code dst} (including missing parents, via {@code mkdirs})
 * and reports whether the file system reported success.
 *
 * @param conf configuration selecting the target file system
 * @param dst  directory path to create
 * @throws IOException if the file system call fails
 */
public static void makeHDFSDirectory(Configuration conf, String dst)
        throws IOException {
    FileSystem fs = FileSystem.get(conf);
    boolean created;
    try {
        created = fs.mkdirs(new Path(dst));
    } finally {
        fs.close();
    }
    // Only claim success when mkdirs actually reported it.
    if (created) {
        System.out.println("Succeeded created directory " + dst);
    } else {
        System.out.println("Failed to create directory " + dst);
    }
}
/**
 * Recursively deletes directory {@code dst} and reports whether anything was removed.
 *
 * @param conf configuration selecting the target file system
 * @param dst  directory path to delete
 * @throws IOException if the file system call fails
 */
public static void removeHDFSDirectory(Configuration conf, String dst)
        throws IOException {
    FileSystem fs = FileSystem.get(conf);
    boolean removed;
    try {
        removed = fs.delete(new Path(dst), true);
    } finally {
        fs.close();
    }
    // delete() returns false when nothing was deleted (e.g. the path did not exist).
    if (removed) {
        System.out.println("Succeeded remove directory " + dst);
    } else {
        System.out.println("Failed to remove directory " + dst);
    }
}
}
FileOper能够读取HDFS中的目录结构,并对文件和目录进行操作。程序执行时同样需要指定参数:第一个参数是HDFS的地址(程序会把它设置为 fs.default.name,例如 hdfs://<主机>:<端口>),其余参数供所调用的具体操作使用。配置步骤与前面运行WordCount时相同,这里就不一一演示了。