其他分享
首页 > 其他分享> > spark 笔记4 sparkRDD

spark 笔记4 sparkRDD

作者:互联网

目录

spark RDD

关于sparkRDD基本概念

学习对于RDD的基本操作

主从节点的启动

首先就像第一次学在笔记1里面记录的一样,启动spark主节点的服务,然后在localhost:8080查看spark主节点的参数并且记录下来

然后就可以使用这个主节点的参数,启动这个主节点的从节点

spark的初始化

在开发程序时,spark的初始化操作首先就是创建一个SparkConf对象,这个对象包含应用的一些信息,然后创建SparkContext,SparkContext可以让 Spark 知道如何访问集群
那么,代码是这个样子的
就是在指定app名和主节点所在的spark集群之后,使用这个conf对象指定给一个sparkcontext方法来创建一个sparkcontext

val conf = new SparkConf().setAppName("Shiyanlou").setMaster("spark://7576cf9c687e:7077")

new SparkContext(conf)
// 在每个JVM中,只有一个SparkContext能够被激活。若需要创建新的SparkContext,你必须调用sc.stop()来关闭当前已激活的那个

在spark-shell里做spark的初始化并不需要新建这两个对象,因为 Spark Shell 相当于一个 Spark 应用,启动时已经用过spark-shell --master spark://7576cf9c687e:7077来指定集群信息,所以 Spark Shell 启动后已经具备了一个 SparkContext 对象sc

RDD创建

Spark 上开发的应用程序都是由一个driver programe构成,这个所谓的驱动程序在 Spark 集群通过跑main函数来执行各种并行操作。集群上的所有节点进行并行计算需要共同访问一个分区元素的集合,这就是 RDD(RDD:resilient distributed dataset)弹性分布式数据集。RDD 可以存储在内存或磁盘中,具有一定的容错性,可以在节点宕机重启后恢复。
RDD 可以从 HDFS 中的文件创建,也可以从 Scala 或 Python 集合中创建。
创建 RDD 有两种方式:一种是调用 SparkContext 的 parallelize() 方法将数据并行化生成 RDD,另外一种方法是从外部存储中的数据集生成 RDD(如 Linux 文件系统,HDFS,HBase 等)

调用parallelize()方法并行化生成RDD

如果要对已有的集合进行并行化,我们可以先创建一个列表,然后调用上面提到的sc的parallelize方法将该集合并行化。集合中的元素会被复制到一个 RDD 中。并行集合创建后可以进行 RDD 的分布式操作,一个很重要的参数是切片数(slices),表示数据集被切分的份数,Spark 会为每个切片运行一个任务并能够根据集群状况动态调整切片数量。使用parallelize方法的参数可以手动设置切片数。
这种并行集合生成RDD的办法会把所有的数据都放在内存里,所以除了开发原型和测试以外,一般不采用这种方式

就这样,把新建的数据列表传给parallelize这个函数,这个函数就会在这个数据集合的基础上为我们创建RDD,并且RDD的切片数同样可以通过parallelize函数来指定

使用外部存储中的数据集生成RDD

在实际开发中最常用的是从外部存储系统中读取数据创建 RDD。Spark 可以从任何 Hadoop 支持的存储上创建 RDD,比如本地的文件系统、HDFS、Cassandra、HBase 等。同时 Spark 还支持文本文件、SequenceFiles 等

注意事项

// 从 protocols 文件中创建 RDD
val distFile = sc.textFile("/etc/protocols")

// RDD 操作:计算所有行的长度之和,最后结果为 2868
distFile.map(s => s.length).reduce((a,b) => a + b)

RDD的这个操作也是做的一个mapreduce,用map来把每一行映射成每一行的长度,reduce做的是把数据集合里面的元素相加

正式的、RDD的基础操作

对于RDD的基础操作有两种:transformations和actions

// 从 protocols 文件创建RDD
val distFile = sc.textFile("/etc/protocols")

// Map 操作,每行的长度
val lineLengths = distFile.map(s => s.length)

// Reduce 操作,获得所有行长度的和,即文件总长度,这里才会执行 map 运算
val totalLength = lineLengths.reduce((a, b) => a + b)

// 可以将转换后的 RDD 保存到集群内存中
lineLengths.persist()

上面这个例子里面,map操作敲进去的时候,并没有被执行,在敲完reduce求和的时候,map运算才被执行的,也就是说想要的到最后map出来的结果要执行完reduce才行
persist方法是把map完的那个RDD保留到内存里

总结

基本编程步骤总结

所以,课程里面是这样总结的:

RDD 基本编程步骤可以总结为:

  1. 读取内、外部数据集创建 RDD。
  2. 对于 RDD 进行转化生成新的 RDD ,比如 map()、filter() 等。
  3. 对需要重用的数据执行 persist()/cache() 进行缓存。
  4. 执行行动操作获得最终结果,比如 count() 和 first()等。

没有做的实践操作

导入并使用jar包

还没有一个具体的应用场景,让我指定某个具体的jar包

集成编译环境下的配置操作


TO BE CONTINUED

标签:map,创建,笔记,RDD,sparkRDD,操作,spark,数据
来源: https://www.cnblogs.com/ltl0501/p/12109883.html