MapReduce是一种可用于数据处理的编程模型,Hadoop可以运行由各种语言编写的MapReduce程序。MapReduce程序本质上是并行运行的,因此,可以将大规模的数据分析任务交给任何一个拥有足够多机器的运营商。
MapReduce的优势在于处理大规模数据集,这里以一个气象数据集分析为例说明之,查询数据集中每年全球气温的最高记录是多少。
一、MapReduce原理
1、map阶段和reduce阶段
为了充分发挥Hadoop提供的并行处理优势,我们需要将查询表示成MapReduce作业。经过一些本地的小规模测试,我们将能够在集群设备上运行Hadoop。
MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值对作为输入和输出,并由程序员选择它们的类型。程序员还需要具体定义两个函数:map函数和reduce函数。
map阶段的输入是原始的气象数据,输入格式可以是文本格式,以便将数据集的每一行作为一个文本值进行输入。map函数只是一个数据处理阶段,通过这种方式来准备数据,使reduce函数能在该转杯数据上继续处理,即找出每年的最高气温。这里键(key)是文件中的行偏移量,map函数的输出是年份和气温值,比如:
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
map函数的输出经由MapReduce框架处理后,最后被发送到reduce函数,这一处理过程中需要根据键对键/值对进行排序和分组。这样,reduce函数会看到如下输入:
(1949, [111, 78])
(1950, [0, 22, -11])
每一年份后紧跟着一系列气温数据,所有reduce函数现在需要做的是遍历整个列表并从中找出最大的读数:
(1949, 111)
(1950, 22)
这是最终输出结果:每一年的全球最高气温记录。
整个数据流如下图所示:
图1 MapReduce的逻辑数据流
上图中的底部是Unix的流水线(pipeline,即管道或管线)命令,用于模拟整个MapReduce的流程。
2、Java MapReduce
对于Java来说实现上面的MapReduce过程,需要三样东西:一个map函数、一个reduce函数和一些用来运行作业的代码。
(1)map函数
map函数由Mapper接口实现来表示,后者声明了一个map()方法,下面是我们的map函数实现(查找最高气温的Mapper):
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase
implements Mapper {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}
该Mapper接口是一个泛型类型,它有4个形参,分别指定map函数的输入键(即一个长整数偏移量)、输入值(即一行文本)、输出键(即年份)和输出值(即气温)的类型。Hadoop自身提供一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型,这些类型均可在org.apache.hadoop.io包中找到,这里使用LongWritable类型(相当于Java中的Long类型)、Text类型(相当于Java中的String类型)和IntWritable类型(相当于Java中的Integer类型)。
map()方法的输入是一个键和值,首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取感兴趣的列。
map()方法还提供了OutputCollector实例用于输出内容的写入,即将年份数据按Text对象进行读/写,将气温值封装在IntWritable类型中。
只在气温数据不缺失并且所对应质量代码显示为正确的气温读数时,才将其写入输出记录中。
(2)reduce函数
reduce函数通过Reducer进行类似的定义(查找最高气温的Reducer),具体如下:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase
implements Reducer {
public void reduce(Text key, Interator values,
OutputCollector output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
同样,针对reduce函数也有4个形参用于指定其输入和输出类型。reduce函数的输入类型必须与map函数的输出类型相匹配,即Text类型和IntWritable类型。reduce函数的输出类型也必须是Text和IntWritable这两种类型,分别输出年份和最高气温。该最高气温是通过循环比较当前气温与已看到的最高气温获得的。
(3)MapReduce作业
下面代码负责运行MapReduce作业(该应用程序在气象数据集中找出最高气温),具体如下:
import java.io.IOException;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature
JobConf对象指定了作业执行规范,我们可以用它来控制整个作业的运行。在Hadoop集群上运行这个作业时,需要将代码打包成一个JAR文件(Hadoop会在集群上分发这个文件)。我们无需明确指定JAR文件的名称,而只需在JobConf的构造函数中传递一个类,Hadoop将通过该类查找包含有该类的JAR文件进而找到相关的JAR文件。
构造JobConf对象之后,需要指定输入和输出数据的路径。调用FileInputFormat类的静态函数addInputPath()来定义输入数据的路径,该路径可以是单个文件、目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一组文件。由函数名可知,可以多次调用addInputPath()实现多路径的输入。
通过调用FileOutputFormat类中的静态函数setOutputPath()来指定输出路径。该函数指定了reduce函数输出文件的写入目录。在运行任务前该目录不应该存在,否则Hadoop会报错并拒绝运行该任务。这种预防措施是为了防止数据丢失(一个长时间运行任务的结果被意外地覆盖将是非常郁闷的一件事)。
接着,通过setMapperClass()和setReduceClass()指定map和reduce类型。
setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,这两个类型一般是相同的,如果不同,map函数的输出类型则通过setMapOutputKeyClass()和setMapOutputValueClass()函数来设置。
输入的类型通过InputFormat类来控制,如果不设置则使用默认的TextInputFormat(文本输入格式),本示例中未设置。
在设置定义map和reduce函数的类后,便可以开始运行任务。JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台。
(4)运行测试
一般写好MapReduce作业后会拿一个小型数据集进行测试以排除代码问题,这里以独立(本机)模式利用Hadoop在本地文件系统上运行作业运行程序,具体命令如下:
% export HADOOP_CLASSPATH=build/classes
% hadoop MaxTemperature input/sample.txt output
如果调用hadoop命名的第一个参数是类名,则Hadoop将启动一个JVM来运行这个类。使用hadoop命令运行作业比直接使用Java命令运行更方便,因为前者将Hadoop库文件(及其依赖关系)路径加入到类路径参数中,同时也能获得Hadoop的配置文件。我们需要定义一个HADOOP_CLASSPATH环境变量用于添加应用程序类的路径,然后由Hadoop脚本来执行相关操作。
二、Hadoop的Streaming
Hadoop提供了MapReduce的API,并允许使用非Java的其它语言来写自己的map和reduce函数。Hadoop的Streaming使用Unix标准流作为Hadoop和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出来写MapReduce程序。
Streaming天生适合用于文本处理,在文本模式下使用时,它有一个数据的行视图。map的输入数据通过标准输入流传递给map函数,并且是一行一行地传输,最后将结果行写到标准输出。map输出的键/值对是以一个制表符分隔的行,它以这样的形式写到标准输出。reduce函数的输入格式相同——通过制表符来分隔的键/值对——并通过标准输入流进行传输。reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排序过,最后将结果写入到标准输出。
Streaming支持任何可以在标准输入读取和写入到标准输出中的编程语言,这里用Python来重写按年份查找最高气温的MapReduce程序:
#用python编写用于查找最高气温的map函数
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" %s (year, temp)
#用python编写用于查找最高气温的reduce函数
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, 0)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
由于该脚本只能在标准输入和输出上运行,所以最简单的方式是在Unix管道上进行测试,而不是在Hadoop中进行测试:
% cat input/sample.txt | src/main/python/max_temperature_map.py | \
sort | src/main/python/max_temperature_reduce.py
三、Hadoop的Pipes
Hadoop的Pipes是Hadoop MapReduce的C++接口代称。不同于使用标准输入和输出来实现map代码和reduce代码之间的Streaming,Pipes使用套接字作为tasktracker与C++版本map函数或reduce函数的进程之间的通道,而未使用JNI。
Pipes还允许我们设置一个Java mapper、reducer、合并函数或分区函数。事实上,在任何一个作业中,都可以混合使用Java类或C++类。
阅读(6785) | 评论(1) | 转发(1) |