[大数据之Spark]——Actions算子操作入门实例
Actions
reduce(func)
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
这个方法会传入两个参数,计算这两个参数返回一个结果。返回的结果与下一个参数一起当做参数继续进行计算。
比如,计算一个数组的和。
//创建数据集
scala> var data = sc.parallelize(1 to 3,1)
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
//collect计算
scala> data.reduce((x,y)=>x+y)
res5: Int = 6
collect()
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
返回数据集的所有元素,通常是在使用filter或者其他操作的时候,返回的数据量比较少时使用。
比如,显示刚刚定义的数据集内容。
//创建数据集
scala> var data = sc.parallelize(1 to 3,1)
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
count()
Return the number of elements in the dataset.
计算数据集的数据个数,一般都是统计内部元素的个数。
//创建数据集
scala> var data = sc.parallelize(1 to 3,1)
//统计个数
scala> data.count
res7: Long = 3
scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.count
res8: Long = 2
first()
Return the first element of the dataset (similar to take(1)).
返回数据集的第一个元素,类似take(1)
//创建数据集
scala> var data = sc.parallelize(List(("A",1),("B",1)))
//获取第一条元素
scala> data.first
res9: (String, Int) = (A,1)
take(n)
Return an array with the first n elements of the dataset.
返回数组的头n个元素
//创建数据集
scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.take(1)
res10: Array[(String, Int)] = Array((A,1))
//如果n大于总数,则会返回所有的数据
scala> data.take(8)
res12: Array[(String, Int)] = Array((A,1), (B,1))
//如果n小于等于0,会返回空数组
scala> data.take(-1)
res13: Array[(String, Int)] = Array()
scala> data.take(0)
res14: Array[(String, Int)] = Array()
takeSample(withReplacement, num, [seed])
Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
这个方法与sample还是有一些不同的,主要表现在:
- 返回具体个数的样本(第二个参数指定)
- 直接返回array而不是RDD
- 内部会将返回结果随机打散
//创建数据集
scala> var data = sc.parallelize(List(1,3,5,7))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
//随机2个数据
scala> data.takeSample(true,2,1)
res0: Array[Int] = Array(7, 1)
//随机4个数据,注意随机的数据可能是重复的
scala> data.takeSample(true,4,1)
res1: Array[Int] = Array(7, 7, 3, 7)
//第一个参数是是否重复
scala> data.takeSample(false,4,1)
res2: Array[Int] = Array(3, 5, 7, 1)
scala> data.takeSample(false,5,1)
res3: Array[Int] = Array(3, 5, 7, 1)
takeOrdered(n, [ordering])
Return the first n elements of the RDD using either their natural order or a custom comparator.
基于内置的排序规则或者自定义的排序规则排序,返回前n个元素
//创建数据集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21
//返回排序数据
scala> data.takeOrdered(3)
res4: Array[String] = Array(a, b, c)
saveAsTextFile(path)
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
将数据集作为文本文件保存到指定的文件系统、hdfs、或者hadoop支持的其他文件系统中。
//创建数据集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21
//保存为test_data_save文件
scala> data.saveAsTextFile("test_data_save")
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
<console>:24: error: not found: type GzipCodec
data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
^
//引入必要的class
scala> import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.compress.GzipCodec
//保存为压缩文件
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
查看文件
[xingoo@localhost bin]$ ll
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save2
[xingoo@localhost bin]$ cd test_data_save2
[xingoo@localhost test_data_save2]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 30 Oct 10 23:07 part-00000.gz
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save2]$ cd ..
[xingoo@localhost bin]$ cd test_data_save
[xingoo@localhost test_data_save]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 10 Oct 10 23:07 part-00000
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save]$ cat part-00000
b
a
e
f
c
saveAsSequenceFile(path)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
保存为sequence文件
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)),3)
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:22
scala> data.saveAsSequenceFile("kv_test")
[xingoo@localhost bin]$ cd kv_test/
[xingoo@localhost kv_test]$ ll
total 12
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00000
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00001
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00002
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:25 _SUCCESS
saveAsObjectFile(path)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
基于Java序列化保存文件
scala> var data = sc.parallelize(List("a","b","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:22
scala> data.saveAsObjectFile("str_test")
scala> var data2 = sc.objectFile[Array[String]]("str_test")
data2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[20] at objectFile at <console>:22
scala> data2.collect
countByKey()
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
统计KV中,相同K的V的个数
//创建数据集
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22
//统计个数
scala> data.countByKey
res9: scala.collection.Map[String,Long] = Map(B -> 1, A -> 2)
foreach(func)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
针对每个参数执行,通常在更新互斥或者与外部存储系统交互的时候使用
// 创建数据集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:22
// 遍历
scala> data.foreach(x=>println(x+" hello"))
b hello
a hello
e hello
f hello
c hello
- RxJava2 实战(1) - 后台执行耗时操作,实时通知 UI 更新
- AutoIt木马又一发:暗藏神秘照片
- css3动画从入门到精通
- ReactJs和React Native的那些事
- Linux学习-文件排序和FASTA文件操作
- Bash漏洞再次演进:缓冲区溢出导致远程任意命令执行
- Pandas,让Python像R一样处理数据,但快
- ViewPager 实现 Galler 效果, 中间大图显示,两边小图展示
- 最小生成树-Prim算法和Kruskal算法
- Bash漏洞批量检测工具与修复方案
- 组合模式
- 用Fiddler在Android上抓包(Http+https)
- Architecture Components ViewModel的控制。
- React Native之AppRegistry模块
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- JDK1.8HashMap源码学习-put操作以及扩容(二)
- Python 中的数字到底是什么?
- 详解 Python 的二元算术运算,为什么说减法只是语法糖?
- 详解增强算术赋值:“-=”操作是怎么实现的?
- Hyperledger Explorer 环境搭建详解
- [译]在Solidity中创建无限制列表
- java安全编码指南之:声明和初始化
- java安全编码指南之:表达式规则
- java安全编码指南之:Number操作
- 如何提高代码质量
- 小姐姐非要问我:spring编程式事务是啥?
- 阿里3面:Spring声明式事务连环炮,让我措手不及。。
- 旷视科技|商用端侧Raw图像降噪方案
- Python | 时间戳转换
- Stata | 2020年国家社科基金立项名单分析