Chinaunix首页 | 论坛 | 博客
  • 博客访问: 7119048
  • 博文数量: 703
  • 博客积分: 10821
  • 博客等级: 上将
  • 技术积分: 12042
  • 用 户 组: 普通用户
  • 注册时间: 2005-12-02 10:41
个人简介

中科院云平台架构师,专注于数字化、智能化,技术方向:云、Linux内核、AI、MES/ERP/CRM/OA、物联网、传感器、大数据、ML、微服务。

文章分类

全部博文(703)

分类: 大数据

2017-05-15 16:56:39

本文主要演示如何通过Python对Spark的RDD进行编程,只列出了一些常用的RDD操作接口,完整的功能,请参考官方文档

  1. 演示环境说明
    RDD的详细介绍请参考:
    http://blog.csdn.net/eric_sunah/article/details/49705145
    操作系统:Ubuntu 12.04
    部署环境:1.6
    单机版
    演示环境:pyspark
    测试语言:Python
  2. Transformation
    1. map
      1. 概述:map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
      2. 样例
        >>> templist=[1,2,3,4,5,6]
        >>> rdd=sc.parallelize(templist)
        >>> result=rdd.map(lambda x:x*3)
        >>> result.collect()
        [3, 6, 9, 12, 15, 18]
    2. filter
      1. 概述:filter是通过指定的函数对已有的RDD做过滤操作,只有符合条件的元素才会被放到新的RDD中
      2. 样例
        >>> templist=[1,2,3,4,5,6]
        >>> rdd=sc.parallelize(templist)
        >>> result=rdd.filter(lambda x:x%2==0)
        >>> result.collect()
        [2, 4, 6]

    3. flatMap
      1. 概览:类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
      2. 样例
        >>> templist=[1,2,3,4,5,6]
        >>> rdd=sc.parallelize(templist)
        >>> result=rdd.flatMap(lambda x:x)
        >>> result.collect()
        [0, 0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5]
    4. mapPartitions
      1. 概述:mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
        func作为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数func,func的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

        如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。

        比如,将RDD中的所有数据通过JDBC连接写入,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。

      2. 样例
        >>> templist=[1,2,3,4,5,6]
        >>> rdd=sc.parallelize(templist)
        >>> def func(chain):
        ...     for item in chain:
        ...             yield item*2
        ... 
        >>> result=rdd.mapPartitions(func);
        >>> result.collect()
        [2, 4, 6, 8, 10, 12]
    5. mapPartitionsWithIndex
      1. 概述:和mapPattitions类似只是它能把分区的index传递给用户指定的输入函数
      2. 样例
        >>> templist=[1,2,3,4,5,6]
        >>> rdd=sc.parallelize(templist)
        >>> def func(par_index,chain):
        ...     for item in chain:
        ...             yield item*par_index
        ...             print "##partition index:%d  item:%d" %(par_index,item)
        ...     print "###partition index:%d" %(par_index)
        ... 
        >>> result=rdd.mapPartitionsWithIndex(func)
        >>> result.collect()
        ###partition index:4
        ##partition index:1  item:1
        ###partition index:1
        ###partition index:0
        ##partition index:5  item:4
        ###partition index:5
        ##partition index:2  item:2
        ###partition index:2
        ##partition index:7  item:6
        ###partition index:7
        ##partition index:6  item:5
        ###partition index:6
        ##partition index:3  item:3
        ###partition index:3
        [1, 4, 9, 20, 30, 42]
    6. mapValues
      1. 概述:mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
      2. 样例
        >>> originalMap={1:"first",2:"second",3:"thrid"}
        >>> keyRdd=sc.parallelize(originalMap)
        >>> mapRdd=keyRdd.map(lambda x:(x,originalMap[x]))
        >>> newMapRdd=mapRdd.mapValues(lambda x:x.upper())
        >>> newMapRdd.collect()
        [(1, 'FIRST'), (2, 'SECOND'), (3, 'THRID')]
    7. mapWith
      1. 概述:mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:
        def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U] 
        • 第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;
        • 第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。
      2. 样例:Python API没有提供该接口
    8. sample
      1. 概述:随机返回RDD中的样本数据,方法定义为sample(withReplacement, fraction, seed=None)
        withReplacement:表示一个元素是否可以出现多次
        fraction:随机的不重复样本占整个RDD的比例,值得范围为[0,1]
        seed:随机种子
      2. 样例
        In [36]: rdd = sc.parallelize(range(100), 4)
        In [37]: rdd.sample(False,0.1,37).collect()
        Out[37]: [9, 10, 18, 22, 52, 53, 64, 66, 85, 91, 96]
    9. union
      1. 概述:将两个RDD进行结合,返回并集
      2. 样例
        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd2 = sc.parallelize([4, 5, 6, 7])
        [1, 1, 2, 3, 4, 5, 6, 7]
    10. intersection
      1. 概述:返回两个RDD的交集,如果交集包含重复元素,那么也只显示一个。该操作内部会执行shuffer操作
      2. 样例
        >>> rdd1 = sc.parallelize([1, 10, 2,2, 3, 4, 5])
        >>> rdd2 = sc.parallelize([1, 6, 2,2, 3, 7, 8])
        >>> rdd1.intersection(rdd2).collect();
        [1, 2, 3]
    11. distinct
      1. 概述:对一个RDD进行去重操作
      2. 样例
        >>> rdd1 = sc.parallelize([1, 10, 2,2, 3, 4, 5])
        >>> rdd1.distinct().collect()
        [1, 10, 2, 3, 4, 5]
    12. groupByKey
      1. 概述:将RDD中的元素按照key进行分组,如果是为了对每个key进行汇聚操作,使用reduceByKey和aggregateByKey 效率会更高一点
      2. 样例
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.groupByKey().mapValues(len).collect())
        [('a', 2), ('b', 1)]
        >>> sorted(rdd.groupByKey().mapValues(list).collect())
        [('a', [1, 1]), ('b', [1])]
    13. reduceByKey(funcnumPartitions=NonepartitionFunc=)
      1. 概述:首先对RDD进行分组操作并在本地进行合并,合并后的结果再调用func进行reduce处理,
      2. 样例
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("b",3)])
        >>> rdd.reduceByKey(lambda e1,e2:e1+e2).collect()
        [('a', 3), ('b', 4)]
    14. aggregate(zeroValue, seqOp, combOp)
      1. 概述:aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
        下面例子中 红色Tuple表示zeroValue,绿色Tuple表示seqOp产生的结果,橙色Tuple表示comOP产生的结果

      2. 样例
        >>> def seqOp(x,y):
        ...     print "seqOp %s %s" %(str(x),str(y))
        ...     return x[0] + y, x[1] + 1
        ... 
        >>> def comOp(x,y):
        ...     print "comOp %s %s" %(str(x),str(y))
        ...     return x[0] + y[0], x[1] + y[1]
        ... 
        >>> sc.parallelize([1, 2, 3, 4]).aggregate((1, 1), seqOp, comOp)
        seqOp (1, 1) 1
        seqOp (1, 1) 2
        seqOp (1, 1) 3
        seqOp (1, 1) 4

        comOp (1, 1) (1, 1)
        comOp (2, 2) (2, 2)
        comOp (4, 4) (1, 1)
        comOp (5, 5) (3, 2)
        comOp (8, 7) (1, 1)
        comOp (9, 8) (4, 2)
        comOp (13, 10) (1, 1)
        comOp (14, 11) (5, 2)
        (19, 13)

    15. sortByKey(ascending=True, numPartitions=None, keyfunc= at 0x7f1ac7345de8>)
      1. 概述:对RDD安装Key排序,前提是RDD的元素类型是(K,V)型的
        keyfunc只是在比较的时候做对应的操作,而不是改变原有RDD里面的值
      2. 样例
        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey().collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5),('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
    16. join
      1. 概述:按照Key合并两个(K,V)类型的RDD,合并后的数据形式为k, (v1, v2),该操作是跨整个集群的操作
      2. 样例
        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> x.join(y).collect()
        [('a', (1, 2)), ('a', (1, 3))]
    17. cogroup
      1. 概述:对两个包含(K,V)类型列表的RDD进行操作,返回的结果是按照key进行组织的tuple列表
      2. 样例
        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
        [('a', ([1], [2])), ('b', ([4], []))]
    18. groupWith(other, *others)
      1. 概述:和cogroup类似,只是支持同事对多个RDD进行操作
      2. 样例
        >>> w = sc.parallelize([("a", 5), ("b", 6)])
        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> z = sc.parallelize([("b", 42)])
        >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
        [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
    19. pipe
      1. 概述:通过调用一个外部程序生成RDD,例如tr 'A-Z' 'a-z'命令主要用来将输入装换成小写,下面的例子用来演示如果通过该命令将RDD的元素都转换成小写
      2. 样例
        >>> sc.parallelize(['sun', 'BDE', 'ddddsacF', 'asdfasdf']).pipe("tr 'A-Z' 'a-z'").collect()
        [u'sun', u'bde', u'ddddsacf', u'asdfasdf']
    20. coalesce(numPartitions, shuffle=False)
      1. 概述:对RDD的数据按照指定的分区数重新分区。新分配的分区数必须小于原始分区数
      2. 样例
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(4).glom().collect()
        [[1], [2, 3], [4, 5]]

    21. repartition(numPartitions)
      1. 概述:返回一个重新分区过的RDD,分区的数量可以增加也可以减少,内部会使用shuffle来重新分配数据。
        在partition数量减少的情况下,建议使用coalesce(可以避免执行shuffle),
      2. 样例
        >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
        >>> sorted(rdd.glom().collect())
        [[1], [2, 3], [4, 5], [6, 7]]
        >>> len(rdd.repartition(2).glom().collect())
        2
        >>> len(rdd.repartition(10).glom().collect())
        10
    22. cartesian
      1. 概述:生成两个RDD的笛卡尔集
      2. 样例
        >>> rdd = sc.parallelize([1, 2])
        >>> rdd2 = sc.parallelize([2, 3])
        >>> rdd.cartesian(rdd2).collect()
        [(1, 2), (1, 3), (2, 2), (2, 3)]
  3. Action
    1. reduce
      1. 概述:通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
      2. 样例
        对RDD做sum操作
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        从RDD中选出最大值
        >>> sc.parallelize([11, 2, 8, 9, 5]).reduce(lambda x,y:max(x,y))
        11
    2. collect
      1. 概述:在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM
      2. 样例
        >>> sc.parallelize([11, 2, 8, 9, 5]).filter(lambda x:x%2==0).collect()
        [2, 8]
    3. count
      1. 概述:返回数据集的元素个数
      2. 样例
        >>> sc.parallelize([11, 2, 8, 9, 5]).count()
        5
    4. take
      1. 概述:返回一个数组,由数据集的前n个元素组成。该函数会首先在一个分区上进行扫描,用第一个分区的扫描结果去评估其他的分区情况
      2. 样例
        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
        [91, 92, 93]
    5. first
      1. 概述:返回数据集的第一个元素(类似于take(1)
      2. 样例
        >>> sc.parallelize([2, 3, 4]).first()
        2
    6. takeSample(withReplacement, num, seed=None)
      1. 概述: 返回固定数量的样本
      2. 样例
        >>> sc.parallelize(range(100),3).takeSample(False,10);
        [44, 34, 27, 54, 30, 21, 58, 85, 45, 32]
    7. takeOrdered(num, key=None)
      1. 概述:按照指定的顺序返回一定数量的样本
      2. 样例
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
        [10, 9, 7, 6, 5, 4]
    8. saveAsTextFile(path, compressionCodecClass=None)
      1. 概述:将结果保存为文本文件
      2. 样例
        >>> tempFile3 = NamedTemporaryFile(delete=True)
        >>> tempFile3.close()
        >>> codec = "org.apache.hadoop.io.compress.GzipCodec"
        >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
        >>> from fileinput import input, hook_compressed
        >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
        >>> b''.join(result).decode('utf-8')
        u'bar\nfoo\n'
    9. saveAsSequenceFile
      1. 概述:将 数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由 key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如 Int,Double,String等等)
    10. countByKey
      1. 概述:返回每个key在map类型的RDD中出现的次数,返回的结果是一个map
      2. 样例
        >>>sc.parallelize([("a", 1), ("b", 1), ("a", 2)]).countByKey().items()
        [('a', 2), ('b', 1)]

    11. stat
      1. 概述:返回数字列表RDD的统计信息,例如最大、最小值,平均值等信息
      2. 样例
        >>> result=sc.parallelize(range(10)).sample(False,0.5,37);
        >>> result.collect()
        [1, 2, 3, 7, 9]
        >>> result.stats()
        (count: 5, mean: 4.4, stdev: 3.07245829915, max: 9.0, min: 1.0)

    12. foreach
      1. 概述:在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互
      2. 样例
        >>> def f(x): print(x)
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
  4. 参考:
       Python的Spark RDD接口:
  1. RDD是什么 
    RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。 

  2. 为什么会产生RDD
    1. 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。RDD正是解决这一缺点的抽象方法
    2. RDD 的具体描述RDD(弹性数据集)是Spark提供的最重要的抽象的概念,它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编操作集合的方 式,进行各种并行操作。可以将RDD理解为一个具有容错机制的特殊集合,它提供了一种只读、只能有已存在的RDD变换而来的共享内存,然后将所有数据都加 载到内存中,方便进行多次重用。a.他是分布式的,可以分布在多台机器上,进行计算。b.他是弹性的,计算过程中内错不够时它会和磁盘进行数据交换。c. 这些限制可以极大的降低自动容错开销d.实质是一种更为通用的迭代并行计算框架,用户可以显示的控制计算的中间结果,然后将其自由运用于之后的计算。
    3. RDD 的容错机制实现分布式数据集容错方法有两种:数据检查点和记录更新RDD采用记录更新的方式:记录所有更新点的成本很高。所以,RDD只支持粗颗粒变换, 即只记录单个块上执行的单个操作,然后创建某个RDD的变换序列(血统)存储下来;变换序列指,每个RDD都包含了他是如何由其他RDD变换过来的以及如 何重建某一块数据的信息。因此RDD的容错机制又称“血统”容错。 要实现这种“血统”容错机制,最大的难题就是如何表达父RDD和子RDD之间的依赖关系。实际上依赖关系可以分两种,窄依赖和宽依赖:窄依赖:子RDD中 的每个数据块只依赖于父RDD中对应的有限个固定的数据块;宽依赖:子RDD中的一个数据块可以依赖于父RDD中的所有数据块。例如:map变换,子 RDD中的数据块只依赖于父RDD中对应的一个数据块;groupByKey变换,子RDD中的数据块会依赖于多有父RDD中的数据块,因为一个key可 能错在于父RDD的任何一个数据块中 将依赖关系分类的两个特性:第一,窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所 有数据都计算完成之后,并且父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD。第二,数据丢失时,对于窄依赖只需要重新计算丢失的那 一块数据来恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算来恢复。所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查 点。也是这两个特性要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。
    4. RDD 内部的设计每个RDD都需要包含以下四个部分:a.源数据分割后的数据块,源代码中的splits变量b.关于“血统”的信息,源码中的 dependencies变量c.一个计算函数(该RDD如何通过父RDD计算得到),源码中的iterator(split)和compute函数d. 一些关于如何分块和数据存放位置的元信息,如源码中的partitioner和preferredLocations例如:a.一个从分布式文件系统中的 文件得到的RDD具有的数据块通过切分各个文件得到的,它是没有父RDD的,它的计算函数知识读取文件的每一行并作为一个元素返回给RDD;b.对与一个 通过map函数得到的RDD,它会具有和父RDD相同的数据块,它的计算函数式对每个父RDD中的元素所执行的一个函数  

  3. RDD在Spark中的地位及作用 
    1. 为什么会有Spark?因为传统的并行计算模型无法有效的解决迭代计算(iterative)和交互式计算(interactive);而Spark的使命便是解决这两个问题,这也是他存在的价值和理由。
    2. Spark如何解决迭代计算?其主要实现思想就是RDD,把所有计算的数据保存在分布式的内存中。迭代计算通常情况下都是对同一个数据集做反复的迭代计算,数据在内存中将大大提升IO操作。这也是Spark涉及的核心:内存计算。 
    3. Spark如何实现交互式计算?因为Spark是用scala语言实现的,Spark和scala能够紧密的集成,所以Spark可以完美的运用scala的解释器,使得其中的scala可以向操作本地集合对象一样轻松操作分布式数据集。  
    4. Spark和RDD的关系?可以理解为:RDD是一种具有容错性基于内存的集群计算抽象方法,Spark则是这个抽象方法的实现。  

  4. 如何操作RDD? 
    1. 如何获取RDD?
      1. 从共享的文件系统获取,(如:HDFS)
      2. 通过已存在的RDD转换
      3. 将已存在scala集合(只要是Seq对象)并行化 ,通过调用SparkContext的parallelize方法实现
      4. 改变现有RDD的持久性,RDD是懒散,短暂的。(RDD的固化:cache缓存至内错;save保存到分布式文件系统) 
    2. 操作RDD的两个动作
      1. Actions:对数据集计算后返回一个数值value给驱动程序;例如:Reduce将数据集的所有元素用某个函数聚合后,将最终结果返回给程序。
        reduce(func) 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
        collect() 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM
        count() 返回数据集的元素个数
        take(n) 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)
        first() 返回数据集的第一个元素(类似于take(1)
        saveAsTextFile(path) 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本
        saveAsSequenceFile(path) 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系 统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或 隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
        foreach(func) 在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互

      2. Transformation:根据数据集创建一个新的数据集,计算后返回一个新RDD;例如:Map将数据的每个元素经过某个函数计算后,返回一个姓的分布式数据集。  
          
        map(func)
          
        返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
        filter(func)
        返回一个新的数据集,由经过func函数后返回值为true的原元素组成
        flatMap(func)
        类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
        flatMap(func)
        类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
        sample(withReplacement,  frac, seed)
        根据给定的随机种子seed,随机抽样出数量为frac的数据
        union(otherDataset)
        返回一个新的数据集,由原数据集和参数联合而成
        groupByKey([numTasks])
        在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
        reduceByKey(func,  [numTasks])
        在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
        join(otherDataset,  [numTasks])
        在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
        groupWith(otherDataset,  [numTasks])
        在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup
        cartesian(otherDataset)

          笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。
        flatMap(func)
        类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

  1. 两种Task类型: shuffleMapTask 和 ResultTask ,被执行的 task 多数都是 shuffleMapTask 
    1. ResultTask ( FinalStage 所对应的任务) , 返回给 driver 的是运算结果本身
      1. – 结果足够小,则直接放在 DirectTaskResult 对象内
      2. – 超过特定尺寸(默认约 10MB )则在 Executor 端会将 DirectTaskResult 先序列化,再把序列化的结果作为一个 Block 存放在 BlockManager 里,而后将 BlockManager 返回的 BlockID放在 IndirectTaskResult 对象中返回给 driver
    2. ShuffleMapTask ,返回给 DAGScheduler 的是一个 MapStatus 对象, MapStatus 对象管理了 ShuffleMapTask 的运算输出结果在 ShuffleBlockManager 里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个 Stage 的任务的获取输入数据的依据
      1. shuffle 的结果 patition 数目由 ShuffleDependency 中的 Partitioner 对象来决定
      2. Spark 内核将提供一个可拔插的 shuffle 接口



阅读(3556) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~