Spark Structured Streaming的高效处理-RunOnceTrigger

时间:2022-04-25
本文章向大家介绍Spark Structured Streaming的高效处理-RunOnceTrigger,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

传统意义上,当人们想到流处理时,诸如”实时”,”24*7”或者”always on”之类的词语就会浮现在脑海中。生产中可能会遇到这种情况,数据仅仅会在固定间隔到达,比如每小时,或者每天。对于这些情况,对这些数据进行增量处理仍然是有益的。但是在集群中运行一个24*7的Streaming job就显得有些浪费了,这时候仅仅需要每天进行少量的处理即可受益。

幸运的是,在spark 2.2版本中通过使用 Structured Streaming的Run Once trigger特性,可获得Catalyst Optimizer带来的好处和集群运行空闲job带来的成本节约,这两方面好处。

一,Structured Streaming的Triggers

在Structured Streaming中,Trigger用来指定Streaming 查询产生结果的频率。一旦Trigger触发,Spark将会检查是否有新数据可用。如果有新数据,查询将增量的从上次触发的地方执行。如果没有新数据,Stream继续睡眠,直到下次Trigger触发。

Structured Streaming的默认行为尽可能低延迟地运行,trigger会在上次trigger触发结束之后立即运行。针对一些有低延迟要求的使用案例,Structured Streaming支持ProcessingTime trigger,也即将会用用户提供的时间间隔,例如每分钟,去触发一次查询。

这虽然很好,但是也免不了24*7运行。相反,RunOnce Trigger仅仅会执行一次查询,然后停止查询。

Trigger在你启动Streams的时候指定。

import org.apache.spark.sql.streaming.Trigger

 // Load your Streaming DataFrame
 val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
 sdf.writeStream.trigger(Trigger.Once).format("parquet").start("/out/path")

二,RunOnce相比Batch高效之处

1,Bookkeeping

当运行一个执行增量更新的批处理作业时,通常要处理哪些数据是更新的,哪些是该处理的,哪些是不该处理的。Structured Streaming已经为你做好了这一切,在处理一般流式应用程序时,你应该只关心业务逻辑,而不是低级的Bookkeeping。

2,表级原子性

大数据处理引擎,最重要的性质是它如何容忍失误和失败。ETL作业可能(实际上常会)失败。如果,你的工作失败了,那么你需要确保你的工作产出被清理干净,否则在你的下一次成功的工作之后你会得到重复的或者垃圾的数据。使用Structured Streaming编写基于文件的表时,Structured Streaming将每个作业创建的所有文件在每次成功的出发后提交到log中。当Spark重新读取表时,会通过log来识别哪些文件是有效的。这样可以确保因失败引入的垃圾不会被下游的应用程序所消费。

3,夸runs的状态操作

如果,你的数据流有可能产生重复的记录,但是你要实现一次语义,如何在batch处理中来实现呢?通过Structured Streaming,可以使用dropDuplicates()来实现去重。配置watermark足够长,包含若干Streaming job的runs,可以保证你不会夸runs处理到重复的数据。

4,成本节约

运行一个24*7的Streamingjob很浪费。可能有些情况,数据计算有些延迟是可以接受的,或者数据本身就会以每小时或者每天为周期产生。为了获得Structured Streaming所有上述描述的好处,你可能会人为需要一直占用集群运行程序,但是现在,使用仅执行一次的Trigger,就可以不必要一直占用集群了。

三,总结

在这篇文章中,引入了,使用Structured Streaming获取的仅执行一次的Trigger。虽然执行一此Trigger类似于运行一个批处理的job,但我们讨论了它在批处理作业方法之上的所有优点,特别是:

1,管理所有处理数据的bookkeeping

2,提供基于文件的表级别的原子ETL操作。

3,确保夸Run操作,可以轻松去重。

4,可以节省成本。通过避免运行没必要24*7运行的流处理。

跑Spark Streaming还是跑Structured Streaming,全在你一念之间。

(此处少了一个Job Scheduler,你留意到了么?)