Chinaunix首页 | 论坛 | 博客
  • 博客访问: 617035
  • 博文数量: 16
  • 博客积分: 10150
  • 博客等级: 上将
  • 技术积分: 209
  • 用 户 组: 普通用户
  • 注册时间: 2006-08-11 14:59
文章分类

全部博文(16)

文章存档

2015年(2)

2014年(2)

2013年(1)

2010年(7)

2009年(3)

2008年(1)

分类: 服务器与存储

2010-07-08 21:17:24

我们生活在一个数据爆炸性增长的年代。

现在存储容量越来越大、成本也越来越低,但是数据的读写速度没有随之增长。

当前大多数硬盘容量为TB级别,但是读取速度只有一百多MB/s

这就意味着需要几个小时才能读完硬盘上的所有数据,更不要说写入数据了。

 

显而易见的解决办法是并行读取,从多个数据源同时获取数据以提高效率。

但要想实现并发访问或者从多个来源读取数据,必须解决以下两个基本问题:

1。硬件错误。服务器集群规模越大,单一个体发生错误概率越大。

2。数据融合。多个来源的数据有效整合。

 

Hadoop针对上述问题而设计,向用户提供可靠的共享存储和分析系统。

Hadoop存储使用HDFS,而分析采用MapReduce机制。

 

为什么不能使用带有多个磁盘的数据库来处理大数据量的并行分析?

为什么要用到MapReduce

答案来自磁盘发展的另一个趋势:寻道时间的缩短远远小于传输速度的增加。

寻道时间影响磁盘操作的延迟,传输速度影响磁盘的带宽。

 

如果只是更新数据库中的一小部分记录,传统基于B树的关系型数据库表现良好。

但是如果需要更新数据库的大部分记录,MapReduce相比B树更加高效。

两者之间的对比:

MapReduce可以看做是关系型数据库管理系统的补充。如下表所示:

RDBMS compared to MapReduce

 

Traditional RDBMS

MapReduce

Data size

Gigabytes

Petabytes

Access

Interactive and batch

Batch

Updates

Read and write many times

Write once, read many times

Structure

Static schme

Dynamic schema

Integrity

High

Low

Scaling

Nonlinear

Linear

 

1 MapReduce更适合需要分析整个DataSet的场合,尤其是随机分析。

    RDBMS适合单点的查询和更新,提供低延迟的小批量数据检索和更新服务。

2 MapReduce适合数据写入一次而需要多次读取的情景。

    RDBMS适合DataSet不断更新的情景。

3 两者另一个区别在于操作对象的结构。MapReduce适合半结构和非结构化数据。RDBMS适合结构化数据。

4 RDBMS通常对数据进行标准化处理以保持系统完整性,删除冗余信息。

    MapReduce没有这个步骤,因为MapReduce被设计为支持数据流的高速读写。

    网络服务器日志文件就是一堆不能被标准化的记录集合,非常适合使用MapReduce来处理和分析。。

   (日志中每一条记录内的客户端机器名都是特定的,虽然同一个机器名可能出现在多个记录中)

5 MapReduce是线性扩展的。输入数据翻倍,任务处理时间也翻倍。

    当然,可以通过处理能力翻倍的方式在时间不变的情况下完成任务。

当然,现在这两者的界限日渐模糊。一方面一些RDBMS(比如Greenplum的数据库)借鉴了MapReduce的一些思想,另一方面,MapReduce内置的查询语言使得MapReduce更加接近RDBMS的使用方式。

 

网络日志等文件的特点:

1。无法标准化。内容不一,无法用二维表来描述。semi-structured and record-oriented.

2。对记录分析着眼于整个数据库而不是部分。

   同时分析内容不确定,没有办法按照特定的类型优化索引,提高效率。

3。数据特性:一次存储,多次访问。

 

以气候分析为例,说明Hadoop的应用场合。

需求:

分析天气情况(这里以获取年度最高气温为例)

文件形式如下:

010010-99999-1990.gz

010014-99999-1990.gz

010015-99999-1990.gz

010016-99999-1990.gz

010017-99999-1990.gz

010030-99999-1990.gz

010040-99999-1990.gz

010080-99999-1990.gz

010100-99999-1990.gz

010150-99999-1990.gz

。。。。。。

每个观测站每天都有数据打包,因此整个的数据集由成千上万的小文件组成。出于数据处理的需要,同一年份的所有小文件被整合为一个大文件。

传统解决方案:

传统解决方式脚本遍历所有年份文件,分别加以分析。

如果需要更快获取结果,可以将不同年份分发到不同机器上。

理论上看起来很好,但是存在如下问题:

1。将任务分割为相等的块不容易。

以气候数据为例,不同年份的文件大小不一,必然导致有些块处理时间相对较短。

由此导致任务时间由尺寸较大的块决定。当然,折衷方案是将所有数据分割为固定大小的切片。

2。需要额外的处理以融合相对独立的分析处理结果。

以气候为例,如果按照年份划分任务,我们可以得到每年的最高气温并整合在一起进行排序。

