When stages are submitted, two kinds of tasks can be produced: ShuffleMapTask and ResultTask. While a ShuffleMapTask executes, it writes its shuffle result to disk; a ResultTask then reads that data back from the disk files written by the upstream ShuffleMapTask. This raises a couple of questions.
Question 1: where does a ShuffleMapTask put its result, and how is that location made known?
Answer: a ShuffleMapTask normally writes its result to local disk. The data goes into a file named shuffle_{shuffleId}_{mapId}_{reduceId}.data, and an index file named shuffle_{shuffleId}_{mapId}_{reduceId}.index is built alongside it on the same local disk. Here shuffleId identifies the shuffle, mapId can be read as the task id within this stage, and reduceId as the task id of the downstream stage. Once the ShuffleMapTask has finished computing and persisted its output to disk, it registers the shuffleId and the executor's BlockManagerId in the driver's mapStatuses.
Question 2: how does a ResultTask know where to fetch its data from?
Answer: the ResultTask actively queries the driver, using the shuffleId to obtain the Seq[BlockManagerId] for that shuffle, and then uses its own task id (its reduce partition id) to fetch the corresponding data from each of those BlockManagers. The index-lookup sketch below shows how one reducer's byte range is located inside a map output file.
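To make the file layout concrete, here is a minimal sketch of how such an index file can be used to find one reduce partition's byte range inside the .data file, assuming the .index file simply stores numPartitions + 1 cumulative byte offsets as longs. The object and method names are illustrative, not Spark's IndexShuffleBlockResolver API.

import java.io.{DataInputStream, File, FileInputStream}

object ShuffleIndexLookupSketch {
  // A reducer with id `reduceId` reads offsets reduceId and reduceId + 1 to find its
  // byte range inside the single .data file written by one map task.
  def partitionRange(indexFile: File, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skipBytes(reduceId * 8)  // skip the offsets of earlier partitions (8 bytes each)
      val start = in.readLong()   // where this partition's bytes begin in the .data file
      val end = in.readLong()     // where they end (= start of the next partition)
      (start, end)
    } finally {
      in.close()
    }
  }
}

With a layout like this, the serving side only needs the shuffleId and mapId to find the file pair, plus the reduce partition id to cut the right slice out of the .data file.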
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // rdd.iterator(partition, context) recursively computes the map-side result for this
    // partition; write() then persists the produced records through the BlockManager.
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
runTask is the execution entry point of ShuffleMapTask. It runs the map-side processing logic and shuffles the resulting tuples into different data files according to their keys:
(1) obtain the ShuffleManager from SparkEnv;
(2) have the ShuffleManager create a ShuffleWriter;
(3) write the shuffle result through that ShuffleWriter.
The computed records are written to the corresponding files through the writer's write function; with the sort-based shuffle, this is SortShuffleWriter.write.
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
  ......
  val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
  val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
  shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
(1) Create the data file, named shuffle_{shuffleId}_{mapId}_{reduceId}.data, where reduceId is 0.
(2) Create the ShuffleBlockId object, also with reduceId set to 0.
(3) Write the shuffle output partitions into the data file.
(4) Write the per-partition index information into the index file, named shuffle_{shuffleId}_{mapId}_{reduceId}.index (see the sketch after this list).
(5) Create a MapStatus object and return it.
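As a rough illustration of step (4), the sketch below shows one plausible way to turn the per-partition lengths returned by writePartitionedFile into cumulative offsets in the index file. It is a simplified stand-in for what writeIndexFile does conceptually, not the actual Spark implementation.

import java.io.{DataOutputStream, File, FileOutputStream}

object ShuffleIndexWriteSketch {
  def writeIndex(indexFile: File, partitionLengths: Array[Long]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(indexFile))
    try {
      var offset = 0L
      out.writeLong(offset)               // partition 0 always starts at offset 0
      for (length <- partitionLengths) {  // one entry per reduce partition
        offset += length
        out.writeLong(offset)             // end of this partition / start of the next
      }
    } finally {
      out.close()
    }
  }
}

Stored this way, two consecutive longs are enough to locate any reduce partition in the data file without scanning it.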
So under SortShuffleManager, each map task produces exactly two files: a .data file and an .index file. The .data file stores the map output partitions, and the .index file stores each partition's offset and length inside the .data file. It is worth stressing again that once the task finishes, the shuffleId and the BlockManagerId are registered with the MapOutputTracker on the driver. The MapOutputTracker's job is to record, for each shuffle, all the map outputs it depends on, so the reduce side can quickly learn where to fetch them from.
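The following is a purely conceptual sketch of such a registry. BlockManagerId, MapStatus and MapOutputRegistry below are simplified stand-ins defined here for illustration, not Spark's MapOutputTrackerMaster; the point is only the shape of the mapping: for each shuffleId, one slot per map task, each slot recording a location plus per-reduce-partition sizes.

import scala.collection.concurrent.TrieMap

// Simplified stand-ins for Spark's BlockManagerId and MapStatus, for illustration only.
case class BlockManagerId(executorId: String, host: String, port: Int)
case class MapStatus(location: BlockManagerId, partitionSizes: Array[Long])

// Conceptual registry kept on the driver: one MapStatus slot per map task of a shuffle.
class MapOutputRegistry(numMapsPerShuffle: Map[Int, Int]) {
  private val statuses = TrieMap.empty[Int, Array[MapStatus]]

  // Called when a ShuffleMapTask finishes: remember which executor holds its output.
  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus): Unit = {
    val slots = statuses.getOrElseUpdate(
      shuffleId, new Array[MapStatus](numMapsPerShuffle(shuffleId)))
    slots(mapId) = status
  }

  // Queried for a reducer: every (location, size) pair it must fetch for one partition.
  def locationsFor(shuffleId: Int, reduceId: Int): Seq[(BlockManagerId, Long)] =
    statuses.getOrElse(shuffleId, Array.empty[MapStatus]).toSeq
      .filter(_ != null)
      .map(s => (s.location, s.partitionSizes(reduceId)))
}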
While a ResultTask executes, it first calls the ShuffledRDD's compute function, which obtains a shuffle reader from the ShuffleManager and calls its read function, as shown below:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}

override def read(): Iterator[Product2[K, C]] = {
  val blockFetcherItr = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
    // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
    SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))

  // Wrap the streams for compression and encryption based on configuration
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
    serializerManager.wrapStream(blockId, inputStream)
  }
  ......
}
In the read function, the reducer sends a request carrying the shuffleId to the driver's MapOutputTracker and gets back the Seq[BlockManagerId] (with block sizes) for that shuffle. It then sends fetch requests to each of those BlockManagers; a BlockManager locates the data file from the shuffleId and mapId, determines the offset within that file from startPartition, reads the data and returns it.
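To round this off, here is a small self-contained sketch of the grouping idea behind the fetch step: the reducer groups the blocks it needs by the executor that holds them, so it can issue one request per remote BlockManager. Location and groupFetchesByExecutor are hypothetical names, not Spark's ShuffleBlockFetcherIterator API; only the shuffle_<shuffleId>_<mapId>_<reduceId> block id naming mirrors Spark's.

object FetchPlanSketch {
  // Hypothetical stand-in for a BlockManager's address.
  case class Location(executorId: String, host: String, port: Int)

  // Group the shuffle blocks one reducer must read by the executor that stores them,
  // so that a single fetch request can be sent per remote BlockManager.
  def groupFetchesByExecutor(
      shuffleId: Int,
      reduceId: Int,
      mapLocations: Seq[(Int, Location)]  // (mapId, where that map task's output lives)
    ): Map[Location, Seq[String]] = {
    mapLocations
      .map { case (mapId, loc) => loc -> s"shuffle_${shuffleId}_${mapId}_${reduceId}" }
      .groupBy { case (loc, _) => loc }
      .map { case (loc, pairs) => loc -> pairs.map(_._2) }
  }

  // Example: three map outputs spread over two executors, reducer 2 of shuffle 0.
  // groupFetchesByExecutor(0, 2, Seq(
  //   0 -> Location("exec-1", "host-a", 7337),
  //   1 -> Location("exec-2", "host-b", 7337),
  //   2 -> Location("exec-1", "host-a", 7337)))
  // => Map(Location("exec-1", ...) -> Seq("shuffle_0_0_2", "shuffle_0_2_2"),
  //        Location("exec-2", ...) -> Seq("shuffle_0_1_2"))
}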