大数据:Spark实战经验总结(python版)
作者:互联网
人工智能
- 大数据,Spark,Hadoop,python,pyspark
-
大数据:Spark实战经验总结
大数据,Spark,Hadoop,python,pyspark
大数据:Spark实战经验总结
1. RDD持久化
说RDD持久化之前,先来了解一下惰性机制。
1)RDD的惰性机制:
RDD在设计时采用了惰性机制的特性,指的是转换RDD的过程先记录而不发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。举例说明:
假设/mnt/下又一个文件word.txt,内容如下:
Hadoop is good
Spark is fast
Spark is better
代码:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
lines = sc.textFile("file:///mnt/word.txt") # 记录,并不执行
lineLengths = lines.map(lambda s:len(s)) # 记录,并不执行
totalLength = lineLengths.reduce(lambda a, b: a + b) # 开始执行!
为了看着更清晰,代码不妨写成:
# 记录,并不执行。
rdd1 = sc.textFile("file:///mnt/word.txt") # 是一个RDD对象。
# 记录,并不执行。
rdd2 = rdd1.map(lambda s:len(s)) # 是一个RDD对象。
# 开始执行!
totalLength = rdd2.reduce(lambda a, b: a+b) # 是个数字。
"""
打印验证
"""
print(rdd1) # file:///mnt/word.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
print(rdd2) # PythonRDD[5] at RDD at PythonRDD.scala:53
print(totalLength) # 42
# 调用RDD自带的函数,来取出rdd1和rdd2对象中的值
rdd1.foreach(print) # Spark is better Hadoop is good Spark is fast
rdd2.foreach(print) # 14 13 15
上面代码中,
(i)第1行语句中的textFile()是一个转换操作(函数返回一个RDD对象),系统只会记录这次转换,并不会真正读取word.txt文件的数据到内存中;
(ii)第2行语句的map()也是一个转换操作(函数返回一个RDD对象),系统只是记录这次转换,不会真正执行map()方法;
(iii)而第3行语句的reduce()是一个“行动”类型的操作(函数返回一个整型数字),这时系统会生成一个作业,触发真正的计算。也就是说,这时才会加载word.txt的数据到内存,生成RDD。
2)RDD持久化 — (解决惰性机制的效率问题):
(1)效率低的背景:
在Spark中,RDD采用惰性求值的机制。导致每次遇到“行动”操作,都会从头开始执行计算(即每次调用行动操作,都会触发一次从头开始的计算),这对于迭代计算而言,代价是很大的,影响效率(因为迭代计算经常需要多次重复使用同一组数据)。下面是多次计算同一个RDD的例子:
li = ["Hadoop", "Spark", "Hive"]
rdd = sc.parallelize(li) # 记录操作。生成一个RDD
print(rdd.count()) # 行动操作,触发一次真正的从头到尾的计算。运行结果:3
print(','.join(rdd.collect())) # 行动操作,再触发一次真正的从头到尾的计算。运行结果:'Hadoop', 'Spark', 'Hive'
# 注:rrd.collect()是以数组形式返回数据集中的所有元素。结果:['Hadoop', 'Spark', 'Hive']
(2)增加持久化(缓存):
为了避免这种重复计算的开销,可以使用RDD的持久化(缓存),方法是使用persist()函数将一个RDD标记为持久化。注意:之所以“标记为持久化”,是因为出现persist()语句的地方并不会马上计算生成RDD并把它持久化,而是要对等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化。持久化后的RDD将会被保留在计算节点的内存中,被后面的行动操作重复时候。
persist()使用的时候有两种参数供选择:
- persist(MEMORY_ONLY):仅内存,超出内存则覆盖(LRU原则)。表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。 ---- 默认这种。
- persist(MEMORY_AND_DISK):内存+磁盘,超出内存则存硬盘。表示将RDD作为反序列化的 对象存储于JVM中,如果内存不足,超出的分区将会被存储在硬盘上。
这两种,默认参数是persist(MEMORY_ONLY):仅内存,超出内存则覆盖(LRU原则),因为效率第一,另一种超出就放在硬盘上不但会影响效率,还会造成资源浪费(尤其数据量巨大的时候)。
上面的例子,增加持久化缓存语句:
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
# 创建SparkConf对象,并给对象赋值
conf = SparkConf().setMaster("local").setAppName("My app")
# 创建SparkContext对象,不妨命名为sc
sc = SparkContext(conf=conf)
"""
spark创建的sc,其功能之一是调用自带的parallelize()函数来加载自定义的变量来创建RDD,如下面的 sc.parallelize:
(sc还有如加载文件textFile()等其他很多函数和功能)
"""
li = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(li)
# 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存
rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。
# 等价 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY)
# rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) # --内存+磁盘,超出内存则存硬盘方式。
# 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。
print(rdd.count())
# 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
print(','.join(rdd.collect()))
# 解除标记。释放名为rdd的RDD对象在内存中的缓存空间
rdd.unpersist()
注:持久化RDD会占用内存空间,当不再需要一个RDD时,就可以使用unpersist()函数手动地把持久化的RDD从缓存中移除,释放内存空间。
注意,上面标记为仅内存执行rdd.persist() 或 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY) 后,要想重新标记为内存+磁盘执行 rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) ,需要先执行rdd.unpersist()释放标记!!!否则报错!
(3)实际开发中,持久化(缓存)写法:
实际开发中,我们使用cache()方法就会自动调用persist(MEMORY_ONLY),我们一般用rdd.cache()或rdd.persist()即可,不用再导包from pyspark.storagelevel import StorageLevel
来传参,通过查看cache()和persist()源码,可以看到这两个方法会自动导入包。
重点!!RDD持久化 实际开发代码,一般写法如下:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My app")
sc = SparkContext(conf=conf)
li = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(li)
# 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存
rdd.cache() # 会调用persist(MEMORY_ONLY)
# 或 rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。
# 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。
print(rdd.count())
# 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
print(','.join(rdd.collect()))
# 解除标记。释放名为rdd的RDD对象在内存中的缓存空间
rdd.unpersist()
附:cache()和persist()函数的源码。在Anaconda的site-packages/pyspark/rdd.py文件:
标签:缓存,持久,rdd,python,RDD,实战经验,内存,persist,Spark 来源: https://blog.csdn.net/Acegem/article/details/123086945