Chinaunix首页 | 论坛 | 博客
  • 博客访问: 496162
  • 博文数量: 80
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1916
  • 用 户 组: 普通用户
  • 注册时间: 2013-07-11 22:01
个人简介

从事实时计算多年,熟悉jstorm/spark/flink/kafka/rocketMq, 热衷于开源,希望在这里和前辈们一起学习与分享,得到长足的进步!邮箱:hustfxj@gmail.com 我的githup地址是:https://github.com/hustfxj。欢迎和大家一起交流探讨问题。

文章分类

全部博文(80)

文章存档

2017年(11)

2015年(3)

2014年(33)

2013年(33)

分类: 大数据

2017-05-27 19:59:53

Shuffle阶段的write 和 read

一般Stage在提交过程中,会产生两种Task,ShuffleMapTask和ResultTask。在ShuffleMapTask执行过程中,会产生Shuffle结果的写磁盘操作。然后ResultTask会从上一个ShuffleMapTask写的磁盘里头读取数据。那么这里头涉及到几个问题?

  • ShuffleMapTask产生的结果写到哪里去?

答:ShuffleMapTask产生的结果一般写入到本地磁盘,数据存入shuffle {$shuffleId}{$mapId}{$reduceId}.data文件中, 然后再本地磁盘建立索引文件shuffle{$shuffleId}{$mapId}{$reduceId}.index。shuffledId是对该shuffle的标识,mapId可以理解成该stage的taskId,reduceId理解成下游stage的taskId。该ShuffleMapTask计算完成后,存入磁盘之后,会向driver的mapStatuses注册shuffled和该executor的BlockManagerId。

  • ResultTask怎么知道去哪里读去数据?

答:ResultTask会主动去向driver发送查询请求,通过shuffleId获取该Seq[BlockManagerId],然后通过该ResultTask的taskId,从各个BlockManagerId获取数据。

Write 过程

override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      // rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any,   Any]]] 会递归执行
      // map阶段的处理结果,write会把生成的rdd存入到blockManager中
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  } 

ShuffleMapTask的执行入口是runTask,该过程主要是执行map阶段的处理逻辑,并将结果Tuple,根据key shuffle到不同的数据文件中。 (1)从SparkEnv中获取ShuffleManager对象; (2)ShuffleManager对象将创建ShuffleWriter对象; (3)通过ShuffleWriter对象进行shuffle结果写操作。

将计算结果通过writer函数写入到相应的文件中,调用的是SortShuffleWriter的writer函数。

override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {  
  ......  
  val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)  
  val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)  
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)  
  shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)  
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)  
} 

(1)创建数据文件,文件名格式:shuffle-{$shuffleId}-{$mapId}-{$reduceId}.datareduceId值为0

(2)创建ShuffleBlockId对象,其中的reduceId值为0;

(3)将shuffle输出分区写入数据文件;

(4)将分区索引信息写入索引文件,索引文件名格式:shuffle-{$shuffleId}-{$mapId-{$reduceId}.index

(5)创建MapStatus对象并返回。示意图如下:

20150519160334786

可见,在SortShuffleManager下,每个map对应两个文件:data文件和index文件。其中.data文件存储Map输出分区的数据,.index文件存储每个分区在.data文件中的偏移量及数据长度。这里还需要强调一次,task计算完毕后,会将shuffleId和blockManagerId注册到driver中的MapOutputTracker,MapOutPutTracker的作用是为每个shuffle准备其所需要的所有map out,可以加速map outs传送给shuffle的速度。

Read 过程

在resultTask在执行过程中,首先会调用shuffleRDD的compute函数,如下:

 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

    override def read(): Iterator[Product2[K, C]] = {
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))

    // Wrap the streams for compression and encryption based on configuration
    val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
      serializerManager.wrapStream(blockId, inputStream)
    }
   ......
} 

在read函数中,通过shuffleId 向的MapOutputTracker发送请求,获取Seq[BlockManagerId],然后向各个BlcokManager发送请求,BlcokManager通过shuffleId, mapId参数确认到数据文件,根据startPartition确定数据文件中偏移量,获取数据返回。

阅读(2474) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~