分类: 服务器与存储
2010-07-08 21:17:24
我们生活在一个数据爆炸性增长的年代。
现在存储容量越来越大、成本也越来越低,但是数据的读写速度没有随之增长。
当前大多数硬盘容量为TB级别,但是读取速度只有一百多MB/s,
这就意味着需要几个小时才能读完硬盘上的所有数据,更不要说写入数据了。
显而易见的解决办法是并行读取,从多个数据源同时获取数据以提高效率。
但要想实现并发访问或者从多个来源读取数据,必须解决以下两个基本问题:
1。硬件错误。服务器集群规模越大,单一个体发生错误概率越大。
2。数据融合。多个来源的数据有效整合。
Hadoop针对上述问题而设计,向用户提供可靠的共享存储和分析系统。
Hadoop存储使用HDFS,而分析采用MapReduce机制。
为什么不能使用带有多个磁盘的数据库来处理大数据量的并行分析?
为什么要用到MapReduce?
答案来自磁盘发展的另一个趋势:寻道时间的缩短远远小于传输速度的增加。
寻道时间影响磁盘操作的延迟,传输速度影响磁盘的带宽。
如果只是更新数据库中的一小部分记录,传统基于B树的关系型数据库表现良好。
但是如果需要更新数据库的大部分记录,MapReduce相比B树更加高效。
两者之间的对比:
MapReduce可以看做是关系型数据库管理系统的补充。如下表所示:
RDBMS compared to MapReduce
|
Traditional RDBMS |
MapReduce |
Data size |
Gigabytes |
Petabytes |
Access |
Interactive and batch |
Batch |
Updates |
Read and write many times |
Write once, read many times |
Structure |
Static schme |
Dynamic schema |
Integrity |
High |
Low |
Scaling |
Nonlinear |
Linear |
1。 MapReduce更适合需要分析整个DataSet的场合,尤其是随机分析。
RDBMS适合单点的查询和更新,提供低延迟的小批量数据检索和更新服务。
2。 MapReduce适合数据写入一次而需要多次读取的情景。
RDBMS适合DataSet不断更新的情景。
3。 两者另一个区别在于操作对象的结构。MapReduce适合半结构和非结构化数据。RDBMS适合结构化数据。
4。 RDBMS通常对数据进行标准化处理以保持系统完整性,删除冗余信息。
MapReduce没有这个步骤,因为MapReduce被设计为支持数据流的高速读写。
网络服务器日志文件就是一堆不能被标准化的记录集合,非常适合使用MapReduce来处理和分析。。
(日志中每一条记录内的客户端机器名都是特定的,虽然同一个机器名可能出现在多个记录中)
5。 MapReduce是线性扩展的。输入数据翻倍,任务处理时间也翻倍。
当然,可以通过处理能力翻倍的方式在时间不变的情况下完成任务。
当然,现在这两者的界限日渐模糊。一方面一些RDBMS(比如Greenplum的数据库)借鉴了MapReduce的一些思想,另一方面,MapReduce内置的查询语言使得MapReduce更加接近RDBMS的使用方式。
网络日志等文件的特点:
1。无法标准化。内容不一,无法用二维表来描述。semi-structured and record-oriented.
2。对记录分析着眼于整个数据库而不是部分。
同时分析内容不确定,没有办法按照特定的类型优化索引,提高效率。
3。数据特性:一次存储,多次访问。
以气候分析为例,说明Hadoop的应用场合。
需求:
分析天气情况(这里以获取年度最高气温为例)
文件形式如下:
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
。。。。。。
每个观测站每天都有数据打包,因此整个的数据集由成千上万的小文件组成。出于数据处理的需要,同一年份的所有小文件被整合为一个大文件。
传统解决方案:
传统解决方式脚本遍历所有年份文件,分别加以分析。
如果需要更快获取结果,可以将不同年份分发到不同机器上。
理论上看起来很好,但是存在如下问题:
1。将任务分割为相等的块不容易。
以气候数据为例,不同年份的文件大小不一,必然导致有些块处理时间相对较短。
由此导致任务时间由尺寸较大的块决定。当然,折衷方案是将所有数据分割为固定大小的切片。
2。需要额外的处理以融合相对独立的分析处理结果。
以气候为例,如果按照年份划分任务,我们可以得到每年的最高气温并整合在一起进行排序。
如果按照固定大小进行切片,我们得到的结果是切片内的最高温度,我们需要进一步的操作以提取年份所有切片中最高温度的最大值。
3。系统仍然受限于单一服务器的处理能力。
单一服务器的最高处理能力是固定的。同时,一些数据集的增长速度大于单一服务器的处理能力的增长。而使用多台服务器,我们需要考虑更多的因素,关键是服务器间的协作和可靠性。
因此,虽然看起来并行处理是简单可行的方案,实际上非常复杂。
基于MapReduce的解决方案:
下面是MapReduce对该需求的处理流程:
首先,假定输入数据如下(每行省略一些无关栏目):
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
将数据转换为Map函数所需的key-value对(key是该数据行相对文件头的偏移):
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
Map函数忽略key,提取出年份和气温(加粗部分)作为输出。
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
Map函数的输出首先经过MapReduce框架的处理,然后被送往Reduce函数。这一处理过程整理并排序key-value对。最终,Reduce函数的输入如下:
(1949, [111, 78])
(1950, [0, 22, −11])
每一个年份带着一个气温的列表。Reduce函数需要做的就是从列表中提取最大值:
(1949, 111)
(1950, 22)
这就是满足需求(列出所有年份的年度最高气温)的最终结果。
下面是整个数据流程的图片说明:
上述过程的JAVA实现:
Example 2-3. Mapper for maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase
implements Mapper
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}
Example 2-4. Reducer for maximum temperature example
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase
implements Reducer
public void reduce(Text key, Iterator
OutputCollector
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
Example 2-5. Application to find the maximum temperature in the weather dataset
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
public class NewMaxTemperature {
static class NewMaxTemperatureMapper
extends Mapper
private static final int MISSING = 9999;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
static class NewMaxTemperatureReducer
extends Reducer
public void reduce(Text key, Iterable
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: NewMaxTemperature
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(NewMaxTemperature.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(NewMaxTemperatureMapper.class);
job.setReducerClass(NewMaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}