如果按照固定大小进行切片,我们得到的结果是切片内的最高温度,我们需要进一步的操作以提取年份所有切片中最高温度的最大值。

3。系统仍然受限于单一服务器的处理能力。

单一服务器的最高处理能力是固定的。同时,一些数据集的增长速度大于单一服务器的处理能力的增长。而使用多台服务器,我们需要考虑更多的因素,关键是服务器间的协作和可靠性。

因此,虽然看起来并行处理是简单可行的方案,实际上非常复杂。

基于MapReduce的解决方案:

下面是MapReduce对该需求的处理流程:

首先,假定输入数据如下(每行省略一些无关栏目):

0067011990999991950051507004...9999999N9+00001+99999999999...

0043011990999991950051512004...9999999N9+00221+99999999999...

0043011990999991950051518004...9999999N9-00111+99999999999...

0043012650999991949032412004...0500001N9+01111+99999999999...

0043012650999991949032418004...0500001N9+00781+99999999999...

将数据转换为Map函数所需的key-value对(key是该数据行相对文件头的偏移):

(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)

(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)

(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)

(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)

(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)

Map函数忽略key,提取出年份和气温(加粗部分)作为输出。

(1950, 0)

(1950, 22)

(1950, −11)

(1949, 111)

(1949, 78)

Map函数的输出首先经过MapReduce框架的处理,然后被送往Reduce函数。这一处理过程整理并排序key-value对。最终,Reduce函数的输入如下:

 (1949, [111, 78])

(1950, [0, 22, −11])

每一个年份带着一个气温的列表。Reduce函数需要做的就是从列表中提取最大值:

(1949, 111)

(1950, 22)

这就是满足需求(列出所有年份的年度最高气温)的最终结果。

下面是整个数据流程的图片说明: 

上述过程的JAVA实现:

Example 2-3. Mapper for maximum temperature example

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase

  implements Mapper {

  private static final int MISSING = 9999;

 

  public void map(LongWritable key, Text value,

      OutputCollector output, Reporter reporter)

      throws IOException {

   

    String line = value.toString();

    String year = line.substring(15, 19);

    int airTemperature;

if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs

      airTemperature = Integer.parseInt(line.substring(88, 92));

    } else {

      airTemperature = Integer.parseInt(line.substring(87, 92));

    }

    String quality = line.substring(92, 93);

    if (airTemperature != MISSING && quality.matches("[01459]")) {

      output.collect(new Text(year), new IntWritable(airTemperature));

    }

  }

}

Example 2-4. Reducer for maximum temperature example

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase

  implements Reducer {

  public void reduce(Text key, Iterator values,

      OutputCollector output, Reporter reporter)

      throws IOException {

   

    int maxValue = Integer.MIN_VALUE;

    while (values.hasNext()) {

      maxValue = Math.max(maxValue, values.next().get());

    }

    output.collect(key, new IntWritable(maxValue));

  }

}

Example 2-5. Application to find the maximum temperature in the weather dataset

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {

  public static void main(String[] args) throws IOException {

    if (args.length != 2) {

      System.err.println("Usage: MaxTemperature ");

      System.exit(-1);

    }

   

    JobConf conf = new JobConf(MaxTemperature.class);

    conf.setJobName("Max temperature");

    FileInputFormat.addInputPath(conf, new Path(args[0]));

    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

   

    conf.setMapperClass(MaxTemperatureMapper.class);

    conf.setReducerClass(MaxTemperatureReducer.class);

    conf.setOutputKeyClass(Text.class);

    conf.setOutputValueClass(IntWritable.class);

    JobClient.runJob(conf);

  }

}

0.2 版本的Hadoop引入了新的API,代码可更改如下:

public class NewMaxTemperature {

static class NewMaxTemperatureMapper

extends Mapper {

private static final int MISSING = 9999;

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String year = line.substring(15, 19);

int airTemperature;

if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs

airTemperature = Integer.parseInt(line.substring(88, 92));

} else {

airTemperature = Integer.parseInt(line.substring(87, 92));

}

String quality = line.substring(92, 93);

if (airTemperature != MISSING && quality.matches("[01459]")) {

context.write(new Text(year), new IntWritable(airTemperature));

}

}

}

static class NewMaxTemperatureReducer

extends Reducer {

public void reduce(Text key, Iterable values,

Context context)

throws IOException, InterruptedException {

int maxValue = Integer.MIN_VALUE;

for (IntWritable value : values) {

maxValue = Math.max(maxValue, value.get());

}

context.write(key, new IntWritable(maxValue));

}

}

public static void main(String[] args) throws Exception {

if (args.length != 2) {

System.err.println("Usage: NewMaxTemperature ");

System.exit(-1);

}

Job job = new Job();

job.setJarByClass(NewMaxTemperature.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(NewMaxTemperatureMapper.class);

job.setReducerClass(NewMaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

阅读(1653) | 评论(1) | 转发(0) |
给主人留下些什么吧!~~

chinaunix网友2010-07-08 21:20:23

先把大概记下来,以后再仔细学习。 初学Hadoop,有什么错误请大家指正。