数据库
首页 > 数据库> > 记一次--------sparkSQL程序local模式运行不起来,增加参数配置spark.locality.wait

记一次--------sparkSQL程序local模式运行不起来,增加参数配置spark.locality.wait

作者:互联网

问题:      跑本地模式 一直卡在下图最下面日志部分30分钟不动 查看运行日志一直卡在 箭头处不动,没有任何报错。 因为处理逻辑只是简单的sparksql两个表left join,  union, having等简单的函数操作。 测试环境 数据仅有3w条。   虽然将程序打包到集群,但还是跑的local模式, 下面是脚本配置  
#!/bin/bash
#jdk
export JAVA_HOME=/usr/java/jdk1.8.0_162
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
#hadoop
export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
 
 
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native
 
 
#spark
#export SPARK_HOME=/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0
#export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
 
 
realtime_queue=root
my_job_name="userhierarchy"
main_class="com.df.App"
 
 
/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master local[2] \
--name ${my_job_name} \
--class ${main_class} \
--driver-memory 2g     \
--executor-memory 2g     \
--executor-cores 8   \
--queue ${realtime_queue} \
/opt/cdh/submit/userhierarchy/user_hierarchy-1.0-SNAPSHOT.jar
 
 
 
 
#--driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8787" \
 
 

 

  1.首先将数据量减少到5000 可以正常执行,测试1w条 又不行。 考虑到会不会因为产生笛卡尔积。 因为5000条产生笛卡尔积是5000*5000  ,而3w * 3w 的数据量也还是不小。 故对所有使用join的SQL都进行排查, 发现没有笛卡尔积。 然后尝试使用IN 来代替 left join 。  还是不行。 然后通过使用where代替having。   因为 having在SQL中的执行在最后,需要对表进行分组 排序完之后在做having的条件筛选,而 where是先进行条件筛选之后在对剩余数据进行处理。效率会比having要高。  但是程序还是卡在上图位置不动。   2.转变思路对 参数进行优化,查阅资料发现一个参数 spark.locality.wait     数据本地化等待时间 为什么会有这个参数呢?     spark在driver上 ,对application的每个state的task分配之前,会先计算出每个task要计算的是哪个分片数据(RDD上的某个partition),spark分配的task的算法,优先希望每个task签好分配到它所要计算的数据的节点上,这样就尽可能的避免了网络间数据传输。     但实际上,有时候 ,task并没有分配到它所要计算的数据的节点上,因为有可能那个节点的计算资源和计算能力满了,因为task处理数据是需要计算资源的,所以通常来说spark会等待一段时间,看是否能将task分配到它要处理数据所在节点上,这个等待时长默为3s(3s不是绝对的,针对不同的本地化级别可以设置不同等待时长)。如果超过等待时长,无法计算等待,就会选择一个性能比较差的本地化级别,比如:task分配到距离它所要处理数据节点比较近的一个节点上,然后传输数据进行计算。     而对于我们来说最好是,task正好分配到它要处理数据所在节点上,这样直接从本地executor对应的blockManager中获取数据,纯内存传出数据,或带有部分磁盘IO。   本地化级别:     本地化就是指task被分配要处理部分数据,task和它要处理的数据可能会在不同的节点位置,根据这种位置关系又5种不同的本地化级别:   1.PROCESS_LOCAL:进程本地化,计算数据的task由某个executor执行,数据也就在这个executor对应的BlockManager。这种本地化级别 性能最好   2.NODE_LOCAL:节点本地化。第一种情况,数据作为HDFS block数据块就在节点上, 而task节点是在某个executor上运行;第二种情况,task和它要处理的数据,在同一节点的不同executor上,数据需要在进程之间传输   3.NO_PREF: 对于task来说,数据在哪里获取都一样,无好坏之分   4.RACK_LOCAL:机架本地化,task和它要处理的数据在同一机架的不同节点上, 数据需要通过网络在节点之间传输   5.ANY:task和它要处理的数据可能在集群的任何地方,而且不在同一机架上(RACK),数据要跨机架传输,性能最差。   何时调节该参数呢? 在本地模式下跑程序观察spark作业的日志,查看starting task........ PROCESS_LOCAL NODE_LOCAL 观察大多数task的数据本地化级别,如果大多数是NODE_LOCAL / ANY 那么可以调节,观察时间是否有缩短,反复几次 寻找最优的时间。    添加配置后的脚本
#!/bin/bash
#jdk
export JAVA_HOME=/usr/java/jdk1.8.0_162
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
#hadoop
export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native

#spark
#export SPARK_HOME=/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0
#export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

realtime_queue=root
my_job_name="userhierarchy"
main_class="com.df.App"

 /opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master local[3] \
--name ${my_job_name} \
--class ${main_class} \
--driver-memory 3g     \
--executor-memory 3g     \
--executor-cores 5   \
--conf  spark.executor.memoryOverhead=2048M \
--conf  spark.locality.wait=10 \
--queue ${realtime_queue} \
/opt/cdh/submit/userhierarchy/user_hierarchy-1.0-SNAPSHOT.jar order_info_test

 

                     

标签:bin,task,locality,export,sparkSQL,PATH,HOME,spark
来源: https://www.cnblogs.com/yzqyxq/p/12571239.html