Spark之RDD与DataFrame互相转换
时间:2019-01-17
本文章向大家介绍Spark之RDD与DataFrame互相转换,主要包括Spark之RDD与DataFrame互相转换使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
在Spark中RDD转换为DataFrame一共有两种方法。一种是基于一个封装数据的类去转换,另一种是通过结构类型去转换,下面分别介绍。
基于封装数据的类去转换(反射方式)
Java版本
这种方法是底层基于类的反射机制去实现的,为了封装数据我们首先需要一个封装数据的类也就是JavaBean。然后去构造一个JavaRDD<JavaBean>,然后调用构造RDD的方法去构造DataFrame,这时需要传入构造好的RDD与JavaBean.class进而得到一个Dataset<Row>(注意在Java中Dataset也就是DataFrame)。下面做一个展示:
我们所用到的数据:
1,leo,17 2,marry,17 3,jack,18 4,tom,19 |
下面为Java代码:
package test.scala.com.zhuliu7.sparksqlnext_java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class RDD2DataFrameReflection {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("dfuse").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> oraginRDD = sc.textFile("E:\\testdata\\students.txt");
/*
* 在Java中如果我们知道RDD中的数据对应着一个JavaBean,这时要将RDD转换DF的话首先需要一个JavaBean类型的RDD
* */
JavaRDD<Student> stuRDD = oraginRDD.map(new Function<String, Student>() {
@Override
public Student call(String s) throws Exception {
String[] strs = s.split(",");
Student stu = new Student();
stu.setId(Integer.valueOf(strs[0]));
stu.setName(strs[1]);
stu.setAge(Integer.valueOf(strs[2]));
return stu;
}
});
// 使用反射的方式来创建DataFrame,包含的数据类型只能是简单类型,这句代码是核心
Dataset<Row> ds = sqlContext.createDataFrame(stuRDD,Student.class);
ds.show();
ds.registerTempTable("t_stu");
Dataset nds = sqlContext.sql("select * from t_stu where age <= 18");
// 将查询出来的结果再次封装到RDD中
JavaRDD<Row> finalRDD = nds.javaRDD();
// 将数据映射为Student类型
JavaRDD<Student> fRDD = finalRDD.map(new Function<Row, Student>() {
@Override
public Student call(Row row) throws Exception {
Student stu = new Student();
stu.setAge(row.getInt(0));
stu.setName(row.getString(2));
stu.setId(row.getInt(1));
return stu;
}
});
fRDD.foreach(new VoidFunction<Student>() {
@Override
public void call(Student student) throws Exception {
System.out.println(student);
}
});
}
}
从上面我们也能够看到,想要从DataFrame还原为以前的JavaRDD<JavaBean>只需要将DataFrame转换为JavaRDD<Row>再做一次映射进而转换为JavaRDD<JavaBean>就ok了。
Scala版本
与Java版本的思路一样只不过是把JavaBean替换成了case class。代码如下:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
object RDD2DataFrameReflaction {
case class Student(id : Int, name : String,age : Int)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DFUse").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // 放在sqlContext之后
val preRDD = sc.textFile("E:\\testdata\\students.txt").map(x => x.split(",")).map(arr => Student(arr(0).toInt,arr(1).toString,arr(2).toInt))
val df = preRDD.toDF() // 这句话编译通过需要import sqlContext.implicits._的支持,而这句话能够正确运行需要将case class放在成员变量的位置
df.show()
// 接下来将df中查询的结果在还原为RDD
df.registerTempTable("t_sdu")
val fdf = sqlContext.sql("select * from t_stu where age > 17")
val tranRDD = fdf.rdd // 这时RDD中的元素都为Row
val finalRDD = tranRDD.map(row => Student(row(0).toString.toInt,row(1).toString,row(1).toString.toInt)) //这个顺序可以自定义
val fRDD = tranRDD.map(row => Student(row.getInt(0),row.getString(1),row.getInt(2)))
}
}
使用结构类型StructType去转换
Java版本
首先去构造一个JavaRDD<Row>,这里要注意Row中的元素数据类型要和我们的即将构造的结构类型中的类型要一一对应。接着就要去构造StructType类型,最后使用SQLContext配合JavaRDD与StructType去创建DataFrame,代码如下:
package test.scala.com.zhuliu7.sparksqlnext_java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
public class Programme2DF {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("dfuse").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> oraginRDD = sc.textFile("E:\\testdata\\students.txt");
JavaRDD<Row> rowJavaRDD = oraginRDD.map(new Function<String, Row>() {
@Override
public Row call(String s) throws Exception {
String[] strs = s.split(",");
return RowFactory.create(
Integer.valueOf(strs[0]),
strs[1],
Integer.valueOf(strs[2])
);
}
});
// 定义StructType结构类型
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("id",DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("name",DataTypes.StringType, true));
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
StructType type = DataTypes.createStructType(fields);
Dataset ds = sqlContext.createDataFrame(rowJavaRDD,type);
ds.show();
}
}
Scala版本
与Java的思路相似,构造RDD<Row>,StructType进而构建DataFrame。
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
object Programme2DF {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DFUse").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // 放在sqlContext之后
val rowRDD = sc.textFile("E:\\testdata\\students.txt").map(x => Row(x.split(",")(0).toInt,x.split(",")(1),x.split(",")(2).toInt))
// 构造StructType
val structType = StructType(Array(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
/*
* 另一种写法
* StructType(StructField("id",IntegerType) :: Nil)
* */
val df = sqlContext.createDataFrame(rowRDD, structType)
df.show()
}
}
最后感谢石杉老师!
- WordPress 免插件仅代码实现 Gravatar 头像缓存
- Spring JDBCTemplate使用JNDI数据源
- 大家之前是不是误解了DC/OS与Kubernetes之间的关系
- CentOS 7 上部署Mono 4 和Jexus 5.6
- maven学习(下)利用Profile构建不同环境的部署包
- AS3初学者容易迷糊的几个问题
- Spring Security笔记:自定义Login/Logout Filter、AuthenticationProvider、AuthenticationToken
- ASP.NET Web API 2.1支持Binary JSON(Bson)
- Spring Security笔记:自定义登录页
- 浅析 Linux 初始化 init 系统
- 如何提高Python运行效率 超实用的四种提速方法
- 如何让oracle的select强制走索引
- ActionScript3.0(AS3)中的泛型数组Vector
- 人民网评:“算法推荐”不能成为传播低俗信息的助推器
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 为何IntelliJ IDEA比Eclipse好在哪里?
- 五分钟C语言数据结构 之 二叉树中序遍历
- Django 安全之跨站点请求伪造(CSRF)保护
- 五分钟C语言数据结构 之 二叉树先序遍历
- Java 语言基础(常用设计原则和设计模式,常用 Java 8~11 新特性)
- 五分钟C语言数据结构 之 二叉树后序遍历(非递归很重要)
- 5分钟Flink - 自定义Source源
- 9.深入k8s:调度器及其源码分析
- 5分钟Flink - 自定义Data Sink
- 5分钟Flink - 流处理API转换算子集合
- 视频上云/网络穿透/网络映射服务EasyNTS前端组织添加页面出现Vue冲突怎么解决?
- Pinpoint 一款强大的APM工具
- 1. Pandas系列 - 基本数据结构
- 6 年前,只会 JSP 和 Servlet 就可以找到工作
- Python文件处理实用指南