Update:spark_rdd算子:第2节 RDD_action算子_分区_缓存:缓存

时间:2019-09-28
本文章向大家介绍Update:spark_rdd算子:第2节 RDD_action算子_分区_缓存:缓存,主要包括Update:spark_rdd算子:第2节 RDD_action算子_分区_缓存:缓存使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

4. 缓存

概要
  1. 缓存的意义

  2. 缓存相关的 API

  3. 缓存级别以及最佳实践

4.1. 缓存的意义

使用缓存的原因 - 多次使用 RDD

需求: 在日志文件中找到访问次数最少的 IP 和访问次数最多的 IP

val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
val sc = new SparkContext(conf)

val interimRDD = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg) 

val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first()
val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first()

println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore")

sc.stop()
  这是一个 Shuffle 操作, Shuffle 操作会在集群内进行数据拷贝

在上述代码中, 多次使用到了 interimRDD, 导致文件读取两次, 计算两次, 有没有什么办法增进上述代码的性能?

使用缓存的原因 - 容错

当在计算 RDD3 的时候如果出错了, 会怎么进行容错?

会再次计算 RDD1 和 RDD2 的整个链条, 假设 RDD1 和 RDD2 是通过比较昂贵的操作得来的, 有没有什么办法减少这种开销?

上述两个问题的解决方案其实都是 缓存, 除此之外, 使用缓存的理由还有很多, 但是总结一句, 就是缓存能够帮助开发者在进行一些昂贵操作后, 将其结果保存下来, 以便下次使用无需再次执行, 缓存能够显著的提升性能.

所以, 缓存适合在一个 RDD 需要重复多次利用, 并且还不是特别大的情况下使用, 例如迭代计算等场景.

4.2. 缓存相关的 API

可以使用 cache 方法进行缓存
val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
val sc = new SparkContext(conf)

val interimRDD = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .cache() 

val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first()
val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first()

println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore")

sc.stop()
  缓存

方法签名如下

cache(): this.type = persist()

cache 方法其实是 persist 方法的一个别名

也可以使用 persist 方法进行缓存
val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
val sc = new SparkContext(conf)

val interimRDD = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .persist(StorageLevel.MEMORY_ONLY) 

val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first()
val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first()

println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore")

sc.stop()
  缓存

方法签名如下

persist(): this.type
persist(newLevel: StorageLevel): this.type

persist 方法其实有两种形式, persist() 是 persist(newLevel: StorageLevel) 的一个别名, persist(newLevel: StorageLevel) 能够指定缓存的级别

缓存其实是一种空间换时间的做法, 会占用额外的存储资源, 如何清理?
val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
val sc = new SparkContext(conf)

val interimRDD = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .persist()

interimRDD.unpersist() 

val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first()
val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first()

println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore")

sc.stop()
  清理缓存

根据缓存级别的不同, 缓存存储的位置也不同, 但是使用 unpersist 可以指定删除 RDD 对应的缓存信息, 并指定缓存级别为 NONE

4.3. 缓存级别

其实如何缓存是一个技术活, 有很多细节需要思考, 如下

  • 是否使用磁盘缓存?

  • 是否使用内存缓存?

  • 是否使用堆外内存?

  • 缓存前是否先序列化?

  • 是否需要有副本?

如果要回答这些信息的话, 可以先查看一下 RDD 的缓存级别对象

val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
val sc = new SparkContext(conf)

val interimRDD = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .persist()

println(interimRDD.getStorageLevel)

sc.stop()

打印出来的对象是 StorageLevel, 其中有如下几个构造参数

根据这几个参数的不同, StorageLevel 有如下几个枚举对象

缓存级别userDisk 是否使用磁盘useMemory 是否使用内存useOffHeap 是否使用堆外内存deserialized是否以反序列化形式存储replication 副本数

NONE

false

false

false

false

1

DISK_ONLY

true

false

false

false

1

DISK_ONLY_2

true

false

false

false

2

MEMORY_ONLY

false

true

false

true

1

MEMORY_ONLY_2

false

true

false

true

2

MEMORY_ONLY_SER

false

true

false

false

1

MEMORY_ONLY_SER_2

false

true

false

false

2

MEMORY_AND_DISK

true

true

false

true

1

MEMORY_AND_DISK

true

true

false

true

2

MEMORY_AND_DISK_SER

true

true

false

false

1

MEMORY_AND_DISK_SER_2

true

true

false

false

2

OFF_HEAP

true

true

true

false

1

如何选择分区级别

Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择:

如果您的 RDD 适合于默认存储级别(MEMORY_ONLY),leave them that way。这是 CPU 效率最高的选项,允许 RDD 上的操作尽可能快地运行.

如果不是,试着使用 MEMORY_ONLY_SER 和 selecting a fast serialization library 以使对象更加节省空间,但仍然能够快速访问。(Java和Scala)

不要溢出到磁盘,除非计算您的数据集的函数是昂贵的,或者它们过滤大量的数据。否则,重新计算分区可能与从磁盘读取分区一样快.

如果需要快速故障恢复,请使用复制的存储级别(例如,如果使用 Spark 来服务 来自网络应用程序的请求)。All 存储级别通过重新计算丢失的数据来提供完整的容错能力,但复制的数据可让您继续在 RDD 上运行任务,而无需等待重新计算一个丢失的分区.

原文地址:https://www.cnblogs.com/mediocreWorld/p/11604950.html