场景 | 解决方式 | Java工程链接 | Python工程链接 | Scala工程链接 | 参考 |
---|---|---|---|---|---|
利用hive进行预处理(倾斜的key) | - | - | - | - | [1]解决方案一 |
过滤少数导致倾斜的key 分开计算 |
- | - | - | - | [1]解决方案二 |
数据源数据文件不均匀(例:tbl.gz文件) | 改成可切割文件(例如.txt等) | 修改sc.textFile()中的内容即可 | 修改sc.textFile()中的内容即可 | 修改sc.textFile()中的内容即可 | [2]数据倾斜的常见解决方法-1 |
导致shuffle的算子 执行时的并行度不够 |
提升并行度 | 提升并行度配置或Java代码配置 | 提升并行度配置或Python代码配置 | 提升并行度配置或Scala代码配置 | [1]解决方案三 |
数据集可大可小 | 自定义Partitioner 根据数据量不同返回一个灵活的自适应的并行度 |
Java自定义partition数量[3] | Python自定义partition数量 | Scala自定义partition数量 | 参考自[3] |
部分key导致倾斜 | key-salting(给key前面加随机数) | section4-Java工程 | section4-scala工程 | [1]解决方案四 | |
大数据rdd在join时通过集群IO传播, 但是IO带宽有限。所以采用: reduce join->map join |
通过Broadcast传递小RDD 来避免join时通过IO传输大RDD |
完整Java工程 | Spark-shell代码 | [1]解决方案五 | |
两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五” | 将两个RDD的倾斜部分分别盐化、扩容,然后进行join, 两个原始RDD剩余部分各自join, 上述俩个join结果再次整合,得到最终结果 |
博客图解 section6-Java工程 |
scala工程 | [1]解决方案六 | |
①一个RDD盐化, ②一个RDD扩容100倍, ③join后反盐化 |
section7-Java工程 | section7-scala工程 | [1]解决方案七 |
综述如下:
Spark中Data skew(数据倾斜)的常用处理手段(Java+Python+Scala)三种接口完整代码
注:
①Pycharm中的Pyspark连接远程Spark集群,
存在worker(自动采用worker的系统路径python版本,无法修改)与Driver的Python版本不一致问题,
目前暂时无法调试、无法提交(除非各个节点系统自带Python版本一致)。
而local模式不具备实际意义,故这部分姑且放下。
②补全了美团spark数据倾斜方案中的bug、变量错误、少量逻辑错误和代码不完整的问题,并增加Python和Scala两种写法
补充: [4]中提到: join时候, 如果表的数据量低于spark.sql.autoBroadcastJoinThreshold参数值时(默认值为10 MB), spark会自动进行broadcast(隐式的优化方案) 所以上面表格中的broadcast只是一种显式的优化方案
Reference:
[1]Spark性能优化指南——高级篇
[2]Spark如何处理数据倾斜
[3]Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势
[4]工作经验分享:Spark调优【优化后性能提升1200%】