那什么是spark 数据不均衡的问题呢?
当某一个column 的value 出现特别多次,比如1000次以上。然后table1 与table2 join 的时候,就会导致某个分区的task 执行时间特别长。详见下图,下图就是在spark join操作的时候遇到的数据分布不均衡,导致的某个task 执行时间过长。
比如,table1:
itemid
|
userid
|
123
|
abc
|
123
|
abce
|
123
|
acd
|
123
|
acd
|
table2:
table1 和 table2 inner join by itemid 的时候,所有的数据会进入一个partition。有可能导致这个partition超级大。最后这个paritition成为了spark 计算的瓶颈。核心原因是itemid作为partition破坏了数据的均衡。该怎么解决呢?
解决spark 中数据不均衡的问题,其实可以映射到生活中的问题,借鉴生活中解决问题的思路。
可以理解为老师给孩子批作业,老师只有一个,但是却要批全班人的作业,如果班级人数是1000人,那根本就批不过来,找几个老师同时批作业就行了,让孩子们各自找其中一个老师批作业就行了。
然后再把各个老师批的作业汇总就可以了(这也是体现了分治的思想)。
再回到spark,我们可以给key 加 salt。步骤如下:
1) 给小表table2中的key 多复制n份(多找几个老师来批作业)
2) 给大表table1中的key 随机加一个suffix, (分配老师)
3) join by 新的key
itemid
|
userid
|
123_1
|
abc
|
123_2
|
abce
|
123_3
|
acd
|
123_1
|
acd
|
table2:
itemid
|
price
|
123_1
|
100
|
123_2
|
100
|
123_3
|
100
|
我常用的spark 代码可以参考这样:
-
def saltedJoin(df: DataFrame, buildDf: DataFrame, joinExpression: Column, joinType: String, salt: Int): DataFrame = {
-
import org.apache.spark.sql.functions._
-
val tmpDf = buildDf.withColumn(“slt_range”, array(Range(0, salt).toList.map(lit): _*))
-
-
val tableDf = tmpDf.withColumn(“slt_ratio_s”, explode(tmpDf(“slt_range”))).drop(“slt_range”)
-
val streamDf = df.withColumn(“slt_ratio”, monotonically_increasing_id % salt)
-
-
val saltedExpr = streamDf(“slt_ratio”) === tableDf(“slt_ratio_s”)
-
&& joinExpression
-
streamDf.join(tableDf, saltedExpr, joinType).drop(“slt_ratio_s”).drop(“slt_ratio”)
-
}
参考文献:
https://itnext.io/handling-data-skew-in-apache-spark-9f56343e58e8
阅读(2301) | 评论(0) | 转发(0) |