其他分享
首页 > 其他分享> > Spark 持久化介绍(cache/persist/checkpoint)

Spark 持久化介绍(cache/persist/checkpoint)

作者:互联网

目录

一、RDD 持久化

因为 Spark 程序执行的特性,即延迟执行和基于 Lineage 最大化的 pipeline,当 Spark 中由于对某个 RDD 的 Action 操作触发了作业时,会基于 Lineage 从后往前推,找到该 RDD 的源头 RDD,然后从前往后计算出结果。

很明显,如果对某个 RDD 执行了多次 Transformation 和 Action 操作,每次 Action 操作出发了作业时都会重新从源头 RDD 出计算一遍来获得 RDD,再对这个 RDD 执行相应的操作。当 RDD 本身计算特别复杂和耗时时,这种方式性能是非常差的,此时必须考虑对计算结果的数据进行持久化。

数据持久化(或称为缓存)就是将计算出来的 RDD 根据配置的持久化级别,保存在内存或磁盘中,以后每次对该 RDD 进行算子操作时,都会直接从内存或者磁盘中提取持久化的 RDD 数据,然后执行算子操作,而不会从源头处重新计算一遍该 RDD。

二、RDD 持久化级别

Spark 的持久化级别有如下几种:

持久化级别 含义
MEMORY_ONLY 将 RDD 数据保存在内存中,如果内存不够存放则数据有可能有部分不会进行持久化,在瑕疵对该 RDD 执行算子操作时,没有被持久化的数据需要从源头重新计算一遍。这是默认的持久化策略,使用 cache() 方法使用的就是这种持久化策略。
MEMORY_AND_DISK 优先尝试将数据保存在内存中,如果内存不够存放所有数据,会将数据写入到磁盘文件中,下次对该 RDD 执行算子操作时,持久化在内存和磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER 基本含义同 MEMORY_ONLY。唯一的区别是会将 RDD 数据进行序列化,将 RDD 的每个 partition 序列化为一个字节数据,这种方式更加节省内存,避免持久化的数据占用过多内存导致频繁 GC。
MEMORY_AND_DISK_SER 基本含义同 MEMORY_AND_DISK。唯一的区别是会将 RDD 数据进行序列化, RDD 的每个 partition 会被序列化成一个字节数据,种方式更加节省内存,避免持久化的数据占用过多内存导致频繁 GC。
DISK_ONLY 使用未序列化的 Java 对象格式,将数据全部写入磁盘文件中。
OFF_HEAP 将 RDD 以序列化的方式存在在 Alluxio 中(曾用名 Tachyon),而不是 Executor 的内存中,减少垃圾回收的压力,当 RDD 存储在 Alluxio 上时,Executors 的崩溃不会造成 RDD 数据的丢失。
MEMORY_ONLY_2,
MEMORY_ONLY_SER_2,
MEMORY_AND_DISK_2,
MEMORY_AND_DIST_SER_2,
DISK_ONLY_2,
等等
后缀有_2的持久化策略,表示将每个持久化的数据都复制一个副本,并保存在其他节点上,这种基于副本的持久化机制主要用于容错,如果单个节点持久化的数据丢失了,还可以从其他节点的副本上获取该 RDD 数据。

阅读 Spark 2.1.0 源码,RDD 持久化级别在 StorageLevel 类中有详细介绍,我们来详细看看。

//位置:/org/apache/spark/storage/StorageLevel.scala
/**
 * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
 * new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

这里列出了 12 种 RDD 缓存级别,每个缓存级别后面都 new 了一个 StorageLevel类的构造函数,什么意思呢?我么可以看看其构造函数。

// 位置:org/apache/spark/storage/StorageLevel.scala
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {

  // TODO: Also add fields for caching priority, dataset ID, and flushing.
  private def this(flags: Int, replication: Int) {
    this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
  }

  def this() = this(false, true, false, false) // For deserialization

  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication

StorageLevel类的主构造器包含了5个参数:

理解了这 5 个参数,就不难理解不同缓存级别的含义了,比如 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 缓存,表示将 RDD 的数据持久化在硬盘以及内存中,对数据进行序列化存储,并且将每个持久化的数据都复制一份副本保存到其他节点。

三、持久化级别选择

Spark 提供了这么多中持久化策略,那在实际场景中应该如何使用呢?

通常遵循的准则是,优先考虑内存,内存放不下就考虑序列化后放到内存中,尽量不要存储到磁盘中,因为一般 RDD 的重新计算要比从磁盘中读取更快,只有在需要更快的恢复时才使用备份级别(所有的存储级别都可以通过重新计算来提供全面的容错性,但是备份级别允许用于在 RDD 的备份上执行任务,而无须重新计算丢失的分区)。具体的选取方式如下:

四、删除持久化数据

Spark 的机制可以自动监控各个节点上的缓存使用率,并以 LRU (Least Recently Used,近期最少使用)算法删除过时的缓存数据。当然,如果想手动删除一个 RDD 数据的缓存,而不是等待该 RDD 被 Spark 自动移除,可以使用 RDD.unpersist()方法。

五、 RDD cache 和 persist

我们先来看一个 persist 的示例。

// cache 使用示例:
val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist 使用示例
val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

唯一可以看出的是 persist() 持久化是可以手动指定持久化类型的,而 cache() 无须指定。那它们之间到底有什么区别呢?

通过阅读 Spark 2.1.0 源码,可以看到 cache() 方法调用了无参的 persist() 方法。想知道两者的区别,还需要进一步看看 persist() 方法逻辑。

//位置:org/apache/spark/rdd/RDD.scala
  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

可以看到 persist() 方法调用了 persist(StorageLevel.MEMORY_ONLY) 方法,即默认缓存方式采用 MEMORY_ONLY 级别。

//位置:org/apache/spark/rdd/RDD.scala
  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

继续往下看,persist() 方法有一个 StorageLevel 类型的参数,该参数表示 RDD 的缓存级别。至此,也就能看出 cache 和 persist 的区别了:即 cache 只有一个默认的缓存级别 MEMORY_ONLY,而 persist 可以根据情况设置其他的缓存级别。

//位置:org/apache/spark/rdd/RDD.scala
  /**
   * Mark this RDD for persisting using the specified level.
   *
   * @param newLevel the target storage level
   * @param allowOverride whether to override any existing level with the new one
   */
  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }

