Spark Streaming自定义Receivers
时间:2022-04-29
本文章向大家介绍Spark Streaming自定义Receivers,主要内容包括自定义一个Receiver、An Actor as Receiver、A Sample Spark Application、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
自定义一个Receiver
class SocketTextStreamReceiver(host: String, port: Int(
extends NetworkReceiver[String]
{
protected lazy val blocksGenerator: BlockGenerator =
new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
protected def onStart() = {
blocksGenerator.start()
val socket = new Socket(host, port)
val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
var data: String = dataInputStream.readLine()
while (data != null) {
blocksGenerator += data
data = dataInputStream.readLine()
}
}
protected def onStop() {
blocksGenerator.stop()
}
}
An Actor as Receiver
class SocketTextStreamReceiver (host:String,
port:Int,
bytesToString: ByteString => String) extends Actor with Receiver {
override def preStart = IOManager(context.system).connect(host, port)
def receive = {
case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
}
}
A Sample Spark Application
val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
Seconds(batchDuration))
//使用自定义的receiver
val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
"localhost", 8445))
//或者使用这个自定义的actor Receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver") */
提交成功之后,启动Netcat测试一下
$ nc -l localhost 8445 hello world hello hello
下面是合并多个输入流的方法:
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
// Another socket stream receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8446, z => z.utf8String)),"SocketReceiver")
val union = lines.union(lines2)
- cf(#div1 B. Dreamoon and Sets)(数论)
- hdu 1805Expressions(二叉树构造的后缀表达式)
- 清空messages没有权限的解决方法
- hdu1710(Binary Tree Traversals)(二叉树遍历)
- 基本线程同步(一) 同步方法
- uva514(trail)(模拟栈)
- zoj3822 Domination(概率dp)
- Veeam Backup & Replication(三):创建备份与还原备份
- 使用Go开发一个简单的服务器程序
- C++ 与设计模式学习(其一)
- xz文件压缩工具的用法
- Java 中正确使用 hashCode 和 equals 方法
- C/C++ 关于生成静态库(lib)/动态库(dll)文件如何使用(基于windows基础篇)
- openwrt将LAN口改为WAN方法
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- C语言直接实现开机密码修改!
- C语言实现类似QQ聊天界面抖动功能!
- Java基础数据类型和引用类型的区别
- 解决Android8.0之后开启service
- 深入理解Java泛型(一.泛型的作用与定义)
- OpenCV与图像处理(九)
- 如何编写高质量的代码
- [数据结构与算法] 哈希表
- [数据结构与算法] 排序算法之直接插入排序与希尔排序
- [数据结构与算法] 排序算法之选择排序和堆排序
- [数据结构与算法] 树结构之二叉树
- [数据结构与算法] 排序算法之归并排序与基数排序
- [数据结构与算法] 邂逅数组与队列
- [数据结构与算法] 邂逅链表
- 【LeetCode每日一题】23. Merge k Sorted Lists