PySpark分析二进制文件
客户希望通过spark来分析二进制文件中0和1的数量以及占比。如果要分析的是目录,则针对目录下的每个文件单独进行分析。分析后的结果保存与被分析文件同名的日志文件中,内容包括0和1字符的数量与占比。
要求:如果值换算为二进制不足八位,则需要在左侧填充0。
可以在linux下查看二进制文件的内容。命令:
xxd –b –c 1 filename
命令参数-c 1是显示1列1个字符,-b是显示二进制。
遇到的坑
开发环境的问题
要在spark下使用python,需要事先使用pip安装pyspark。结果安装总是失败。python的第三方库地址是https://pypi.python.org/simple/,在国内访问很慢。通过搜索问题,许多文章提到了国内的镜像库,例如豆瓣的库,结果安装时都提示找不到pyspark。
查看安装错误原因,并非不能访问该库,仅仅是访问较慢,下载了不到8%的时候就提示下载失败。这实际上是连接超时的原因。因而可以修改连接超时值。可以在~/.pip/pip.conf下增加:
[global]timeout = 6000
虽然安装依然缓慢,但至少能保证pyspark安装完毕。但是在安装py4j时,又提示如下错误信息(安装环境为mac):
OSError: [Errno 1] Operation not permitted: '/System/Library/Frameworks/Python.framework/Versions/2.7/share'
即使这个安装方式是采用sudo,且在管理员身份下安装,仍然提示该错误。解决办法是执行如下安装:
pip install --upgrade pip sudo pip install numpy --upgrade --ignore-installed sudo pip install scipy --upgrade --ignore-installed sudo pip install scikit-learn --upgrade --ignore-installed
然后再重新执行sudo pip install pyspark,安装正确。
字符编码的坑
在提示信息以及最后分析的结果中都包含了中文。运行代码时,会提示如下错误信息:
SyntaxError: Non-ASCII character 'xe5' in file /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py on line 36, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details
需要在代码文件的首行添加如下编码声明: # This Python file uses the following encoding: utf-8
SparkConf的坑
初始化SparkContext的代码如下所示:
conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc = SparkContext(conf)
结果报告运行错误:
Error initializing SparkContext.
org.apache.spark.SparkException: Could not parse Master URL: <pyspark.conf.SparkConf object at 0x106666390>
根据错误提示,以为是Master的设置有问题,实际上是实例化SparkContext有问题。阅读代码,发现它的构造函数声明如下所示:
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
gateway=None, jsc=None, profiler_cls=BasicProfiler):
而前面的代码仅仅是简单的将conf传递给SparkContext构造函数,这就会导致Spark会将conf看做是master参数的值,即默认为第一个参数。所以这里要带名参数:
sc = SparkContext(conf = conf)
sys.argv的坑
我需要在使用spark-submit命令执行python脚本文件时,传入我需要分析的文件路径。与scala和java不同。scala的main函数参数argv实际上可以接受命令行传来的参数。python不能这样,只能使用sys模块来接收命令行参数,即sys.argv。
argv是一个list类型,当我们通过sys.argv获取传递进来的参数值时,一定要明白它会默认将spark-submit后要执行的python脚本文件路径作为第一个参数,而之后的参数则放在第二个。例如命令如下:
./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"
则:
- argv[0]: /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py
- argv[1]: files
因此,我需要获得files文件夹名,就应该通过argv[1]来获得。
此外,由于argv是一个list,没有size属性,而应该通过len()方法来获得它的长度,且期待的长度为2。
整数参与除法的坑
在python 2.7中,如果直接对整数执行除法,结果为去掉小数。因此4 / 5得到的结果却是0。在python 3中,这种运算会自动转型为浮点型。
要解决这个问题,最简单的办法是导入一个现成的模块:
from __future__ import division
注意:这个import的声明应该放在所有import声明前面。
附整个代码:
# This Python file uses the following encoding: utf-8
from __future__ import division
import os
import time
import sys
from pyspark import SparkConf, SparkContext
APP_NAME = "Load Bin Files"
def main(spark_context, path):
file_paths = fetch_files(path)
for file_path in file_paths:
outputs = analysis_file_content(spark_context, path + "/" + file_path)
print_outputs(outputs)
save_outputs(file_path, outputs)
def fetch_files(path):
if os.path.isfile(path):
return [path]
return os.listdir(path)
def analysis_file_content(spark_context, file_path):
data = spark_context.binaryRecords(file_path, 1)
records = data.flatMap(lambda d: list(bin(ord(d)).replace('0b', '').zfill(8)))
mapped_with_key = records.map(lambda d: ('0', 1) if d == '0' else ('1', 1))
result = mapped_with_key.reduceByKey(lambda x, y: x + y)
total = result.map(lambda r: r[1]).sum()
return result.map(lambda r: format_outputs(r, total)).collect()
def format_outputs(value_with_key, total):
tu = (value_with_key[0], value_with_key[1], value_with_key[1] / total * 100)
return "字符{0}的数量为{1}, 占比为{2:.2f}%".format(*tu)
def print_outputs(outputs):
for output in outputs:
print output
def save_outputs(file_path, outputs):
result_dir = "result"
if not os.path.exists(result_dir):
os.mkdir(result_dir)
output_file_name = "result/" + file_name_with_extension(file_path) + ".output"
with open(output_file_name, "a") as result_file:
for output in outputs:
result_file.write(output + "n")
result_file.write("统计于{0}nn".format(format_logging_time()))
def format_logging_time():
return time.strftime('%Y-%m-%d %H:%m:%s', time.localtime(time.time()))
def file_name_with_extension(path):
last_index = path.rfind("/") + 1
length = len(path)
return path[last_index:length]
if __name__ == "__main__":
conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc = SparkContext(conf=conf)
if len(sys.argv) != 2:
print("请输入正确的文件或目录路径")
else:
main(sc, sys.argv[1])
实现并不复杂,只是自己对Python不太熟悉,也从未用过PySpark,所以蹚了不少坑,所幸都不复杂,通过google都找到了解决方案。是为记!
- 生物信息学技能面试题(第4题)-多个同样的行列式文件合并起来
- PHP 底层的运行机制与原理
- asp.net web api 版本控制
- 如何编写更好的SQL查询:终极指南(上)
- asp.net web api 异常捕获
- asp.net web api 文件上传
- 使用MySQL正则表达式 __MySQL必知必会
- 史上最好用的免费翻蔷利器
- asp.net web api 接口安全与角色控制
- TensorFlow从0到1 | 第十五章 重新思考神经网络初始化
- asp.net web api 下载之断点续传
- apache2.4.x三种MPM介绍
- 没有自己的服务器如何学习生物数据分析(上篇)
- 【直播】我的基因组57:最简陋的祖源分析
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 在不影响程序使用的情况下添加shellcode
- [K8s 1.9实践]Kubeadm 1.9 HA 高可用 集群 本地离线镜像部署
- ansible模块command、shell、raw、script
- systemd - CentOS 7进程守护&监控
- Java 8的新特性还不了解?快进来!
- 【Vulnhub】Play XML Entities
- 一切皆是映射:詳解 Kotlin Map 集合類
- 10大高性能开发宝石,我要消灭一半程序员!
- 面试官:你说你会RabbitMQ,那聊聊它的交换机(Exchange)吧
- Kubeadm 1.9 HA 高可用集群本地离线镜像部署【已验证】
- kubernetes(k8s)集群安装calico
- kubernetes(k8s) Prometheus+grafana监控告警安装部署
- 基于OpencvCV的情绪检测
- 设计模式 之 抽象工厂模式
- Angular应用里HTTP请求的错误处理