Spark(RDD,CSV)创建DataFrame方式
时间:2022-07-22
本文章向大家介绍Spark(RDD,CSV)创建DataFrame方式,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
spark将RDD转换为DataFrame
- 方法一(不推荐)
spark将csv转换为DataFrame,可以先文件读取为RDD,然后再进行map操作,对每一行进行分割。 再将schema和rdd分割后的Rows回填,sparkSession创建的dataFrame
val spark = SparkSession
.builder()
.appName("sparkdf")
.master("local[1]")
.getOrCreate()
//设置spark的上下文sparkContext
val sc = spark.sparkContext
val fileRDD = sc.textFile("/home/hadoop/Downloads/filesmall2.csv")
//val rdd = fileRDD.filter(line => line.split("t").length != 30)
val df = spark.createDataFrame(fileRDD.map(line=>HttpSchema.parseLog(line)),HttpSchema.struct)
df.show(3)
这里的RDD是通过读取文件创建的所以也可以看做是将RDD转换为DataFrame
object HttpSchema {
def parseLog(x:String): Row = {
var fields = x.split("t")
val _id = fields(0)
val srcIp = fields(1)
val srcPort = fields(2)
//这种方法比较麻烦的地方是row里面的字段名要和struct中的字段对应上
RowFactory.create(_id,srcIp,srcPort)
}
//设置schema描述
val struct = StructType(
Array( StructField("_id",StringType),
StructField("srcIp",StringType),
StructField("srcPort",StringType),
)
)
}
这也是这种方法不推荐使用的地方,因为返回的Row中的字段名要与schema中的字段名要一致,当字段多于22个这个需要集成一个
2.方法二 //使用隐式转换的方式来进行转换
val spark = SparkSession
.builder()
.appName("sparkdf")
.master("local[1]")
.getOrCreate()
//使用隐式转换必须导入这个才可以使用只有import spark.implicits._之后,RDD才有toDF、toDS功能
import spark.implicits._
//设置spark的上下文sparkContext
val sc = spark.sparkContext
val fileRDD = sc.textFile("/home/hadoop/Downloads/filesmall2.csv")
case class HttpClass(id:String,srcIp:String,srcPort:Int)
val df = fileRDD.map(_.split("t")).map(line=>HttpClass(line(0),line(1),line(2).toInt)).toDF()
当然也可以不创建类对象
rdd.map{x=>val par=x.split(",");(par(0),par(1).toInt)}.toDF("name","age")
dataFrame转换为RDD只需要将collect就好,df.collect RDD[row]类型,就可以按row取出
spark读取csv转化为DataFrame
- 方法一
val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
val sc = new SparkContext(conf)
println("spark version: " + sc.version)
val spark = new SQLContext(sc)
import spark.implicits._
val df = spark.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("inferSchema", "false") //是否自动推到内容的类型
.option("delimiter",",") //分隔符,默认为 ,
.load("/home/hadoop/Downloads/Salary_Data.csv")
df.show()
//进行写数据
data.repartition(1).write.format("com.databricks.spark.csv")
.option("header", "false")//在csv第一行有属性"true",没有就是"false"
.option("delimiter",",")//默认以","分割
.save(outpath)
sparkContext.stop()
sparkContext.sql()操作完成后直接返回的是DataFrame 当然可以间接采用将csv直接转换为RDD然后再将RDD转换为DataFrame
2.方法二
// 读取数据并分割每个样本点的属性值 形成一个Array[String]类型的RDD
val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data").map(_.split("\s+"))
// 将rdd转换成LabeledPoint类型的RDD
val LabeledPointRdd = rdd.map(x=>LabeledPoint(0,Vectors.dense(x.map(_.toDouble))))
// 转成DataFrame并只取"features"列
val data = spark.createDataFrame(LabeledPointRdd).select("features")
- RGW 的GC深入解析与调优
- 大数据查询——HBase读写设计与实践
- 基于Logstash的自动化运维系统实现
- 简谈RGW的index shard计算
- Blackpearl 的 Impersonate
- SparkMLLib中基于DataFrame的TF-IDF
- 运用Python实现WordPress网站大规模自动化发布文章
- 基于java的中文分词工具ANSJ
- 基于DF的Tokenizer分词
- 基于pyenv和virtualenv搭建python多版本虚拟环境
- 基于DataFrame的StopWordsRemover处理
- 案例:Spark基于用户的协同过滤算法
- 请别再问我Spark的MLlib和ML库的区别
- Spark的Ml pipeline
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Java Stream中map和flatMap方法
- 关于Apache shiro实现一个账户同一时刻只有一个人登录(shiro 单点登录)
- 视频上云安防视频云服务平台EasyCVR通过海康sdk协议接入多路设备拉流时出现部分设备拉流不稳定是什么原因?
- Centos6.x服务器配置jdk+tomcat+mysql环境(jsp+mysql)
- Linux的压缩和解压缩的方法总结
- RTSP/GB28181/HIKSDK/Ehome协议视频融合平台EasyCVR编译C++报参见“XXX”的声明错误
- 详解 Linux 常用目录的作用
- CentOS6环境下搭建路由器的方法
- centos7下NFS使用与配置的步骤
- 基于DOM4J的XML文件解析类
- Win7安装和配置Apache2.4服务器的详细方法
- shiro会话管理示例代码
- Windows Apache2.4 VC9(ApacheHaus)详细安装配置教程
- 在centos 7中安装配置k8s集群的步骤详解
- Centos7.2 编译安装方式搭建 phpMyAdmin