其他分享
首页 > 其他分享> > 大规模数据分析统一引擎Spark最新版本3.3.0入门实战

大规模数据分析统一引擎Spark最新版本3.3.0入门实战

作者:互联网

@

目录

概述

定义

Spark 官网 https://spark.apache.org/

Spark 官网最新文档文档 https://spark.apache.org/docs/latest/

Spark GitHub源码地址 https://github.com/search?q=spark

Apache Spark™是一个开源的、分布式、多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习,用于大规模数据分析的统一引擎。目前最新版本为3.3.0

Hadoop与Spark的关系与区别

image-20220911120808588

image-20220911120841660

image-20220911121005280

特点与关键特性

Spark官网对其特点高度概括的四个单词分为为:Simple-简单Fast-快Scalable-可扩展Unified-统一

Spark关键特性包括:

组件

image-20220911122510227

集群概述

Spark应用程序在集群中作为独立的进程集运行,由主程序(称为驱动程序)中的SparkContext对象协调。

image-20220911141111055

集群术语

部署

概述

Spark作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行, 在国内工作中主流的环境为Yarn,不过逐渐容器式环境也慢慢流行起来。

容器式环境:发现集群规模不足够,会自动化生成所需要的环境,不需要多余集群机器时,也会自动删除。

Spark运行环境 = Java环境(JVM) + 集群环境(Yarn) + Spark环境(lib)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WJRyiUk1-1662907653703)(http://www.itxiaoshen.com:3001/assets/1662907244159WcYf6J10.png)]

Spark既可以单独运行,也可以通过多个现有的集群管理器运行。目前它提供了几种部署选项

环境准备

由于前面的文章已经搭建过Hadoop,环境预准备条件是类似,比如至少3台机器的互通互联、免密登录、时间同步、JDK如1.8版本安装,这里就不再说明了。

Local模式

所谓的Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境

# 下载最新版本3.3.0
wget --no-check-certificate https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
# 解压
tar -xvf spark-3.3.0-bin-hadoop3.tgz
# 拷贝一个部署spark-local目录
spark-3.3.0-bin-hadoop3 spark-local

image-20220911155348006

spark文件目录说明

# 提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.3.0.jar \
10

参数说明:

Standalone部署

Standalone模式

Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式,Spark的Standalone模式体现了经典的master-slave模式。

# 拷贝一个部署spark-standalone目录
cp -r spark-3.3.0-bin-hadoop3 spark-standalone
# 进入目录
cd spark-standalone/
cd conf
# 准备workers配置文件
mv workers.template workers
# 修改workers内容为
vi workers
hadoop1
hadoop2
hadoop3
# 准备spark-env.sh配置文件
mv spark-env.sh.template spark-env.sh
# spark-env.sh添加如下内容
vi spark-env.s
export JAVA_HOME=/home/commons/jdk8
SPARK_MASTER_HOST=hadoop1
SPARK_MASTER_PORT=7077
# 分发到其他两台上
scp -r /home/commons/spark-standalone hadoop2:/home/commons/
scp -r /home/commons/spark-standalone hadoop3:/home/commons/
# 进入根目录下sbin执行目录和启动
cd sbin/
./start-all.sh

image-20220911142538000

当前机器有Master和Worker进程,而另外其他两台上有worker进程

image-20220911142721534

访问WebUi页面:http://hadoop1:8080/ ,如果8080端口有其他服务使用,可以访问8081端口也能正常访问到当前这个Master的页面(默认8081是Worker的)

image-20220911142809375

配置历史服务

由于spark-shell 停止掉后,集群监控页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。

# 先停止前面启动的集群
./stop-all.sh
# 准备spark-defaults.conf
cd ../conf
mv spark-defaults.conf.template spark-defaults.conf
# 修改spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled          true
spark.eventLog.dir              hdfs://myns:8020/sparkhis
# 需要启动Hadoop集群,HDFS上的目录需要提前存在
hadoop fs -mkdir /sparkhis
# 修改spark-env.sh文件,添加如下配置:
vi spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.fs.logDirectory=hdfs://myns:8020/sparkhis
-Dspark.history.retainedApplications=30"
# 参数1含义:WEBUI访问的端口号为18080
# 参数2含义:指定历史服务器日志存储路径(读)
# 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
由于hadoop是HA模式因此配置为hdfs-site.xml下的dfs.nameservices的value值
    <property>
        <name>dfs.nameservices</name>
        <value>myns</value> <!--core-site.xml的fs.defaultFS使用该属性值-->
    </property>
# 分发配置到另外两台上
scp spark-defaults.conf spark-env.sh hadoop2:/home/commons/spark-standalone/conf/
scp spark-defaults.conf spark-env.sh hadoop3:/home/commons/spark-standalone/conf/
# 启动集群
./start-all.sh
# 启动历史服务
./start-history-server.sh

访问历史服务的WebUI地址,http://hadoop1:18080/

image-20220911150028765

高可用(HA)

所谓的高可用是因为当前集群中的 Master 节点只有一个,所以会存在单点故障问题。所以为了解决单点故障问题,需要在集群中配置多个 Master 节点,一旦处于活动状态的 Master发生故障时,由备用 Master 提供服务,保证作业可以继续执行。这里的高可用一般采用Zookeeper 设置。这里使用前面搭建Zookeeper 。

# 停止集群
./stop-all.sh
# 停止历史服务
./stop-history-server.sh
# 修改Spark中的 spark-env.sh 文件,修改如下配置
# 注释如下内容
#SPARK_MASTER_HOST=hadoop1
#SPARK_MASTER_PORT=7077
# 添加如下内容
export SPARK_DAEMON_JAVA_OPTS="  
-Dspark.deploy.recoveryMode=ZOOKEEPER  
-Dspark.deploy.zookeeper.url=zk1,zk2,zk3 
-Dspark.deploy.zookeeper.dir=/spark" 
# 分发配置到另外两台上
scp spark-env.sh hadoop2:/home/commons/spark-standalone/conf/
scp spark-env.sh hadoop3:/home/commons/spark-standalone/conf/
# 启动集群
./start-all.sh
# 在另外服务器如hadoop2上启动master
./start-master.sh

hadoop1的master为ALIVE状态

image-20220911153100076

hadoop2的master为STANDBY状态

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NVmFWDEw-1662907653726)(http://www.itxiaoshen.com:3001/assets/1662907298952BdHwc0Bh.png)]

提交应用到高可用集群,注意提交master要填写多个master地址

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop1:7077,hadoop2:7077 \
./examples/jars/spark-examples_2.12-3.3.0.jar \
10
# 停止mALIVE状态hadoop1的master
./stop-master.sh

hadoop2的master由原来的STANDBY状态转变为ALIVE状态,实现高可用的切换

image-20220911153755576

提交流程

作业提交原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GqM7w3Qz-1662907653730)(http://www.itxiaoshen.com:3001/assets/1662907304666dPiztcT6.png)]

Standalone-client 提交任务方式

# --deploy-mode client 可以省略,默认为client
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop1:7077,hadoop2:7077 \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.3.0.jar \
10

# 其他参数提交应用
bin/spark-submit \
--master spark://hadoop1:7077 \
--class org.apache.spark.examples.SparkPi \
--driver-memory 500m \
--driver-cores 1 \
--executor-memory 800m \
--executor-cores 1 \
./examples/jars/spark-examples_2.12-3.3.0.jar \
10

image-20220911213741376

参数说明:

查看WebUI后可以看到刚刚提交的任务app-20220911213322-0000信息

image-20220911213447273

执行任务时,会产生多个Java进程(用于计算)执行完成就会释放掉。其中Master和Worker是资源,默认采用服务器集群节点的总核数,每个节点内存1024M。

image-20220911190505269

当在客户端提交多个Spark application时,每个application都会启动一个Driver。lient 模式适用于测试调试程序。Driver 进程是在客户端启动的,这里的客 户端就是指提交应用程序的当前节点。在 Driver 端可以看到 task 执行的情 况。生产环境下不能使用 client 模式,是因为:假设要提交 100 个,application 到集群运行,Driver 每次都会在 client 端启动,那么就会导致 客户端 100 次网卡流量暴增的问题。client 模式适用于程序测试,不适用 于生产环境,在客户端可以看到 task 的执行和结果

Standalone-cluster 提交任务方式

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop1:7077,hadoop2:7077 \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.3.0.jar \
10

image-20220911190750915

Driver 进程是在集群某一台 Worker 上启动的,在客户端是无法查看 task 的执行情况的。假设要提交 100 个 application 到集群运行,每次 Driver 会 随机在集群中某一台 Worker 上启动,那么这 100 次网卡流量暴增的问题 就散布在集群上。

Yarn部署

独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是也要记住,Spark主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。所以接下来需要学习在强大的Yarn环境下Spark是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。

Yarn Client模式

image-20220911191300314

# 重新拷贝一个目录
cp -r spark-3.3.0-bin-hadoop3 spark-yarn
# 修改hadoop配置文件/home/commons/hadoop/etc/hadoop/yarn-site.xml,
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
     <name>yarn.nodemanager.pmem-check-enabled</name>
     <value>false</value>
</property>

<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
     <name>yarn.nodemanager.vmem-check-enabled</name>
     <value>false</value>
</property>

#  yarn-site.xml并分发到其他两台机器
scp /home/commons/hadoop/etc/hadoop/yarn-site.xml hadoop2:/home/commons/hadoop/etc/hadoop/
scp /home/commons/hadoop/etc/hadoop/yarn-site.xml hadoop3:/home/commons/hadoop/etc/hadoop/

# 重启Hadoop
cd /home/commons/hadoop/sbin
./stop-all.sh
./start-all.sh
# 修改conf目录三个配置文件,跟前面的类似稍微修改
mv workers.template workers
# 修改workers内容为
vi workers
hadoop1
hadoop2
hadoop3
# 准备spark-env.sh配置文件
mv spark-env.sh.template spark-env.sh
# 增加下面配置
vi spark-env.sh
export JAVA_HOME=/home/commons/jdk8
HADOOP_CONF_DIR=/home/commons/hadoop/etc/hadoop
YARN_CONF_DIR=/home/commons/hadoop/etc/hadoop
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://myns:8020/sparkhis
-Dspark.history.retainedApplications=30"
# 修改spark-defaults.conf.template文件名为spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
# 修改spark-default.conf文件,配置日志存储路径
spark.eventLog.enabled  true
spark.eventLog.dir  hdfs://myns:8020/sparkhis
spark.yarn.historyServer.address=hadoop1:18080
spark.history.ui.port=18080
# 启动历史服务
sbin/start-history-server.sh 

提交yarn client任务

bin/spark-submit \
--master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
--driver-memory 600m \
--driver-cores 1 \
--executor-memory 800m \
--executor-cores 2 \
./examples/jars/spark-examples_2.12-3.3.0.jar \
100

image-20220911221723922

查看yarn的页面:http://hadoop1:8088/cluster 有刚才提交的应用application_1662905714420_0001为状态成功且已完成

image-20220911221829558

Yarn Cluster模式

image-20220911213657384

bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
--driver-memory 600m \
--driver-cores 1 \
--executor-memory 800m \
--executor-cores 2 \
./examples/jars/spark-examples_2.12-3.3.0.jar \
100

image-20220911222210998

查看yarn的页面:http://hadoop1:8088/cluster 有刚才提交的应用application_1662905714420_0002为状态成功且已完成

Spark-Shell

在Spark Shell中,已经在名为sc的变量中为您创建了一个特殊的SparkContext,如果您自己创建SparkContext会不生效。您可以使用--master参数设置SparkContext连接到哪个主节点,并且可以通过--jars参数来设置添加到CLASSPATH的JAR包,多个JAR包时使用逗号(,)分隔。更多参数信息,您可以通过命令./bin/spark-shell --help获取。

bin/spark-shell
# 启动成功后,可以输入网址进行Web UI监控页面访问
# http://虚拟机地址:4040
# 在解压缩文件夹下的data目录中,添加word.txt文件。在命令行工具中执行如下代码指令(和IDEA中代码简化版一致)
sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
# 退出本地模式,尽量用命令退出
:quit
bin/spark-shell \
--master spark://hadoop1:7077
bin/spark-shell \
--master yarn

image-20220911223439129

**本人博客网站 **IT小神 www.itxiaoshen.com

标签:数据分析,--,sh,集群,3.3,Spark,spark,examples
来源: https://www.cnblogs.com/itxiaoshen/p/16685085.html