初识Spark

时间:2022-07-25
本文章向大家介绍初识Spark,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Spark特点

Spark是Apache的一个顶级项目,Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

Spark的计算速度也要比MapReduce快得多,它有一个先进的DAG执行引擎,支持非循环的数据流和内存计算。官网介绍说在使用内存的情况下快100倍,而使用磁盘的情况下快10倍。

而且Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。

Spark也比MapReduce要易于使用,并且可以使用Java, Scala, Python, R等语言进行开发。Spark 提供了80多个高级API,可以很容易地实现并行计算的应用程序。还可以通过Scala、Python和R shells等交互式命令行,交互地使用它。

Spark 主要有四个特点:

  • 高级 API 剥离了对集群本身的关注,Spark 应用开发者可以专注于应用所要做的计算本身。下图是python使用Spark API的代码:
  • Spark 很快,支持交互式计算和复杂算法以及非循环的数据流和内存计算。下图是官网上展示的MapReduce与Spark进行回归计算时,计算速度的对比图:
  • Spark 是一个十分通用的计算引擎,可用它来完成各种各样的运算,包括 SQL 查询、文本处理、机器学习等,而在 Spark 出现之前,我们一般需要学习各种各样的引擎来分别处理这些需求。如下图:
  • Spark 可以运行在各种平台之上,例如可以运行在:Hadoop, Mesos, Kubernetes, standalone等平台上,或者运行在 cloud上。并且能访问各种数据源,包括HDFS, Cassandra, HBase 以及 S3等。

Spark官网地址:

http://spark.apache.org/


Spark与Hadoop深入对比

Spark的生态系统简称BDAS。如下图:

Hadoop生态圈对比Spark BDAS:

Hadoop对比Spark:

MapReduce对比Spark:


Spark开发语言及运行模式介绍

Spark支持的开发语言:

  • Python
  • Scala(推荐)
  • Java
  • R

Spark运行模式:

  • Standalone(内置)
  • Yarn(推荐)
  • Mesos
  • Local

Scala&Maven安装

安装Scala时,需要先准备好JDK环境,而我这里已经准备好jdk1.8的环境了。

Scala官网下载地址:

http://www.scala-lang.org/download/

下载Scala:

[root@study-01 ~]# cd /usr/local/src
[root@study-01 /usr/local/src]# wget https://downloads.lightbend.com/scala/2.12.5/scala-2.12.5.tgz

解压:

[root@study-01 /usr/local/src]# tar -zxvf scala-2.12.5.tgz -C /usr/local/
[root@study-01 /usr/local/src]# cd ../
[root@study-01 /usr/local]# ls
bin  etc  games  include  lib  lib64  libexec  sbin  scala-2.12.5  share  src
[root@study-01 /usr/local]# cd scala-2.12.5/
[root@study-01 /usr/local/scala-2.12.5]# ls
bin  doc  lib  man
[root@study-01 /usr/local/scala-2.12.5]#

配置环境变量:

[root@study-01 ~]# vim .bash_profile  # 更改如下内容
export SCALA_HOME=/usr/local/scala-2.12.5
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$SCALA_HOME/bin
export PATH
[root@study-01 ~]# source .bash_profile
[root@study-01 ~]# scala  # 测试能否执行scala命令
Welcome to Scala 2.12.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161).
Type in expressions for evaluation. Or try :help.

scala> 

Maven官网下载地址:

https://maven.apache.org/download.cgi

下载并解压:

[root@study-01 ~]# cd /usr/local/src/
[root@study-01 /usr/local/src]# wget  http://mirror.bit.edu.cn/apache/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gz
[root@study-01 /usr/local/src]# tar -zxvf apache-maven-3.5.2-bin.tar.gz -C /usr/local
[root@study-01 /usr/local/src]# cd ../apache-maven-3.5.2/
[root@study-01 /usr/local/apache-maven-3.5.2]# ls
bin  boot  conf  lib  LICENSE  NOTICE  README.txt
[root@study-01 /usr/local/apache-maven-3.5.2]#

配置环境变量:

[root@study-01 ~]# vim .bash_profile  # 更改如下内容
export MAVEN_HOME=/usr/local/apache-maven-3.5.2
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$SCALA_HOME/bin:$MAVEN_HOME/bin
[root@study-01 ~]# source .bash_profile
[root@study-01 ~]# mvn --version  # 测试能否执行mvn命令
Apache Maven 3.5.2 (138edd61fd100ec658bfa2d307c43b76940a5d7d; 2017-10-18T15:58:13+08:00)
Maven home: /usr/local/apache-maven-3.5.2
Java version: 1.8.0_161, vendor: Oracle Corporation
Java home: /usr/local/jdk1.8/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-327.el7.x86_64", arch: "amd64", family: "unix"
[root@study-01 ~]#

Spark环境搭建及wordcount案例实现

Spark官网下载地址:

http://spark.apache.org/downloads.html

我这里下载的是2.1.0版本的源码包,官网的编译安装文档:

http://spark.apache.org/docs/2.1.0/building-spark.html

从官网的介绍,我们得知:

  • Java需要7+版本,而且在Spark2.0.0之后Java 7已经被标识成deprecated了,但是不影响使用,但是在Spark2.2.0版本之后Java 7的支持将会被移除;
  • Maven需要3.3.9+版本

下载Spark2.1.0版本的源码包:

下载并解压:

[root@study-01 /usr/local/src]# wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0.tgz
[root@study-01 /usr/local/src]# tar -zxvf spark-2.1.0.tgz -C /usr/local
[root@study-01 /usr/local/src]# cd ../spark-2.1.0/
[root@study-01 /usr/local/spark-2.1.0]# ls
appveyor.yml  common           data      external  licenses     NOTICE   R          scalastyle-config.xml  yarn
assembly      conf             dev       graphx    mesos        pom.xml  README.md  sql
bin           CONTRIBUTING.md  docs      launcher  mllib        project  repl       streaming
build         core             examples  LICENSE   mllib-local  python   sbin       tools
[root@study-01 /usr/local/spark-2.1.0]#