六、RDD 的 checkpoint

把数据通过 cache 或 persist 持久化到内存或磁盘中,虽然是快速的但却不是最可靠的,checkpoint 机制的产生就是为了更加可靠地持久化数据以复用 RDD 计算数据,通常针对整个 RDD 计算链路中特别需要数据持久化的缓解,启用 checkpoint 机制来确保高容错和高可用性。

可以通过调用 SparkContext.setCheckpointDir() 方法来指定 checkpoint 是持久化的 RDD 数据的存放位置,这里可以存在本地或 HDFS 中(生产环境通常是放在 HDFS 上,借助 HDFS 本身的高容错和高可靠的特性完成数据的持久化),同时为了提高效率,可以指定多个目录。

需要说明的是,checkpoint 和 persist 一样是惰性执行的,在对某个 RDD 标记了需要 checkpoint 后,并不会立即执行,只有在后续有 Action 触发 Job 从而导致该 RDD 的计算,且在这个 Job 执行完成后,才会从后往前回溯找到标记了 checkpoint 的 RDD,然后重新启动一个 Job 来执行具体的 checkpoint 操作,所以一般都会对需要进行 checkpoint 的 RDD 先进行 persist 标记,从而把该 RDD 的计算结果持久化到内存或者磁盘上,以备 checkpoint 复用

下面是 checkpoint 使用的一个示例:

// 配置 checkpointDir
sc.setCheckpointDir("hdfs://nameservice/spark/checkpoint")
val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").cache()
// 对 rdd1 标记 checkpoint
rdd1.checkpoint()
// action 触发了 Job 才能导致 checkpoint 的真正执行
rdd1.count()

七、DataSet 的 cache 和 persist

阅读源码中无意中看到 DataSet 也支持 cache 和 persist 持久化方式,和 RDD 的持久化还是不太一样,我们来看看代码。

// 位置:org/apache/spark/sql/Dataset.scala
  /**
   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
   *
   * @group basic
   * @since 1.6.0
   */
  // DataSet 的 cache 持久化调用
  def cache(): this.type = persist()

  /**
   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
   *
   * @group basic
   * @since 1.6.0
   */
  def persist(): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this)
    this
  }

// 位置:org/apache/spark/sql/Dataset.scala 
  /**
   * Persist this Dataset with the given storage level.
   * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
   *                 `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
   *                 `MEMORY_AND_DISK_2`, etc.
   *
   * @group basic
   * @since 1.6.0
   */
  // DataSet 的 persist 持久化调用
  def persist(newLevel: StorageLevel): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
    this
  }

// 位置:org/apache/spark/sql/execution/CacheManager.scala
  /**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  // cache 和 persist 持久化调用的方法
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock

通过源码看到 cache() 调用的是无参的 persist() 方法,而 persist 调用 cacheQuery 方法,虽然 cache 和 persist 两者最终调用都是 cacheQuery 方法,但 cache 是采用默认的持久化级别 MEMORY_ADN_DISK,而 persist 则是用户自定义,这里默认的持久化持久和 RDD 是不一样的。

参考连接

  1. https://blog.csdn.net/houmou/article/details/52491419
  2. https://dongkelun.com/2018/06/03/sparkCacheAndPersist/
  3. https://blog.csdn.net/qq_27639777/article/details/82319560

标签:false,persist,cache,checkpoint,RDD,MEMORY,持久,StorageLevel
来源: https://www.cnblogs.com/lemonu/p/14373923.html