用户登录
用户注册

分享至

spark数据倾斜

  • 作者: 每日看段子
  • 来源: 51数据库
  • 2020-09-26
触发shuffle的常见算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,首先是哪个stage,直接在Web UI上看就可以,然后查看运行耗时的task,查看数据是否倾斜了!
根据这个task,根据stage划分原理,推算出数据倾斜发生在哪个shuffle类算子上。
查看导致数据倾斜的key的数据分布情况
根据执行操作的不同,可以有很多种查看key分布的方式:
1,如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
2,如果是Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来各个key出现的次数,collect、take到客户端打印一下,就可以看到key的分布情况。
比如针对wordCount案例,最后的reduceByKey算子导致了数据倾斜:
val sampledPairs = pairs.sample(false,0.1) //对pairs采样10%
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
数据倾斜的解决办法
方案一:使用Hive ETL预处理数据
适用场景:导致数据倾斜的是Hive表,Hive表中的数据本身很不均匀,业务场景需要频繁使用Spark对Hive表执行某个分析操作。
实现思路:提前将join等操作执行,进行Hive阶段的ETL。将导致数据倾斜的shuffle前置。
优缺点:实现简单,Spark作业性能提升,但是Hive ETL还是会发生数据倾斜,导致Hive ETL的速度很慢。
实践经验:将数据倾斜提前到上游的Hive ETL,每天就执行一次,慢就慢点吧。

方案二:过滤少数导致倾斜的key
适用场景:少数几个key导致数据倾斜,而且对计算本身影响并不大的话。
实现思路:比如Spark SQL中直接用where条件过滤掉这些key,如果是RDD的话,用filter算子过滤掉这些key。如果是动态判断哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。
优缺点:实现简单,效果也好。缺点是一般情况下导致倾斜的key还是很多的,不会是少数。

解决方案三:提高shuffle操作的并行度
适用场景:直接面对数据倾斜的简单解决方案。
实现思路:对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行的shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by,join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数默认值是200,对于很多场景来说有点过小。
优缺点:简单能缓解,缺点是没有根除问题,效果有限。

解决方案四:两阶段聚合(局部聚合+全局聚合)
适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适合这种方案。
实现思路:先局部聚合,给每个key打一个小范围的随机数,比如10以内的随机数,相当于分成10份,一个task分成10个task。聚合聚合后,去掉key上的随机数前缀,再次进行全局聚合操作。

优缺点:大幅度缓解数据倾斜,缺点是仅适用于聚合类的shuffle操作。

解决方案五:将reduce join转为map join



  触发shuffle的常见算子:distinct、groupbykey、reducebykey、aggregatebykey、join、cogroup、repartition等。要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,首先是哪个stage,直接在web ui上看就可以,然后查看运行耗时的task
软件
前端设计
程序设计
Java相关