大数据ETL实践探索(6)---- 使用python将大数据对象写回本地磁盘的几种方案
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wangyaninglm/article/details/88902294
文章大纲
1. python 与hdfs 交互 回写
1.1 使用hdfs 包
api list:https://hdfscli.readthedocs.io/en/latest/api.html#api-reference
获取hdfs data 文件夹下面所有csv 文件
from hdfs.client import Client
client = Client("http://IP:50070") # 50070: Hadoop默认namenode
#返回目录下的文件
def list_file(client,hdfs_path):
return client.list(hdfs_path, status=False)
#从hdfs获取文件到本地
def get_from_hdfs(client,hdfs_path,local_path):
client.download(hdfs_path, local_path, overwrite=False,n_threads=1)
hdfs_path = r'/user/hadoop/data'
name_list = list_file(client,hdfs_path)
#过滤所有csv文件
name_list_csv = [n for n in name_list if '.csv' in n]
print(name_list)
index = 1
for file in name_list_csv:
get_from_hdfs(client,hdfs_path+'/'+file,os.path.join( os.getcwd()))
os.rename(os.path.join(os.getcwd(),file),os.path.join(os.getcwd(),str(index)+".csv"))
index = index+1
1.2 python2 与hdfs
python2 与hdfs交互的一些老方法可以参考这个博文 https://www.cnblogs.com/liyongsan/p/4987819.html
1.3 在python中直接调用hadoop shell 命令去操作文件
1.3.1 hadoop shell
写也可以先saveAsTextFile,然后使用hdfs命令存到本地, 使用hdfs fs -get命令:
${HADOOP_COMMON_HOME}/bin/hadoop fs -get /hdfspath/to/data.txt /localpath/to/data.txt
或者,使用hdfs fs -copyToLocal命令:
${HADOOP_COMMON_HOME}/bin/hadoop fs -copyToLocal /hdfspath/to/data.txt /localpath/to/data.txt
1.3.2 popen
使用popen 可以获取命令执行的返回值
os.popen(r'hadoop dfs -ls /user/').read()
1.3.3 subprocess
https://docs.python.org/2/library/subprocess.html 该子模块允许你创建新的流程,连接到它们的输入/输出/错误管道,并获取他们的返回值。该模块打算替换多个旧的模块和功能:os.system 和 os.spawn * 使用subprocess时建议使用run()函数去处理所有它可以处理的情况,因为高级用法可以直接使用底层POPEN接口。 run()函数是Python 3.5中新添加的。
cat = subprocess.Popen(["hadoop", "fs", "-ls", "/user/hadoop/my_data"], stdout=subprocess.PIPE)
for line in cat.stdout:
print (line)
输出:
b’Found 2 itemsn’ b’-rw-r–r-- 2 hadoop hadoop 0 2019-03-28 08:38 /user/hadoop/my_data/_SUCCESSn’ b’-rw-r–r-- 2 hadoop hadoop 6967144 2019-03-28 08:38 /user/hadoop/my_data/part-00000-9431d082-957d-4a0b-a3ae-4ffa4674c70e-c000.csvn’
1.4 python 与 py4j 交互
http://aducode.github.io/posts/2016-08-02/write2hdfsinpyspark.html python中调用java对象来操作hdfs文件
def path(sc, filepath):
"""
创建hadoop path对象
:param sc sparkContext对象
:param filename 文件绝对路径
:return org.apache.hadoop.fs.Path对象
"""
path_class = sc._gateway.jvm.org.apache.hadoop.fs.path
return path_class(filepath)
def get_file_system(sc):
"""
创建FileSystem
:param sc SparkContext
:return FileSystem对象
"""
filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FilFileSystem
hadoop_configuration = sc._jsc.hadoopConfiguration()
return filesystem_class.get(hadoop_configuration)
def write(sc, filepath, content, overwite=True):
"""
写内容到hdfs文件
:param sc SparkContext
:param filepath 绝对路径
:param content 文件内容
:param overwrite 是否覆盖
"""
filesystem = get_file_system(sc)
out = filesystem.create(path(sc, filepath), overwrite)
out.write(bytearray(content, "utf-8"))
out.flush()
out.close()
write(sc, '/user/hadoop/my_data/ll.txt', 'shenmemgui', overwite=True)
2. pyspark 与driver 磁盘交互
直接写文件到磁盘(这个可以搭建一个本地的spark 单机版试试)
2.0版本后http://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/readwriter.html#DataFrameWriter.csv 对象引入的新方法
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None)
##e.g.
import os
import tempfile
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
#或者
df.repartition(1).write.csv(path="file:///"+(os.path.join( os.getcwd(),'test')), mode="overwrite", header="true")
用的时候这么用,我还以为os 都出来这个坨坨移到driver 的本地文件上了,结果还是在hdfs 的文件系统中。
这个函数说明中有一句 path – the path in any Hadoop supported file system 我想如果可行的话还是先写到hdfs 再挪回本地吧
mode="overwrite"慎用,我就直接把当前目录里面notebook 一些代码给覆盖了,结果找到找不回来,痛心。或者可以将dataframe 转化成rdd 后用saveAsTextFile 写回本地磁盘。
综上所述,我认为还是先写到hdfs 上或者s3上面比较安全,然后通过命令合并好文件再保存到本地。
3. python docker 搭建spark standalone 版本
https://www.cnblogs.com/hongdada/p/9475406.html
docker search spark
docker pull sequenceiq/spark
# 结果发现上面版本中的spark 是1.X 的
docker search spark2.0
#随便下一个
#机器上的其他容器先关了
docker stop $(docker ps -aq)
docker run -dit -p 8088:8088 -p 8042:8042 -p 4040:4040 -h sandbox sequenceiq/spark
docker image ls
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- php基础教程 第六步 学习数组以及条件判断switch补充
- php基础教程 第七步数组补充及循环基础
- php基础教程 第八步循环补充
- php基础教程 第九步 自定义函数
- php基础教程 第十步 阶段性知识补充
- php基础教程 第十一步 面向对象
- php基础教程 第十一步 面向对象补充
- Serverless|Framework——图文玩转 AWS Lambda
- C++入门指南及实战 第一步 概述及经典HelloWorld
- C++入门指南及实战 第二步 HelloWorld及扩展详解
- 依托于GitLab持续集成基础配置和使用
- C++入门指南及实战 第三步 基本变量
- 最全总结 | 聊聊 Python 数据处理全家桶(配置篇)
- FlexSDK工具包的介绍与编译使用
- 《零基础看得懂的C语言入门教程 》——(二)C语言没那么难简单开发带你了解流程