【转载】【必会】SparkStreaming的窗口操作及实战
时间:2021-07-21
本文章向大家介绍【转载】【必会】SparkStreaming的窗口操作及实战,主要包括【转载】【必会】SparkStreaming的窗口操作及实战使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
下面,通过一张图来描述SparkStreaming的窗口操作,如图所示。
基于窗口的操作需要两个参数,如下:
-
窗口长度(windowDuration),控制每次计算最近的多少个批次的数据;
-
滑动间隔(slideDuration),用来控制对新的 DStream 进行计算的间隔。
两者都必须是 StreamingContext 中批次间隔(batchDuration)的整数倍。
使用窗口操作,即使用窗口操作进行实战。
每秒发送1个数字
package cn.lagou.streaming
import java.io.PrintWriter
import java.net.{ServerSocket, Socket}
object SocketLikeNCWithWindow {
def main(args: Array[String]): Unit = {
val port = 1521
val ss = new ServerSocket(port)
val socket: Socket = ss.accept()
println("connect to host : " + socket.getInetAddress)
var i = 0
// 每秒发送1个数
while(true) {
i += 1
val out = new PrintWriter(socket.getOutputStream)
out.println(i)
out.flush()
Thread.sleep(1000)
}
}
}
案例一
观察窗口的数据;观察 batchDuration、windowDuration、slideDuration 三者之间的关系;使用窗口相关的操作,具体代码演示如下:
package cn.lagou.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]")
.setAppName(this.getClass.getCanonicalName)
// 每 5s 生成一个RDD(mini-batch)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 1521)
lines.foreachRDD{ (rdd, time) =>println(s"rdd = ${rdd.id}; time = $time")
rdd.foreach(value => println(value))
}
val res1: DStream[String] =lines.reduceByWindow(_ + " " + _,Seconds(20), Seconds(10))
res1.print()
val res2: DStream[String] = lines.window(Seconds(20),Seconds(10))
res2.print()
// 求窗口元素的和
val res3:DStream[Int]=lines.map(_.toInt).reduceByWindow(_+_,Seconds(20), Seconds(10))
res3.print()
// 求窗口元素的和
val res4 = res2.map(_.toInt).reduce(_+_)
res4.print()
ssc.start()
ssc.awaitTermination()
}
}
案例二
热点搜索词实时统计。每隔 10 秒,统计最近20秒的词出现的次数,具体代码演示如下:
package cn.lagou.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object HotWordStats {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[2]")
.setAppName(this.getClass.getCanonicalName)
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("ERROR")
//设置检查点,检查点具有容错机制。生产环境中应设置到HDFS
ssc.checkpoint("data/checkpoint/")
val lines: ReceiverInputDStream[String] =ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
// 通过reduceByKeyAndWindow算子, 每隔10秒统计最近20秒的词出现的次数
// 后 3个参数:窗口时间长度、滑动窗口时间、分区
val wordCounts1: DStream[(String, Int)] =pairs.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b,Seconds(20),Seconds(10), 2)
wordCounts1.print
// 这里需要checkpoint的支持
val wordCounts2: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_ + _,_ - _,
Seconds(20),Seconds(10), 2)
wordCounts2.print
ssc.start()
ssc.awaitTermination()
}
}
原文地址:https://www.cnblogs.com/bsxc2/p/15040981.html
- 关于表联结方法(二) (r4笔记第23天)
- Spring+SpringMVC+MyBatis+easyUI整合进阶篇(一)设计一套好的RESTful API
- XSS分析及预防
- 关于ORA-01779问题的分析和解决 (r4笔记第22天)
- 想看爱奇艺VIP视频?一个python脚本帮你搞定
- Spring+SpringMVC+MyBatis+easyUI整合进阶篇(十四)Redis缓存正确的使用姿势
- 关于shell中的pl/sql脚本错误排查与分析(r4笔记第21天)
- 关于BFC不会被浮动元素遮盖的一些解释
- MyBatis + MySQL返回插入成功后的主键id
- struts2+spring+hibernate整合步骤(1)
- 微信公众号问题:{"errcode":40125,"errmsg":"invalid appsecret, view more at http://t.cn/LOEdzVq, hints: [
- reflow和repaint(摘录自张鑫旭的翻译)
- git删除本地分支
- org.springframework.data.redis.serializer.SerializationException: Cannot serialize;
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- VSFTPD技术
- Shiro——基于java的安全框架
- 电子圆二色谱(ECD)的理论计算
- 用 80 行 Javascript 代码构建自己的语音助手
- 手把手教你搭建分布式项目环境
- GraphQL两年实战避坑经验
- 基于Dubbo的服务提供者与消费者的发布(在虚拟机中)以及使用nginx对项目进行负载均衡优化
- 我们为什么不使用CSS框架
- java.lang.IllegalArgumentException: node to traverse cannot be null!
- Spring Data Jpa 异常:PropertyReferenceException: No property xxx found for type for type yyy
- Spring全家桶之SpringData——SpringData Redis(附相关jar包)
- Spring全家桶之SpringBoot——初级阶段
- Spring全家桶之SpringBoot——高级阶段
- feign.FeignException$MethodNotAllowed: status 405 reading xxx#yyy(Integer)
- No serializer found for class 类名 and no properties discovered to create BeanSerializer