使用Python Pandas处理亿级数据
原文:http://www.justinablog.com/archives/1357?utm_source=tuicool&utm_medium=referral
在数据分析领域,最热门的莫过于Python和R语言,此前有一篇文章《别老扯什么Hadoop了,你的数据根本不够大》指出:只有在超过5TB数据量的规模下,Hadoop才是一个合理的技术选择。这次拿到近亿条日志数据,千万级数据已经是关系型数据库的查询分析瓶颈,之前使用过Hadoop对大量文本进行分类,这次决定采用Python来处理数据:
- 硬件环境
- CPU:3.5 GHz Intel Core i7
- 内存:32 GB HDDR 3 1600 MHz
- 硬盘:3 TB Fusion Drive
- 数据分析工具
- Python:2.7.6
- Pandas:0.15.0
- IPython notebook:2.0.0
源数据如下表所示:
Table |
Size |
Desc |
|
---|---|---|---|
ServiceLogs |
98,706,832 rows x 14 columns |
8.77 GB |
交易日志数据,每个交易会话可以有多条交易 |
ServiceCodes |
286 rows × 8 columns |
20 KB |
交易分类的字典表 |
数据读取
启动IPython notebook,加载pylab环境:
ipython notebook --pylab=inline
Pandas提供了IO工具可以将大文件分块读取,测试了一下性能,完整加载9800万条数据也只需要263秒左右,还是相当不错了。
import pandas as pd
reader = pd.read_csv('data/servicelogs', iterator=True)try:
df = reader.get_chunk(100000000)except StopIteration:
print "Iteration is stopped."
1百万条 |
1千万条 |
1亿条 |
|
---|---|---|---|
ServiceLogs |
1 s |
17 s |
263 s |
使用不同分块大小来读取再调用 pandas.concat 连接DataFrame,chunkSize设置在1000万条左右速度优化比较明显。
loop = TruechunkSize = 100000chunks = []while loop:
try:
chunk = reader.get_chunk(chunkSize)
chunks.append(chunk)
except StopIteration:
loop = False
print "Iteration is stopped."df = pd.concat(chunks, ignore_index=True)
下面是统计数据,Read Time是数据读取时间,Total Time是读取和Pandas进行concat操作的时间,根据数据总量来看,对5~50个DataFrame对象进行合并,性能表现比较好。
Chunk Size |
Read Time (s) |
Total Time (s) |
Performance |
---|---|---|---|
100,000 |
224.418173 |
261.358521 |
|
200,000 |
232.076794 |
256.674154 |
|
1,000,000 |
213.128481 |
234.934142 |
√ √ |
2,000,000 |
208.410618 |
230.006299 |
√ √ √ |
5,000,000 |
209.460829 |
230.939319 |
√ √ √ |
10,000,000 |
207.082081 |
228.135672 |
√ √ √ √ |
20,000,000 |
209.628596 |
230.775713 |
√ √ √ |
50,000,000 |
222.910643 |
242.405967 |
|
100,000,000 |
263.574246 |
263.574246 |
如果使用Spark提供的Python Shell,同样编写Pandas加载数据,时间会短25秒左右,看来Spark对Python的内存使用都有优化。
数据清洗
Pandas提供了 DataFrame.describe 方法查看数据摘要,包括数据查看(默认共输出首尾60行数据)和行列统计。由于源数据通常包含一些空值甚至空列,会影响数据分析的时间和效率,在预览了数据摘要后,需要对这些无效数据进行处理。
首先调用 DataFrame.isnull() 方法查看数据表中哪些为空值,与它相反的方法是 DataFrame.notnull() ,Pandas会将表中所有数据进行null计算,以True/False作为结果进行填充,如下图所示:
Pandas的非空计算速度很快,9800万数据也只需要28.7秒。得到初步信息之后,可以对表中空列进行移除操作。尝试了按列名依次计算获取非空列,和 DataFrame.dropna() 两种方式,时间分别为367.0秒和345.3秒,但检查时发现 dropna() 之后所有的行都没有了,查了Pandas手册,原来不加参数的情况下, dropna() 会移除所有包含空值的行。如果只想移除全部为空值的列,需要加上 axis 和 how 两个参数:
df.dropna(axis=1, how='all')
共移除了14列中的6列,时间也只消耗了85.9秒。
接下来是处理剩余行中的空值,经过测试,在 DataFrame.replace() 中使用空字符串,要比默认的空值NaN节省一些空间;但对整个CSV文件来说,空列只是多存了一个“,”,所以移除的9800万 x 6列也只省下了200M的空间。进一步的数据清洗还是在移除无用数据和合并上。
对数据列的丢弃,除无效值和需求规定之外,一些表自身的冗余列也需要在这个环节清理,比如说表中的流水号是某两个字段拼接、类型描述等,通过对这些数据的丢弃,新的数据文件大小为4.73GB,足足减少了4.04G!
数据处理
使用 DataFrame.dtypes 可以查看每列的数据类型,Pandas默认可以读出int和float64,其它的都处理为object,需要转换格式的一般为日期时间。DataFrame.astype() 方法可对整个DataFrame或某一列进行数据格式转换,支持Python和NumPy的数据类型。
df['Name'] = df['Name'].astype(np.datetime64)
对数据聚合,我测试了 DataFrame.groupby 和 DataFrame.pivot_table 以及 pandas.merge ,groupby 9800万行 x 3列的时间为99秒,连接表为26秒,生成透视表的速度更快,仅需5秒。
df.groupby(['NO','TIME','SVID']).count() # 分组fullData = pd.merge(df, trancodeData)[['NO','SVID','TIME','CLASS','TYPE']] # 连接actions = fullData.pivot_table('SVID', columns='TYPE', aggfunc='count') # 透视表
根据透视表生成的交易/查询比例饼图:
将日志时间加入透视表并输出每天的交易/查询比例图:
total_actions = fullData.pivot_table('SVID', index='TIME', columns='TYPE', aggfunc='count')total_actions.plot(subplots=False, figsize=(18,6), kind='area')
除此之外,Pandas提供的DataFrame查询统计功能速度表现也非常优秀,7秒以内就可以查询生成所有类型为交易的数据子表:
tranData = fullData[fullData['Type'] == 'Transaction']
该子表的大小为 [10250666 rows x 5 columns]。在此已经完成了数据处理的一些基本场景。实验结果足以说明,在非“>5TB”数据的情况下,Python的表现已经能让擅长使用统计分析语言的数据分析师游刃有余。
- 多线程编程学习二(对象及变量的并发访问).
- ASM基本配置问题(r5笔记第89天)
- 如何上手使用 Facebook 的开源平台 Detectron?
- 多线程编程学习三(线程间通信).
- 关于create database语句在10g,11g中的不同(r5笔记第88天)
- Web开发模式【Mode I 和Mode II的介绍、应用案例】
- 多线程编程学习四(Lock 的使用)
- Android编程规范
- 干货 | 深入分析Object.wait/notify实现机制
- 关于ORA-01555的问题分析(r5笔记第87天)
- 项目工具类
- AJAX常见面试题
- 干货 | Tomcat类加载机制触发的Too many open files问题分析
- 并行查询缓慢的问题分析(r5笔记第86天)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- MapReduce工作笔记——Hadoop Streaming多目录/多路输入
- 矩阵操作试题(C++/Python)——矩阵元素顺时针旋转
- Linux实用技巧——paste横向合并文件内容
- Julia简易教程——5_函数
- 矩阵操作试题(C++/Python)——矩阵元素逆时针旋转90度
- MapReduce工作笔记——Streaming输入input解压
- MapReduce工作笔记——Streaming输出output压缩
- Linux实用技巧——删除重复行
- python函数——os.path.join路径拼接(pjoin)
- python函数——pickle中的dump以及load
- python函数——Keras分词器Tokenizer
- python函数——序列预处理pad_sequences()序列填充
- MapReduce工作笔记——Streaming多路输出
- nuxt.js项目入门配置篇
- 高级综合工具StratusHLS学习笔记(4)