Apache Spark中使用DataFrame的统计和数学函数
我们在Apache Spark 1.3版本中引入了DataFrame功能, 使得Apache Spark更容易用. 受到R语言和Python中数据框架的启发, Spark中的DataFrames公开了一个类似当前数据科学家已经熟悉的单节点数据工具的API. 我们知道, 统计是日常数据科学的重要组成部分. 我们很高兴地宣布在即将到来的1.4版本中增加对统计和数学函数的支持.
在这篇博文中, 我们将介绍一些重要的功能, 其中包括:
- 随机数据生成功能
- 摘要和描述性统计功能
- 样本协方差和相关性功能
- 交叉表(又名列联表)
- 频繁项目(注: 即多次出现的项目)
- 数学函数
我们在例子中使用Python. 不过, Scala和Java也有类似的API.
1.随机数据生成
随机数据生成对于测试现有算法和实现随机算法(如随机投影)非常有用. 我们提供了sql.functions下的函数来生成包含从分配中抽取的独立同分布(i.i.d)的值的字段, 例如矩形分布函数uniform(rand
)和标准正态分布函数standard normal(randn
).
In [1]: from pyspark.sql.functions import rand, randn
In [2]: # 创建一个包含1列10行的DataFrame.
In [3]: df = sqlContext.range(0, 10)
In [4]: df.show()
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+
In [4]: # 生成其他两列, 分别使用矩形分布(uniform distribution)和正态分布(normal distribution).
In [5]: df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
+--+-------------------+--------------------+
|id| uniform| normal|
+--+-------------------+--------------------+
| 0| 0.7224977951905031| -0.1875348803463305|
| 1| 0.2953174992603351|-0.26525647952450265|
| 2| 0.4536856090041318| -0.7195024130068081|
| 3| 0.9970412477032209| 0.5181478766595276|
| 4|0.19657711634539565| 0.7316273979766378|
| 5|0.48533720635534006| 0.07724879367590629|
| 6| 0.7369825278894753| -0.5462256961278941|
| 7| 0.5241113627472694| -0.2542275002421211|
| 8| 0.2977697066654349| -0.5752237580095868|
| 9| 0.5060159582230856| 1.0900096472044518|
+--+-------------------+--------------------+
2.摘要和描述性统计信息
导入数据后执行的第一个操作是了解它们的大致情况. 对于数字列, 了解描述性摘要统计信息对理解数据的分布有很大帮助. 可以使用describe
函数来返回一个DataFrame, 其中会包含非空项目数, 平均值, 标准偏差以及每个数字列的最小值和最大值等信息.
In [1]: from pyspark.sql.functions import rand, randn
In [2]: # 一个略微不同的方式来生成两个随机的数列
In [3]: df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27))
In [4]: df.describe().show()
+-------+------------------+-------------------+--------------------+
|summary| id| uniform| normal|
+-------+------------------+-------------------+--------------------+
| count| 10| 10| 10|
| mean| 4.5| 0.5215336029384192|-0.01309370117407197|
| stddev|2.8722813232690143| 0.229328162820653| 0.5756058014772729|
| min| 0|0.19657711634539565| -0.7195024130068081|
| max| 9| 0.9970412477032209| 1.0900096472044518|
+-------+------------------+-------------------+--------------------+
如果你有一个包含大量列的DataFrame, 那么你也可以在列的一个子集上应用describe函数:
In [4]: df.describe('uniform', 'normal').show()
+-------+-------------------+--------------------+
|summary| uniform| normal|
+-------+-------------------+--------------------+
| count| 10| 10|
| mean| 0.5215336029384192|-0.01309370117407197|
| stddev| 0.229328162820653| 0.5756058014772729|
| min|0.19657711634539565| -0.7195024130068081|
| max| 0.9970412477032209| 1.0900096472044518|
+-------+-------------------+--------------------+
虽然describe函数适用于快速探索性的数据分析, 你当然也可以使用DataFrame上的常规选择功能来控制描述性统计信息列表和应用的列:
In [5]: from pyspark.sql.functions import mean, min, max
In [6]: df.select([mean('uniform'), min('uniform'), max('uniform')]).show()
+------------------+-------------------+------------------+
| AVG(uniform)| MIN(uniform)| MAX(uniform)|
+------------------+-------------------+------------------+
|0.5215336029384192|0.19657711634539565|0.9970412477032209|
+------------------+-------------------+------------------+
3.样本协方差和相关性
协方差是衡量两个变量相对于彼此变化的度量. 若是正数意味则着有一个趋势: 一个变量增加, 另一个也增加. 若是负数则表示随着一个变量增加, 另一个变量趋于减小. DataFrame的两列的样本协方差可以通过如下方法计算:
In [1]: from pyspark.sql.functions import rand
In [2]: df = sqlContext.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))
In [3]: df.stat.cov('rand1', 'rand2')
Out[3]: 0.009908130446217347
In [4]: df.stat.cov('id', 'id')
Out[4]: 9.166666666666666
从上面可以看出, 两个随机生成的列的协方差接近于零, 而id列与其自身的协方差则非常高.
得到9.17的协方差值可能难以解释. 相关性是协方差的归一化度量. 因为它提供了两个随机变量之间的统计相关性的量化测量, 所以更容易理解.
In [5]: df.stat.corr('rand1', 'rand2')
Out[5]: 0.14938694513735398
In [6]: df.stat.corr('id', 'id')
Out[6]: 1.0
在上面的例子中, id列与自身完全相关, 而两个随机生成的列则具有较低的相关值..
4.交叉表(列联表)
交叉表提供了一组变量的频率分布表. 列联表是统计学中的一个强大的工具, 用于观察变量的统计显着性(或独立性). 在Spark 1.4中, 用户将能够将DataFrame的两列进行交叉以获得在这些列中观察到的不同对的计数. 下面是一个如何使用交叉表来获取列联表的例子.
In [1]: # 创建一个以(name, item)为字段名的DataFrame
In [2]: names = ["Alice", "Bob", "Mike"]
In [3]: items = ["milk", "bread", "butter", "apples", "oranges"]
In [4]: df = sqlContext.createDataFrame([(names[i % 3], items[i % 5]) for i in range(100)], ["name", "item"])
In [5]: # 查看前10列数据.
In [6]: df.show(10)
+-----+-------+
| name| item|
+-----+-------+
|Alice| milk|
| Bob| bread|
| Mike| butter|
|Alice| apples|
| Bob|oranges|
| Mike| milk|
|Alice| bread|
| Bob| butter|
| Mike| apples|
|Alice|oranges|
+-----+-------+
In [7]: df.stat.crosstab("name", "item").show()
+---------+----+-----+------+------+-------+
|name_item|milk|bread|apples|butter|oranges|
+---------+----+-----+------+------+-------+
| Bob| 6| 7| 7| 6| 7|
| Mike| 7| 6| 7| 7| 6|
| Alice| 7| 7| 6| 7| 7|
+---------+----+-----+------+------+-------+
需要牢记的一点是, 我们运行交叉表的列的基数不能太大. 也就是说, 不同的names和items的数量不能太大. 试想一下, 如果items包含10亿个不同的项目:你将如何适应你的屏幕上一大堆条目的表?
5.出现次数多的项目
找出每列中哪些项目频繁出现, 这对理解数据集非常有用. 在Spark 1.4中, 用户将能够使用DataFrame找到一组列的频繁项目. 我们已经实现了Karp等人提出的单通道算法. 这是一种快速的近似算法, 总是返回出现在用户指定的最小比例的行中的所有频繁项目. 请注意, 结果可能包含错误信息, 即出现了不频繁出现的项目.
In [1]: df = sqlContext.createDataFrame([(1, 2, 3)if i % 2 == 0 else (i, 2 * i, i % 4) for i in range(100)], ["a", "b", "c"])
In [2]: df.show(10)
+-+--+-+
|a| b|c|
+-+--+-+
|1| 2|3|
|1| 2|1|
|1| 2|3|
|3| 6|3|
|1| 2|3|
|5|10|1|
|1| 2|3|
|7|14|3|
|1| 2|3|
|9|18|1|
+-+--+-+
In [3]: freq = df.stat.freqItems(["a", "b", "c"], 0.4)
给定上面的DataFrame, 下面的代码找到每个列显示出现次数占总的40%以上频繁项目:
In [4]: freq.collect()[0]
Out[4]: Row(a_freqItems=[11, 1], b_freqItems=[2, 22], c_freqItems=[1, 3])
正如你所看到的, “11”和“1”是列“a”的频繁值. 你还可以通过使用struct函数创建一个组合列来查找列组合的频繁项目:
In [5]: from pyspark.sql.functions import struct
In [6]: freq = df.withColumn('ab', struct('a', 'b')).stat.freqItems(['ab'], 0.4)
In [7]: freq.collect()[0]
Out[7]: Row(ab_freqItems=[Row(a=11, b=22), Row(a=1, b=2)])
从上面的例子中可以看到, "a = 11和b = 22" 以及 "a = 1和b = 2" 的组合经常出现在这个数据集中. 请注意, " a = 11和b = 22" 的结果是误报(它们并不常出现在上面的数据集中)
6.数学函数
在Spark 1.4中还新增了一套数学函数. 用户可以轻松地将这些数学函数应用到列上面. 支持的数学函数列表来自这个文件(当1.4版本发行时, 我们也会发布预建(pre-built)文档). 输入需要是一个参数的column函数, 有cos
, sin
, floor
(向下取整), ceil
(向上取整)等函数. 对于采用两个参数作为输入的函数, 例如pow(x, y)
(计算x的y次幂), hypot(x, y)
(计算直角三角形的斜边长), 两个独立的列或者列的组合都可以作为输入参数.
In [1]: from pyspark.sql.functions import *
In [2]: df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14)
In [3]: # 你可以参照(reference)一个列, 或者提供一个列名
In [4]: df.select(
...: 'uniform',
...: toDegrees('uniform'),
...: (pow(cos(df['uniform']), 2) + pow(sin(df.uniform), 2)).
...: alias("cos^2 + sin^2")).show()
+--------------------+------------------+------------------+
| uniform| DEGREES(uniform)| cos^2 + sin^2|
+--------------------+------------------+------------------+
| 0.7224977951905031| 41.39607437192317| 1.0|
| 0.3312021111290707|18.976483133518624|0.9999999999999999|
| 0.2953174992603351|16.920446323975014| 1.0|
|0.018326130186194667| 1.050009914476252|0.9999999999999999|
| 0.3163135293051941|18.123430232075304| 1.0|
| 0.4536856090041318| 25.99427062175921| 1.0|
| 0.873869321369476| 50.06902396043238|0.9999999999999999|
| 0.9970412477032209| 57.12625549385224| 1.0|
| 0.19657711634539565| 11.26303911544332|1.0000000000000002|
| 0.9632338825504894| 55.18923615414307| 1.0|
+--------------------+------------------+------------------+
下一步是什么
本博文中描述的所有功能都在Python, Scala和Java中提供, 在Spark 1.4中也同样会提供, 此版本将在未来几天发布. 如果你不能等待, 你也可以自己从1.4版本分支中构建Spark: https://github.com/apache/spark/tree/branch-1.4 通过与Spark MLlib更好的集成, 统计学功能的支持将在DataFrames未来的版本中增加. 利用MLlib中现有的统计软件包, 可以支持管道(pipeline), 斯皮尔曼(Spearman)相关性, 排名以及协方差和相关性的聚合函数中的特征选择功能.
在博客文章的最后, 我们还要感谢Davies Liu, Adrian Wang和Spark社区的其他成员实现这些功能.
- 关于create database语句在10g,11g中的不同(r5笔记第88天)
- Web开发模式【Mode I 和Mode II的介绍、应用案例】
- 多线程编程学习四(Lock 的使用)
- Android编程规范
- 干货 | 深入分析Object.wait/notify实现机制
- 关于ORA-01555的问题分析(r5笔记第87天)
- 项目工具类
- AJAX常见面试题
- 干货 | Tomcat类加载机制触发的Too many open files问题分析
- 并行查询缓慢的问题分析(r5笔记第86天)
- Swagger文档转Word 文档
- AJAX应用【股票案例、验证码校验】
- IT中的闰秒问题(r5笔记第85天)
- 浅谈exp/imp(下) (r5笔记第84天)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 【第19期】HTTP请求头referer
- Sentinel流控日志与索引
- Next.js + TypeScript 搭建一个简易的博客系统
- 【redis】02-redis持久化存储以及对象存储
- Kubernetes 集群可视化监控之 Weave Scope 入门
- h5 与原生 app 交互的原理
- 怎么在Openresty中REST?
- 【redis】04-redis 根据监听key的失效事件实现订单超时关闭
- 搭建分布式任务调度平台
- 微信小程序根据线上版本 Source Map 文件定位错误代码
- 全解系列:内存泄漏定位工具LeakCanary!
- 【Java反射】触手可及
- 【Flutter 专题】100 何为 Flutter Widgets ?
- Python爬虫 爬取豆瓣电影Top250信息
- Python编程 基础练习(一)