Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1898892
  • 博文数量: 211
  • 博客积分: 464
  • 博客等级: 下士
  • 技术积分: 3794
  • 用 户 组: 普通用户
  • 注册时间: 2011-01-24 18:25
个人简介

阿弥陀佛

文章分类

全部博文(211)

文章存档

2020年(2)

2019年(3)

2018年(5)

2017年(6)

2016年(10)

2015年(9)

2014年(73)

2013年(90)

2012年(13)

分类: 大数据

2020-11-01 16:45:16

那什么是spark 数据不均衡的问题呢?

当某一个column 的value 出现特别多次,比如1000次以上。然后table1 与table2 join 的时候,就会导致某个分区的task 执行时间特别长。详见下图,下图就是在spark join操作的时候遇到的数据分布不均衡,导致的某个task 执行时间过长。
比如,table1: 

itemid userid
123 abc
123 abce
123 acd
123 acd

table2:
itemid price
123 100

table1 和 table2 inner join by itemid 的时候,所有的数据会进入一个partition。有可能导致这个partition超级大。最后这个paritition成为了spark 计算的瓶颈。核心原因是itemid作为partition破坏了数据的均衡。该怎么解决呢?

 解决spark 中数据不均衡的问题,其实可以映射到生活中的问题,借鉴生活中解决问题的思路。
可以理解为老师给孩子批作业,老师只有一个,但是却要批全班人的作业,如果班级人数是1000人,那根本就批不过来,找几个老师同时批作业就行了,让孩子们各自找其中一个老师批作业就行了。
然后再把各个老师批的作业汇总就可以了(这也是体现了分治的思想)。

再回到spark,我们可以给key 加 salt。步骤如下:
1) 给小表table2中的key 多复制n份(多找几个老师来批作业)
2)   给大表table1中的key 随机加一个suffix, (分配老师)
3) join by 新的key


itemid userid
123_1 abc
123_2 abce
123_3 acd
123_1 acd

table2:
itemid price
123_1 100

123_2 100
123_3 100

我常用的spark 代码可以参考这样:


点击(此处)折叠或打开

  1. def saltedJoin(df: DataFrame, buildDf: DataFrame, joinExpression: Column, joinType: String, salt: Int): DataFrame = {
  2. import org.apache.spark.sql.functions._
  3.  val tmpDf = buildDf.withColumn(“slt_range”, array(Range(0, salt).toList.map(lit): _*))
  4.  
  5.  val tableDf = tmpDf.withColumn(“slt_ratio_s”, explode(tmpDf(“slt_range”))).drop(“slt_range”)
  6. val streamDf = df.withColumn(“slt_ratio”, monotonically_increasing_id % salt)
  7.  
  8.  val saltedExpr = streamDf(“slt_ratio”) === tableDf(“slt_ratio_s”)
  9. && joinExpression
  10. streamDf.join(tableDf, saltedExpr, joinType).drop(“slt_ratio_s”).drop(“slt_ratio”)
  11. }
参考文献:
https://itnext.io/handling-data-skew-in-apache-spark-9f56343e58e8
阅读(2301) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~