安装完成之后我们还需要使用Spark源码目录中的dev下的make-distribution.sh脚本进行编译,官方提供的编译命令如下:

./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pmesos -Pyarn

参数说明:

  • --name:指定编译完成后Spark安装包的名字
  • --tgz:以tgz的方式进行压缩
  • -Psparkr:编译出来的Spark支持R语言
  • -Phadoop-2.4:以hadoop-2.4的profile进行编译,具体的profile可以看出源码根目录中的pom.xml中查看
  • -Phive和-Phive-thriftserver:编译出来的Spark支持对Hive的操作
  • -Pmesos:编译出来的Spark支持运行在Mesos上
  • -Pyarn:编译出来的Spark支持运行在YARN上

那么我们就可以根据具体的条件来编译Spark,比如我们使用的Hadoop版本是2.6.0-cdh5.7.0,并且我们需要将Spark运行在YARN上、支持对Hive的操作,那么我们的Spark源码编译脚本就是:

[root@study-01 /usr/local/spark-2.1.0]# ./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0

但是在执行这个命令之前我们先需要编辑pom.xml文件,增加cdh的maven repository:

[root@study-01 /usr/local/spark-2.1.0]# vim pom.xml  # 在<repositories>标签内,加入如下内容
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
[root@study-01 /usr/local/spark-2.1.0]# 

然后还需要更改编译脚本的mvn命令路径,因为使用自带的mvn编译有些慢:

[root@study-01 /usr/local/spark-2.1.0]# vim dev/make-distribution.sh
MVN="$MAVEN_HOME/bin/mvn"
[root@study-01 /usr/local/spark-2.1.0]# 

完成以上的修改后,就可以执行编译命令了,编译的过程会有些慢(我这里编译了半个多小时)。而且内存尽量分配得大一些,避免内存不足导致编译中断。

编译完成之后,spark目录下会增加一个.tgz的文件,把这个文件解压到/usr/local/目录下:

[root@study-01 /usr/local/spark-2.1.0]# ls |grep *.tgz
spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz
[root@study-01 /usr/local/spark-2.1.0]# tar -zxvf spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz -C /usr/local
[root@study-01 /usr/local/spark-2.1.0]# cd ../spark-2.1.0-bin-2.6.0-cdh5.7.0/
[root@study-01 /usr/local/spark-2.1.0-bin-2.6.0-cdh5.7.0]# ls
bin  conf  data  examples  jars  LICENSE  licenses  NOTICE  python  README.md  RELEASE  sbin  yarn
[root@study-01 /usr/local/spark-2.1.0-bin-2.6.0-cdh5.7.0]#

到此为止,我们的spark就安装完成了。接下来我们尝试一下启动Spark的shell终端:

[root@study-01 /usr/local/spark-2.1.0-bin-2.6.0-cdh5.7.0]# ./bin/spark-shell --master local[2]

命令说明:

  • master用于指定使用哪种模式启动
  • local 表示本地模式启动,方括号里的数字表示开启几个线程

关于启动spark shell的官方文档说明:

http://spark.apache.org/docs/2.1.0/submitting-applications.html

启动成功:

启动成功后,我们来实现wordcount的案例。官网的快速入门文档:

http://spark.apache.org/docs/2.1.0/quick-start.html

现在有一个文件,内容如下:

[root@study-01 /data]# cat hello.txt 
hadoop welcome
hadoop hdfs mapreduce
hadoop hdfs
hello hadoop
spark vs mapreduce
[root@study-01 /data]#

在spark shell里完成对该文件的wordcount:

scala> val file = sc.textFile("file:///data/hello.txt")  # 读取文件
file: org.apache.spark.rdd.RDD[String] = file:///data/hello.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> file.collect  # 打印读取的数据
res1: Array[String] = Array(hadoop welcome, hadoop hdfs mapreduce, hadoop hdfs, hello hadoop, spark vs mapreduce)

scala> val a = file.flatMap(line => line.split(" "))  # 按空格进行拆分
a: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26

scala> a.collect
res2: Array[String] = Array(hadoop, welcome, hadoop, hdfs, mapreduce, hadoop, hdfs, hello, hadoop, spark, vs, mapreduce)

scala> val b = a.map(word => (word,1))  # 进行map操作,给每个单词附上1
b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28

scala> b.collect
res3: Array[(String, Int)] = Array((hadoop,1), (welcome,1), (hadoop,1), (hdfs,1), (mapreduce,1), (hadoop,1), (hdfs,1), (hello,1), (hadoop,1), (spark,1), (vs,1), (mapreduce,1))

scala> val c = b.reduceByKey(_ + _)  # 进行Reduce操作,把每个相同key的值相加,并整合在一起
c: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30

scala> c.collect
res4: Array[(String, Int)] = Array((mapreduce,2), (hello,1), (welcome,1), (spark,1), (hadoop,4), (hdfs,2), (vs,1))

scala>

如上,可以看到,通过简单的交互式的代码我们就完成了对文件的词频统计,并且这些方法都可以形成一个方法链的调用,所以其实一句代码就可以完成wordcount了,如下示例:

scala> sc.textFile("file:///data/hello.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collect
res5: Array[(String, Int)] = Array((mapreduce,2), (hello,1), (welcome,1), (spark,1), (hadoop,4), (hdfs,2), (vs,1))

scala>

我们还可以在web页面上看到任务执行的信息,访问主机ip的4040端口即可,如下: