Spark高级操作之json复杂和嵌套数据结构的操作二
一,准备阶段
Json格式里面有map结构和嵌套json也是很合理的。本文将举例说明如何用spark解析包含复杂的嵌套数据结构,map。现实中的例子是,一个设备的检测事件,二氧化碳的安全你浓度,高温数据等,需要实时产生数据,然后及时的告警处理。
1,定义schema
import org.apache.spark.sql.types._
val schema = new StructType()
.add("dc_id", StringType) // data center where data was posted to Kafka cluster
.add("source", // info about the source of alarm
MapType( // define this as a Map(Key->value)
StringType,
new StructType()
.add("description", StringType)
.add("ip", StringType)
.add("id", LongType)
.add("temp", LongType)
.add("c02_level", LongType)
.add("geo",
new StructType()
.add("lat", DoubleType)
.add("long", DoubleType)
)
)
)
2,准备数据
val dataDS = Seq("""
{
"dc_id": "dc-101",
"source": {
"sensor-igauge": {
"id": 10,
"ip": "68.28.91.22",
"description": "Sensor attached to the container ceilings",
"temp":35,
"c02_level": 1475,
"geo": {"lat":38.00, "long":97.00}
},
"sensor-ipad": {
"id": 13,
"ip": "67.185.72.1",
"description": "Sensor ipad attached to carbon cylinders",
"temp": 34,
"c02_level": 1370,
"geo": {"lat":47.41, "long":-122.00}
},
"sensor-inest": {
"id": 8,
"ip": "208.109.163.218",
"description": "Sensor attached to the factory ceilings",
"temp": 40,
"c02_level": 1346,
"geo": {"lat":33.61, "long":-111.89}
},
"sensor-istick": {
"id": 5,
"ip": "204.116.105.67",
"description": "Sensor embedded in exhaust pipes in the ceilings",
"temp": 40,
"c02_level": 1574,
"geo": {"lat":35.93, "long":-85.46}
}
}
}""").toDS()
// should only be one item
dataDS.count()
3,准备处理
val df = spark.read.schema(schema).json(dataDS.rdd)
查看schema
df.printSchema
二,如何使用explode()
Explode()方法在spark1.3的时候就已经存在了,在这里展示一下如何抽取嵌套的数据结构。在一些场合,会结合explode,to_json,from_json一起使用。
Explode为给定的map的每一个元素创建一个新的行。比如上面准备的数据,source就是一个map结构。Map中的每一个key/value对都会是一个独立的行。
val explodedDF = df.select($"dc_id", explode($"source"))
explodedDF.printSchema
可以看看操作之后的schema信息
获取内部的 数据
case class DeviceAlert(dcId: String, deviceType:String, ip:String, deviceId:Long, temp:Long, c02_level: Long, lat: Double, lon: Double)
val notifydevicesDS = explodedDF.select( $"dc_id" as "dcId",
$"key" as "deviceType",
'value.getItem("ip") as 'ip,
'value.getItem("id") as 'deviceId,
'value.getItem("c02_level") as 'c02_level,
'value.getItem("temp") as 'temp,
'value.getItem("geo").getItem("lat") as 'lat, //note embedded level requires yet another level of fetching.
'value.getItem("geo").getItem("long") as 'lon)
.as[DeviceAlert] // return as a Dataset
查看schema信息
notifydevicesDS.printSchema
三,再复杂一点
在物联网场景里,通畅物联网设备会将很多json 事件数据发给他的收集器。收集器可以是附近的数据中心,也可以是附近的聚合器,也可以是安装在家里的一个设备,它会有规律的周期的将数据通过加密的互联网发给远程的数据中心。说白一点,数据格式更复杂。
我们下面会有三个map的数据格式:恒温计,摄像机,烟雾报警器。
import org.apache.spark.sql.types._
// a bit longish, nested, and convuloted JSON schema :)
val nestSchema2 = new StructType()
.add("devices",
new StructType()
.add("thermostats", MapType(StringType,
new StructType()
.add("device_id", StringType)
.add("locale", StringType)
.add("software_version", StringType)
.add("structure_id", StringType)
.add("where_name", StringType)
.add("last_connection", StringType)
.add("is_online", BooleanType)
.add("can_cool", BooleanType)
.add("can_heat", BooleanType)
.add("is_using_emergency_heat", BooleanType)
.add("has_fan", BooleanType)
.add("fan_timer_active", BooleanType)
.add("fan_timer_timeout", StringType)
.add("temperature_scale", StringType)
.add("target_temperature_f", DoubleType)
.add("target_temperature_high_f", DoubleType)
.add("target_temperature_low_f", DoubleType)
.add("eco_temperature_high_f", DoubleType)
.add("eco_temperature_low_f", DoubleType)
.add("away_temperature_high_f", DoubleType)
.add("away_temperature_low_f", DoubleType)
.add("hvac_mode", StringType)
.add("humidity", LongType)
.add("hvac_state", StringType)
.add("is_locked", StringType)
.add("locked_temp_min_f", DoubleType)
.add("locked_temp_max_f", DoubleType)))
.add("smoke_co_alarms", MapType(StringType,
new StructType()
.add("device_id", StringType)
.add("locale", StringType)
.add("software_version", StringType)
.add("structure_id", StringType)
.add("where_name", StringType)
.add("last_connection", StringType)
.add("is_online", BooleanType)
.add("battery_health", StringType)
.add("co_alarm_state", StringType)
.add("smoke_alarm_state", StringType)
.add("is_manual_test_active", BooleanType)
.add("last_manual_test_time", StringType)
.add("ui_color_state", StringType)))
.add("cameras", MapType(StringType,
new StructType()
.add("device_id", StringType)
.add("software_version", StringType)
.add("structure_id", StringType)
.add("where_name", StringType)
.add("is_online", BooleanType)
.add("is_streaming", BooleanType)
.add("is_audio_input_enabled", BooleanType)
.add("last_is_online_change", StringType)
.add("is_video_history_enabled", BooleanType)
.add("web_url", StringType)
.add("app_url", StringType)
.add("is_public_share_enabled", BooleanType)
.add("activity_zones",
new StructType()
.add("name", StringType)
.add("id", LongType))
.add("last_event", StringType))))
对应的数据
val nestDataDS2 = Seq("""{
"devices": {
"thermostats": {
"peyiJNo0IldT2YlIVtYaGQ": {
"device_id": "peyiJNo0IldT2YlIVtYaGQ",
"locale": "en-US",
"software_version": "4.0",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Hallway Upstairs",
"last_connection": "2016-10-31T23:59:59.000Z",
"is_online": true,
"can_cool": true,
"can_heat": true,
"is_using_emergency_heat": true,
"has_fan": true,
"fan_timer_active": true,
"fan_timer_timeout": "2016-10-31T23:59:59.000Z",
"temperature_scale": "F",
"target_temperature_f": 72,
"target_temperature_high_f": 80,
"target_temperature_low_f": 65,
"eco_temperature_high_f": 80,
"eco_temperature_low_f": 65,
"away_temperature_high_f": 80,
"away_temperature_low_f": 65,
"hvac_mode": "heat",
"humidity": 40,
"hvac_state": "heating",
"is_locked": true,
"locked_temp_min_f": 65,
"locked_temp_max_f": 80
}
},
"smoke_co_alarms": {
"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
"device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
"locale": "en-US",
"software_version": "1.01",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Jane's Room",
"last_connection": "2016-10-31T23:59:59.000Z",
"is_online": true,
"battery_health": "ok",
"co_alarm_state": "ok",
"smoke_alarm_state": "ok",
"is_manual_test_active": true,
"last_manual_test_time": "2016-10-31T23:59:59.000Z",
"ui_color_state": "gray"
}
},
"cameras": {
"awJo6rH0IldT2YlIVtYaGQ": {
"device_id": "awJo6rH",
"software_version": "4.0",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Foyer",
"is_online": true,
"is_streaming": true,
"is_audio_input_enabled": true,
"last_is_online_change": "2016-12-29T18:42:00.000Z",
"is_video_history_enabled": true,
"web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
"app_url": "nestmobile://cameras/device_id?auth=access_token",
"is_public_share_enabled": true,
"activity_zones": { "name": "Walkway", "id": 244083 },
"last_event": "2016-10-31T23:59:59.000Z"
}
}
}
}""").toDS
通过创建一个简单的dataset,我们可以使用所有的dataset的方法来进行ETL操作,比如from_json(), to_json(), explode() and selectExpr()。
val nestDF2 = spark // spark session
.read // get DataFrameReader
.schema(nestSchema2) // use the defined schema above and read format as JSON
.json(nestDataDS2.rdd)
2,将整个json对象,转化为一个json string
val stringJsonDF = nestDF2.select(to_json(struct($"*"))).toDF("nestDevice")
3,将三个json object 的map对象抓化为三个单独的map列,然后可以是使用explode方法访问其属性。
val mapColumnsDF = nestDF2.select($"devices".getItem("smoke_co_alarms").alias ("smoke_alarms"),
$"devices".getItem("cameras").alias ("cameras"),
$"devices".getItem("thermostats").alias ("thermostats"))
转化为三个dataframe
val explodedThermostatsDF = mapColumnsDF.select(explode($"thermostats"))
val explodedCamerasDF = mapColumnsDF.select(explode($"cameras"))
//or you could use the original nestDF2 and use the devices.X notation
val explodedSmokedAlarmsDF = nestDF2.select(explode($"devices.smoke_co_alarms"))
查看其schema
explodedThermostatsDF.printSchema
访问三个map内部的元素
val thermostateDF = explodedThermostatsDF.select($"value".getItem("device_id").alias("device_id"),
$"value".getItem("locale").alias("locale"),
$"value".getItem("where_name").alias("location"),
$"value".getItem("last_connection").alias("last_connected"),
$"value".getItem("humidity").alias("humidity"),
$"value".getItem("target_temperature_f").alias("target_temperature_f"),
$"value".getItem("hvac_mode").alias("mode"),
$"value".getItem("software_version").alias("version"))
val cameraDF = explodedCamerasDF.select($"value".getItem("device_id").alias("device_id"),
$"value".getItem("where_name").alias("location"),
$"value".getItem("software_version").alias("version"),
$"value".getItem("activity_zones").getItem("name").alias("name"),
$"value".getItem("activity_zones").getItem("id").alias("id"))
val smokedAlarmsDF = explodedSmokedAlarmsDF.select($"value".getItem("device_id").alias("device_id"),
$"value".getItem("where_name").alias("location"),
$"value".getItem("software_version").alias("version"),
$"value".getItem("last_connection").alias("last_connected"),
$"value".getItem("battery_health").alias("battery_health"))
查看内部数据
cameraDF.show
通过version进行join操作
val joineDFs = thermostateDF.join(cameraDF, "version")
四,总结
这篇文章的重点是介绍几个好用的工具,去获取复杂的嵌套的json数据格式。一旦你将嵌套数据扁平化之后,再进行访问,就跟普通的数据格式没啥区别了。
- Morris图表使用小记
- nginx域名访问的白名单配置梳理
- 老生常谈:利用Membership实现SSO(单点登录)
- nginx利用geo模块做限速白名单以及geo实现全局负载均衡的操作记录
- Mysql高效插入/更新数据
- 宋小菜融资2.3亿元!域名保护意识强
- 世界最奇葩的7款机器人
- Mysql高效插入/更新数据
- 关于Membership/Role您可能不知道的细节
- Sqlite向MySql导入数据
- 未来3年,人工智能如何影响法律行业?5位权威专家给出趋势
- Java 常见内存溢出异常与代码实现
- nginx限制上传大小和超时时间设置说明/php限制上传大小
- Unity Application Block 1.2 学习笔记
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- (八)python3 只需3小时带你轻松入门——List 与 dict 的常用操作
- (九)python3 只需3小时带你轻松入门——函数自定义
- (十)python3 只需3小时带你轻松入门——模块与包
- (十一)python3 只需3小时带你轻松入门——面向对象
- 一文读懂KEGG数据库
- (创建模式 上)设计模式——工厂、抽象工厂 C++/Python3实现
- 【新手宝典】一篇博文带萌新建站并了解建站体系流程和对萌新友好的便捷方式,这篇博文很有可能是你的启蒙文
- 一种不需要敲代码的Python 画图方法
- 【一】Windows API 零门槛编程指南——MessageBox 基本使用及基础讲解
- 【二】Windows API 零门槛编程指南——CreateWindow 窗口创建 “万字长篇专业术语全解”
- 「零门槛多语言 Python/C/C# 通用思想学习系列」第一篇:经典HelloWorld
- 直播系统定制,判断数据连接是否可用
- VS Code 编辑器入门指南上篇-核心概念与组件
- Python turtle库实现基本剖析
- python thinker canvas create_arc 使用详解