Spark编程指南
1、在maven里面添加引用,spark和hdfs的客户端的。
groupId = org.apache.spark
artifactId = spark-core_2.9.3
version = 0.8.1-incubating
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
2、把assembly/target/spark-assembly_2.9.3-0.8.1-incubating.jar添加到classpath里面,然后我们在程序里面要添加以下引用。
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
3、下面是官方的WorkCount的例子,可以参考一下。
/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SimpleApp {
def main(args: Array[String]) {
val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME", List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME", List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
SparkContext是SparkContext的上下文对象,是非常核心的一个类,它的实例化方法是new SparkContext(master, appName, [sparkHome], [jars])。
master:master的地址。
appName:应用的名称。
sparkHome:spark的安装地址。
jars:jar包的位置。
4、Spark总是围绕这个一个概念来进行 resilient distributed dataset (RDD),是可以并行操作的支持容错的元素集合。目前支持两种类型的RDDs,parallelized collections和Hadoop datasets。
(1)Parallelized collections是scala中存在的集合类,并且支持并行操作。
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
正常情况之下,spark会自动设置并行任务所需要的cpu的分片,一般是每个cpu 2-4个分片,也可以自己手动设置,sc.parallelize(data, 10)。
(2)Spark支持hadoop上的任何数据集,比如text files, SequenceFiles,还有其它的InputFormat。
下面是text files的例子:
scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
SequenceFiles则使用SparkContext’s sequenceFile[K, V]
,比如sequenceFile[Int, String],Int对应的是IntWritable,String对应的是Text。
别的数据格式使用SparkContext.hadoopRDD,之后再介绍,这个文档没有介绍。
正常情况之下,spark是一个block一个任务。
(3)RDDs只支持两种操作: transformations, 从一个数据集转换成另外一种; actions, 通过对一个数据集进行运算之后返回一个值。
Spark当中所有的transformations都是延迟执行的,等到真正使用的时候才会进行运算。
默认的,每一个经过transformed的RDD当有action作用于它的时候,它会重新计算一遍,除非我们进行persist
(or cache
) 操作。
最后附录一下RDD的API地址:http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD
(4)RDD Persistence
Spark最重要的一个功能就是可以把RDD持久化或者缓存,当你进行一个持久化操作的时候,Spark会在所有节点的内存当中保存这个RDD,第一个的时候计算,之后一直使用不需要再重新计算了。缓存是实现迭代式算法的关键。我们可以使用persist()
or cache()方法来持久化一个RDD,它是容错的,当这个RDD的任何分片
丢失之后,它会在之前计算它的机器上重新计算。另外每一个RDD,有它自己的存储Level,存储在硬盘或者存储在内存,但是序列化成Java对象(节省空间),或者在集群间复制。要设置它,我们需要传递一个StorageLevel给persist(),cache()是默认的了是StorageLevel.MEMORY_ONLY (存储为反序列化对象在内存当中)
当内存足够的时候,我们可以使用MEMORY_ONLY;当内存不太好的时候,我们可以采用MEMORY_ONLY_SER,在内存中存储为一个字节数组,速度还可以;当操作的数据集合足够大的时候,我们就把中间结果写到硬盘上;如果要支持容错,就使用备份到2个节点上的方式。如果要自己定义一个的话,要使用StorageLevel的apply()方法。
5、共享变量
Spark提供了两种限制的共享变量,Broadcast和Accumulators。
(1)Broadcast允许程序员持有一个只读的变量在各个节点之间,它一个常用的场景就是用它来存储一个很大的输入的数据集给每个节点使用,Spark会只用它独有的广播算法来减少通信损失。下面是例子:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
(2)Accumulators是用来计数或者求总数的,使用SparkContext.accumulator(v)来给它一个初始化的值,然后用“+=”来进行操作,但是任务之间不能得到它的结果,只有驱动任务的程序可以得到它的结果。下面是例子:
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
- 腾讯云GAME-TECH沙龙干货回顾:绿洲全球化案例分享
- 基于云计算的 CV 移动交互应用研究(1):CV交互+云计算
- ARKit 进阶:物理世界
- HLS 视频点播初探
- 周杰伦读心术背后的技术实现
- 盒子端 CSS 动画性能提升研究
- 把照片唱给你听 :腾讯 AI Lab 国际领先技术邀你「趣」体验
- 因为超算云,你将比别人早10年“抵达”未来
- 网页加速特技之 AMP
- 刷屏的背后:原来腾讯字体是CDC和Monotype联手打造的
- 重磅!腾讯与科大讯飞技术共创,Google ProtoBuf进入TARS家族!
- 一个简易版的T4代码生成"框架"
- yield在WCF中的错误使用——99%的开发人员都有可能犯的错误[上篇]
- 以上下文(Context)的形式创建一个共享数据的容器
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法