Spark Structured Streaming的高效处理-RunOnceTrigger
传统意义上,当人们想到流处理时,诸如”实时”,”24*7”或者”always on”之类的词语就会浮现在脑海中。生产中可能会遇到这种情况,数据仅仅会在固定间隔到达,比如每小时,或者每天。对于这些情况,对这些数据进行增量处理仍然是有益的。但是在集群中运行一个24*7的Streaming job就显得有些浪费了,这时候仅仅需要每天进行少量的处理即可受益。
幸运的是,在spark 2.2版本中通过使用 Structured Streaming的Run Once trigger特性,可获得Catalyst Optimizer带来的好处和集群运行空闲job带来的成本节约,这两方面好处。
一,Structured Streaming的Triggers
在Structured Streaming中,Trigger用来指定Streaming 查询产生结果的频率。一旦Trigger触发,Spark将会检查是否有新数据可用。如果有新数据,查询将增量的从上次触发的地方执行。如果没有新数据,Stream继续睡眠,直到下次Trigger触发。
Structured Streaming的默认行为尽可能低延迟地运行,trigger会在上次trigger触发结束之后立即运行。针对一些有低延迟要求的使用案例,Structured Streaming支持ProcessingTime trigger,也即将会用用户提供的时间间隔,例如每分钟,去触发一次查询。
这虽然很好,但是也免不了24*7运行。相反,RunOnce Trigger仅仅会执行一次查询,然后停止查询。
Trigger在你启动Streams的时候指定。
import org.apache.spark.sql.streaming.Trigger
// Load your Streaming DataFrame
val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
sdf.writeStream.trigger(Trigger.Once).format("parquet").start("/out/path")
二,RunOnce相比Batch高效之处
1,Bookkeeping
当运行一个执行增量更新的批处理作业时,通常要处理哪些数据是更新的,哪些是该处理的,哪些是不该处理的。Structured Streaming已经为你做好了这一切,在处理一般流式应用程序时,你应该只关心业务逻辑,而不是低级的Bookkeeping。
2,表级原子性
大数据处理引擎,最重要的性质是它如何容忍失误和失败。ETL作业可能(实际上常会)失败。如果,你的工作失败了,那么你需要确保你的工作产出被清理干净,否则在你的下一次成功的工作之后你会得到重复的或者垃圾的数据。使用Structured Streaming编写基于文件的表时,Structured Streaming将每个作业创建的所有文件在每次成功的出发后提交到log中。当Spark重新读取表时,会通过log来识别哪些文件是有效的。这样可以确保因失败引入的垃圾不会被下游的应用程序所消费。
3,夸runs的状态操作
如果,你的数据流有可能产生重复的记录,但是你要实现一次语义,如何在batch处理中来实现呢?通过Structured Streaming,可以使用dropDuplicates()来实现去重。配置watermark足够长,包含若干Streaming job的runs,可以保证你不会夸runs处理到重复的数据。
4,成本节约
运行一个24*7的Streamingjob很浪费。可能有些情况,数据计算有些延迟是可以接受的,或者数据本身就会以每小时或者每天为周期产生。为了获得Structured Streaming所有上述描述的好处,你可能会人为需要一直占用集群运行程序,但是现在,使用仅执行一次的Trigger,就可以不必要一直占用集群了。
三,总结
在这篇文章中,引入了,使用Structured Streaming获取的仅执行一次的Trigger。虽然执行一此Trigger类似于运行一个批处理的job,但我们讨论了它在批处理作业方法之上的所有优点,特别是:
1,管理所有处理数据的bookkeeping
2,提供基于文件的表级别的原子ETL操作。
3,确保夸Run操作,可以轻松去重。
4,可以节省成本。通过避免运行没必要24*7运行的流处理。
跑Spark Streaming还是跑Structured Streaming,全在你一念之间。
(此处少了一个Job Scheduler,你留意到了么?)
- CentOs7.3 安装 JDK1.8
- 基础篇章:关于 React Native 之 ListView 组件的讲解
- maven环境快速搭建
- CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务
- CentOs7.3 搭建 Redis-4.0.1 Cluster 集群服务
- CentOs7.3 搭建 Redis-4.0.1 单机服务
- Shodan新手入坑指南
- 我用过的——Spring定时任务的几种用法
- CentOs7.3 搭建 SolrCloud 集群服务
- CentOs7.3 搭建 Solr单机服务
- CentOs7.3 搭建 ZooKeeper-3.4.9 Cluster 集群服务
- CentOs7.3 ssh 免密登录
- 基础篇章:关于 React Native 之 Touchable 系列组件的讲解
- 基础篇章:关于 React Native 之 Navigator 组件的讲解
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- [AWR报告]Latch Hit %
- [Python运维]Python3.6的安装
- [Python运维]cx_Oracle模块的安装
- C#中抽象类与抽象方法的作用与实例
- C++ 基础扫盲(1)
- [Python运维]使用cx_Oracle连接Oracle(高级篇)
- [Python运维]使用Python发送邮件
- [Python运维]自动化监控Oracle表空间并发送报警
- [Python运维]自动化监控多个Oracle表空间
- c# 动态生成控件
- C# 10分钟完成百度人脸识别——入门篇
- C#开启线程的四种方式
- C++入门实例:创建工程、opencv引用及简单实例
- C#实例:datagridview单元格合并
- C# 封装实例