Spark 面试题系列-2

时间:2022-07-22
本文章向大家介绍Spark 面试题系列-2,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1 RDD 如何通过记录更新的方式容错

RDD 实现分布式数据集容错方法有两种:

  1. 数据检查点
  2. 记录更新

RDD 采用记录更新的方式:记录所有更新点的成本很高。 所以,RDD只支持粗颗粒变换,即只记录单个块(分区 partition)上执行的单个操作,然后创建某个 RDD 的变换序列(血统 lineage)存储下来; 变换序列指,每个 RDD 都包含了它是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称“血统”容错。

2 Spark 优越性

Spark 的几个优势

  1. 更高的性能。因为数据被加载到集群主机的分布式内存中。数据可以被快速的转换迭代,并缓存用以后续的频繁访问需求。在数据全部加载到内存的情况下,Spark 可以比 Hadoop 快100倍,在内存不够存放所有数据的情况下快 Hadoop 10倍。
  2. 通过建立在 Java, Scala, Python, SQL(应对交互式查询)的标准API以方便各行各业使用,同时还含有大量开箱即用的机器学习库。
  3. 与现有 Hadoop 1和2.x(YARN)生态兼容,因此机构可以无缝迁移,目前也在做 Yarn 3 的支持。
  4. 方便下载和安装。方便的 shell(REPL: Read-Eval-Print-Loop)可以对 API 进行交互式的学习。
  5. 借助高等级的架构提高生产力,从而可以讲精力放到计算上。

MapReduce 与 Spark 相比,有哪些异同点

  1. 基本原理上 1.1 MapReduce: 基于磁盘的大数据批量处理系统 1.2 Spark: 基于 RDD (弹性分布式数据集)数据处理,显示将 RDD 数据存储到磁盘和内存中。
  2. 模型上 2.1 MapReduce 可以处理超大规模的数据,适合日志分析挖掘等较少的迭代的长任务需求,结合了数据的分布式的计算。 2.2 Spark 适合数据的挖掘,机器学习等多轮迭代式计算任务。

在 Spark 中,一个应用程序包含多个 Job 任务,在 MapReduce 中,一个 Job 任务就是一个应用。

3 Transformation 和 action 是什么?区别?举几个常用方法

RDD 创建后就可以在 RDD 上进行数据处理。RDD 支持两种操作:

  1. 转换(transformation): 即从现有的数据集创建一个新的数据集
  2. 动作(action): 即在数据集上进行计算后,返回一个值给 Driver 程序

RDD 的转化操作 Transformation 是返回一个新的 RDD 的操作,比如 map() 和 filter() ,而行动操作则是向驱动器程序 Driver 返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first() 。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。

RDD 中所有的 Transformation 都是惰性的,也就是说,它们并不会直接计算结果。相反的它们只是记住了这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给 Driver 的 Action 时,这些 Transformation 才会真正运行。

4 RDD 容错方式

Spark 选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建 RDD 的一系列变换序列(每个 RDD 都包含了他是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称血统容错)记录下来,以便恢复丢失的分区。lineage 本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据(所以也称为粗粒度)。

5 可以解释一下这两段程序的异同吗

# 1 
val counter = 0
val data = Seq(1, 2, 3)
data.foreach(x => counter += x)
println("Counter value: " + counter)

# 2
val counter = 0
val data = Seq(1, 2, 3)
var rdd = sc.parallelizze(data)
rdd.foreach(x => counter += x)
println("Counter value: " + counter)

所有在 Driver 程序追踪的代码看上去好像在 Driver 上计算,实际上都不在本地,每个 RDD 操作都被转换成 Job 分发至集群的执行器 Executor 进程中运行,即便是单机本地运行模式,也是在单独的执行器进程上运行,与 Driver 进程属于不用的进程。所以每个 Job 的执行,都会经历序列化、网络传输、反序列化和运行的过程

再具体一点解释是 foreach 中的匿名函数 x => counter += x 首先会被序列化然后被传入计算节点,反序列化之后再运行,因为 foreach 是 Action 操作,结果会返回到 Driver 进程中。

在序列化的时候,Spark 会将 Job 运行所依赖的变量、方法全部打包在一起序列化,相当于它们的副本,所以 counter 会一起被序列化,然后传输到计算节点,是计算节点上的 counter 会自增,而 Driver 程序追踪的 counter 则不会发生变化。执行完成之后,结果会返回到 Driver 程序中。而 Driver 中的 counter 依然是当初的那个 Driver 的值为0。

6 说说 map 和 mapPartitions 的区别

map 中的 func 作用的是 RDD 中每一个元素,而 mapPartitioons 中的 func 作用的对象是 RDD 的一整个分区。所以 func 的类型是 Iterator<T> => Iterator<T>,其中 T 是输入 RDD 的元素类型。

这些可以用 API 中看到。

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

 /**
  * Return a new RDD by applying a function to each partition of this RDD.
  *
  * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
  * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
  */
 def mapPartitions[U: ClassTag](
     f: Iterator[T] => Iterator[U],
     preservesPartitioning: Boolean = false): RDD[U] = withScope {
   val cleanedF = sc.clean(f)
   new MapPartitionsRDD(
     this,
     (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
     preservesPartitioning)
 }

7 groupByKey 和 reduceByKey 是属于 Transformation 还是 Action?

前者,因为 Action 输出的不再是 RDD 了,也就意味着输出不是分布式的,而是回送到 Driver 程序。以上两种操作都是返回 RDD,所以应该属于 Transformation。

8 说说检查点 checkpoint 的意义

分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储中

9 说说 Spark 的特点,相对于 MapReduce 来说

  1. 减少磁盘 I/O,MR 会把 map 端将中间输出和结果存储在磁盘中,reduce 端又需要从磁盘读写中间结果,势必造成磁盘 I/O 称为瓶颈。Spark 允许将 map 端的中间结果输出和结果存储在内存中,reduce 端在拉取中间结果的时候避免了大量的磁盘 I/O。
  2. 增加并行度,由于把中间结果写到磁盘与从磁盘读取中间结果属于不同的缓解,Hadoop 将他们简单地通过串行执行衔接起来,Spark 则把不同的环节抽象成为 Stage,允许多个 Stage 既可以串行又可以并行执行。
  3. 避免重新计算,当 Stage 中某个分区的 Task 执行失败后,会重新对此 Stage 调度,但在重新调度的时候会过滤已经执行成功的分区任务,所以不会造成重复计算和资源浪费。
  4. 可选的 Shuffle 排序,MR 在 Shuffle 之前有着固定的排序操作,而 Spark 则可以根据不同场景选择在 map 端排序还是 reduce 排序。
  5. 灵活的内存管理策略,Spark 将内存分为堆上的存储内存、堆外的存储内存,堆上的执行内存,堆外的执行内存4个部分。

10 Task 和 Stage 的分类

Task 指具体的执行任务,一个 Job 在每个 Stage 内都会按照 RDD 的 Partition 数量,创建多个 Task,Task 分为 ShuffleMapTask 和 ResultTask 两种。

ShuffleMapStage 中的 Task 为 ShuffleMapTask,而 ResultStage 中的 Task 为 ResultTask。

ShuffleMapTask 和 ResultTask 类似于 Hadoop 中的 Map 任务和 Reduce 任务。