spark 笔记4 sparkRDD
作者:互联网
目录
spark RDD
关于sparkRDD基本概念
- RDD:弹性分布式数据集,是spark对数据的核心抽象,也是spark数据处理的基本单位
spark处理数据之前会首先把数据转换成RDD然后在RDD上对数据进行操作 - spark对RDD的数据操作,其本身有两种对于RDD的算子:转换(transform)和行动(action),这两个分类下分别由各自对应的数个api函数
对于数据,spark的操作过程是:创建RDD、对RDD进行转化操作(transform)、用行动操作来求值(action) - 该数据操作流程的便捷性:spark底层节点的协商、容错、通信的细节,这样在上层对于数据的操作就变得更容易
学习对于RDD的基本操作
主从节点的启动
首先就像第一次学在笔记1里面记录的一样,启动spark主节点的服务,然后在localhost:8080查看spark主节点的参数并且记录下来
然后就可以使用这个主节点的参数,启动这个主节点的从节点
spark的初始化
在开发程序时,spark的初始化操作首先就是创建一个SparkConf对象,这个对象包含应用的一些信息,然后创建SparkContext,SparkContext可以让 Spark 知道如何访问集群
那么,代码是这个样子的
就是在指定app名和主节点所在的spark集群之后,使用这个conf对象指定给一个sparkcontext方法来创建一个sparkcontext
val conf = new SparkConf().setAppName("Shiyanlou").setMaster("spark://7576cf9c687e:7077")
new SparkContext(conf)
// 在每个JVM中,只有一个SparkContext能够被激活。若需要创建新的SparkContext,你必须调用sc.stop()来关闭当前已激活的那个
在spark-shell里做spark的初始化并不需要新建这两个对象,因为 Spark Shell 相当于一个 Spark 应用,启动时已经用过spark-shell --master spark://7576cf9c687e:7077来指定集群信息,所以 Spark Shell 启动后已经具备了一个 SparkContext 对象sc
RDD创建
Spark 上开发的应用程序都是由一个driver programe构成,这个所谓的驱动程序在 Spark 集群通过跑main函数来执行各种并行操作。集群上的所有节点进行并行计算需要共同访问一个分区元素的集合,这就是 RDD(RDD:resilient distributed dataset)弹性分布式数据集。RDD 可以存储在内存或磁盘中,具有一定的容错性,可以在节点宕机重启后恢复。
RDD 可以从 HDFS 中的文件创建,也可以从 Scala 或 Python 集合中创建。
创建 RDD 有两种方式:一种是调用 SparkContext 的 parallelize() 方法将数据并行化生成 RDD,另外一种方法是从外部存储中的数据集生成 RDD(如 Linux 文件系统,HDFS,HBase 等)
调用parallelize()方法并行化生成RDD
如果要对已有的集合进行并行化,我们可以先创建一个列表,然后调用上面提到的sc的parallelize方法将该集合并行化。集合中的元素会被复制到一个 RDD 中。并行集合创建后可以进行 RDD 的分布式操作,一个很重要的参数是切片数(slices),表示数据集被切分的份数,Spark 会为每个切片运行一个任务并能够根据集群状况动态调整切片数量。使用parallelize方法的参数可以手动设置切片数。
这种并行集合生成RDD的办法会把所有的数据都放在内存里,所以除了开发原型和测试以外,一般不采用这种方式
就这样,把新建的数据列表传给parallelize这个函数,这个函数就会在这个数据集合的基础上为我们创建RDD,并且RDD的切片数同样可以通过parallelize函数来指定
使用外部存储中的数据集生成RDD
在实际开发中最常用的是从外部存储系统中读取数据创建 RDD。Spark 可以从任何 Hadoop 支持的存储上创建 RDD,比如本地的文件系统、HDFS、Cassandra、HBase 等。同时 Spark 还支持文本文件、SequenceFiles 等
注意事项
- 使用不同的 SparkContext 的函数接口可以在不同的外部存储场景下创建RDD。然后使用 testfile() 方法会返回一个 RDD 对象,然后就可以调用 RDD 中定义的方法
- 如果使用本地存储上的文本文件,这个文件必须可以被所有节点 worker 访问
- 支持目录,压缩文件及通配符
- 同上一节的并行集合一样,textFile 函数还有另外一个接口控制切片数目
// 从 protocols 文件中创建 RDD
val distFile = sc.textFile("/etc/protocols")
// RDD 操作:计算所有行的长度之和,最后结果为 2868
distFile.map(s => s.length).reduce((a,b) => a + b)
RDD的这个操作也是做的一个mapreduce,用map来把每一行映射成每一行的长度,reduce做的是把数据集合里面的元素相加
正式的、RDD的基础操作
对于RDD的基础操作有两种:transformations和actions
- 转换(transformations):将已存在的数据集 RDD 转换成新的数据集 RDD,例如 map。转换是惰性的,不会立刻计算结果,仅仅记录转换操作应用的目标数据集,当动作需要一个结果时才计算。
- 动作(actions) :在数据集 RDD 上进行计算后返回一个结果值给驱动程序,例如 reduce。
// 从 protocols 文件创建RDD
val distFile = sc.textFile("/etc/protocols")
// Map 操作,每行的长度
val lineLengths = distFile.map(s => s.length)
// Reduce 操作,获得所有行长度的和,即文件总长度,这里才会执行 map 运算
val totalLength = lineLengths.reduce((a, b) => a + b)
// 可以将转换后的 RDD 保存到集群内存中
lineLengths.persist()
上面这个例子里面,map操作敲进去的时候,并没有被执行,在敲完reduce求和的时候,map运算才被执行的,也就是说想要的到最后map出来的结果要执行完reduce才行
persist方法是把map完的那个RDD保留到内存里
总结
基本编程步骤总结
所以,课程里面是这样总结的:
RDD 基本编程步骤可以总结为:
- 读取内、外部数据集创建 RDD。
- 对于 RDD 进行转化生成新的 RDD ,比如 map()、filter() 等。
- 对需要重用的数据执行 persist()/cache() 进行缓存。
- 执行行动操作获得最终结果,比如 count() 和 first()等。
没有做的实践操作
导入并使用jar包
还没有一个具体的应用场景,让我指定某个具体的jar包
集成编译环境下的配置操作
TO BE CONTINUED
标签:map,创建,笔记,RDD,sparkRDD,操作,spark,数据 来源: https://www.cnblogs.com/ltl0501/p/12109883.html