Spark和Spring整合处理离线数据
时间:2022-07-23
本文章向大家介绍Spark和Spring整合处理离线数据,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
如果你比较熟悉JavaWeb应用开发,那么对Spring框架一定不陌生,并且JavaWeb通常是基于SSM搭起的架构,主要用Java语言开发。但是开发Spark程序,Scala语言往往必不可少。
众所周知,Scala如同Java一样,都是运行在JVM上的,所以它具有很多Java语言的特性,同时作为函数式编程语言,又具有自己独特的特性,实际应用中除了要结合业务场景,还要对Scala语言的特性有深入了解。
如果想像使用Java语言一样,使用Scala来利用Spring框架特性、并结合Spark来处理离线数据,应该怎么做呢?
本篇文章,通过详细的示例代码,介绍上述场景的具体实现,大家如果有类似需求,可以根据实际情况做调整。
1.定义一个程序启动入口
object Bootstrap {
private val log = LoggerFactory.getLogger(Bootstrap.getClass)
//指定配置文件如log4j的路径
val ConfFileName = "conf"
val ConfigurePath = new File("").getAbsolutePath.substring(0, if (new File("").getAbsolutePath.lastIndexOf("lib") == -1) 0
else new File("").getAbsolutePath.lastIndexOf("lib")) + this.ConfFileName + File.separator
//存放实现了StatsTask的离线程序处理的类
private val TASK_MAP = Map("WordCount" -> classOf[WordCount])
def main(args: Array[String]): Unit = {
//传入一些参数,比如要运行的离线处理程序类名、处理哪些时间的数据
if (args.length < 1) {
log.warn("args 参数异常!!!" + args.toBuffer)
System.exit(1)
}
init(args)
}
def init(args: Array[String]) {
try {
SpringUtils.init(Array[String]("applicationContext.xml"))
initLog4j()
val className = args(0)
// 实例化离线处理类
val task = SpringUtils.getBean(TASK_MAP(className))
args.length match {
case 3 =>
// 处理一段时间的每天离线数据
val dtStart = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1))
val dtEnd = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(2))
val days = Days.daysBetween(dtStart, dtEnd).getDays + 1
for (i <- 0 until days) {
val etime = dtStart.plusDays(i).toString("yyyy-MM-dd")
task.runTask(etime)
log.info(s"JOB --> $className 已成功处理: $etime 的数据")
}
case 2 =>
// 处理指定的某天离线数据
val etime = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1)).toString("yyyy-MM-dd")
task.runTask(etime)
log.info(s"JOB --> $className 已成功处理: $etime 的数据")
case 1 =>
// 处理前一天离线数据
val etime = DateTime.now().minusDays(1).toString("yyyy-MM-dd")
task.runTask(etime)
log.info(s"JOB --> $className 已成功处理: $etime 的数据")
case _ => println("执行失败 args参数:" + args.toBuffer)
}
} catch {
case e: Exception =>
println("执行失败 args参数:" + args.toBuffer)
e.printStackTrace()
}
// 初始化log4j
def initLog4j() {
val fileName = ConfigurePath + "log4j.properties"
if (new File(fileName).exists) {
PropertyConfigurator.configure(fileName)
log.info("日志log4j已经启动")
}
}
}
}
2.加载Spring配置文件工具类
object SpringUtils {
private var context: ClassPathXmlApplicationContext = _
def getBean(name: String): Any = context.getBean(name)
def getBean[T](name: String, classObj: Class[T]): T = context.getBean(name, classObj)
def getBean[T](_class: Class[T]): T = context.getBean(_class)
def init(springXml: Array[String]): Unit = {
if (springXml == null || springXml.isEmpty) {
try
throw new Exception("springXml 不可为空")
catch {
case e: Exception => e.printStackTrace()
}
}
context = new ClassPathXmlApplicationContext(springXml(0))
context.start()
}
}
3.Spring配置文件applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
<!-- 配置包扫描 -->
<context:component-scan base-package="com.bigdata.stats"/>
</beans>
4.定义一个trait,作为离线程序的公共"父类"
trait StatsTask extends Serializable {
//"子类"继承StatsTask重写该方法实现自己的业务处理逻辑
def runTask(etime: String)
}
5.继承StatsTask的离线处理类
//不要忘记添加 @Component ,否则无法利用Spring对WordCount进行实例化
@Component
class WordCount extends StatsTask {
override def runTask(etime: String): Unit = {
val sparkSession = SparkSession
.builder()
.appName("test")
.master("local[*]")
.getOrCreate()
import sparkSession.implicits._
val words = sparkSession.read.textFile("/Users/BigData/Documents/data/wordcount.txt").flatMap(_.split(" "))
.toDF("word")
words.createOrReplaceTempView("wordcount")
val df = sparkSession.sql("select word, count(*) count from wordcount group by word")
df.show()
}
}
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- Android Canvas的drawText()与文字居中方案详解
- JeecgCloud版,部署项目。
- docker(镜像常用命令)
- [- Flutter基础篇 -] 聊聊那些弹框
- 聊一聊Android中的StateListAnimator
- Linux KeyLogger
- [- Flutter 数据&状态篇 -] InheritedWidget
- Android实现图片一边的三角形边框效果
- Android使用SoundPool播放短音效
- 用Jquery做一个进度条
- Android SoundPool实现简短小音效
- [-Flutter趣玩篇-] 出神入化的Align
- Android应用禁止屏幕休眠的3种方法
- Flutter 实现下拉刷新上拉加载的示例代码
- [- C++趣玩篇1 -] 从打印开始说起