Reading Hive from Spark
Date: 2022-07-22
This article introduces how to read Hive tables from Spark, covering a usage example, practical tips, key points, and things to watch out for.
Import dependencies
Add the key dependency packages (Gradle `compile` notation, Scala 2.11 artifacts):
compile("org.scala-lang:scala-library:$scalaVersion")
compile("org.scala-lang:scala-reflect:$scalaVersion")
compile("org.scala-lang:scala-compiler:$scalaVersion")
compile("org.apache.spark:spark-sql_2.11:$sparkVersion")
compile("org.apache.spark:spark-streaming_2.11:$sparkVersion")
compile("org.apache.spark:spark-hive_2.11:$sparkVersion")
compile("org.apache.spark:spark-hive-thriftserver_2.11:$sparkVersion")
Enable Hive support
import java.io.File

import org.apache.spark.sql.SparkSession

val warehouseLocation = new File("spark-warehouse").getAbsolutePath
// Configure the SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .master("local[2]")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .config("hive.metastore.uris", "thrift://hdp2.nsrc.com:9083")
  .config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
  .config("hive.input.dir.recursive", "true")
  .config("hive.mapred.supports.subdirectories", "true")
  .config("hive.supports.subdirectories", "true")
  .config("spark.driver.maxResultSize", "5g")
  // Enable Hive support
  .enableHiveSupport()
  .getOrCreate()
val startDay = "2019-03-08 00:00:00"
val endDay = "2019-03-10 23:59:59"
val srcIp = "10.28.137.84"
// Query directly with Spark SQL; the result is a DataFrame
val resultDf = spark.sql("select * from http_origin where date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') >= '" + startDay + "' " +
  "and date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') <= '" + endDay + "' and http_origin.srcip = '" + srcIp + "'")
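Concatenating user-supplied values straight into SQL, as above, is fragile: a stray single quote in any parameter breaks the statement. A minimal sketch of a safer approach (`QueryBuilder` and `esc` are hypothetical helpers, not part of the original post) is to escape quotes before building the string:

```scala
// Hypothetical helper: builds the same http_origin query as above,
// but escapes single quotes so a malformed value cannot break the SQL.
object QueryBuilder {
  // Standard SQL escaping: double each single quote inside a literal
  private def esc(s: String): String = s.replace("'", "''")

  def httpOriginQuery(startDay: String, endDay: String, srcIp: String): String =
    s"select * from http_origin " +
      s"where date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') >= '${esc(startDay)}' " +
      s"and date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') <= '${esc(endDay)}' " +
      s"and http_origin.srcip = '${esc(srcIp)}'"
}
```

The resulting string can be passed to `spark.sql(...)` unchanged. Where possible, the DataFrame API (`resultDf.filter($"srcip" === srcIp)`) avoids string building altogether.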
Mapping Hive to HBase, which can serve as one way to query HBase
Create the Hive mapping table:
CREATE EXTERNAL TABLE IF NOT EXISTS httpsystem_dev(
  id String, srcIp String, srcPort Int, distIp String, distPort Int,
  requestURL String, requestMethod String, requestUserAgent String,
  requestCookie String, responseServer String, responseCode Int,
  requestHeader String, requestContType String, responseCharset String,
  httpVersion String, requestHost String, requestBodyString String,
  requestParameterString String, responseContentType String,
  responseHeader String, responseBodyReference String,
  ML_rule_juge String, ML_rule_juge_id String, ML_type String,
  ML_juge_mal String, ML_juge_type String,
  DLCNN_rule_juge String, DLCNN_type String,
  DLCNN_juge_mal String, DLCNN_juge_type String)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES(
  'serialization.format'='t',
  'hbase.columns.mapping'=':key,0:srcIp,0:srcPort,0:distIp,0:distPort,0:requestURL,0:requestMethod,0:requestUserAgent,0:requestCookie,0:responseServer,0:responseCode,0:requestHeader,0:requestContType,0:responseCharset,0:httpVersion,0:requestHost,0:requestBodyString,0:requestParameterString,0:responseContentType,0:responseHeader,0:responseBodyReference,0:ML_rule_juge,0:ML_rule_juge_id,0:ML_type,0:ML_juge_mal,0:ML_juge_type,0:DLCNN_rule_juge,0:DLCNN_type,0:DLCNN_juge_mal,0:DLCNN_juge_type',
  'field.delim'='t')
TBLPROPERTIES ('hbase.table.name'='httpsystem_dev')
Save the result as CSV to HDFS
import org.apache.spark.sql.SaveMode

val url: String = "hdfs://hdp1.nsrc.com:8020/user/http_system/offline_file/" + "123"
resultDf.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).option("header", "false")
  .save(url)
Create a temporary view and return a partial result
// registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
resultDf.createOrReplaceTempView("offlineResult")
// Build a list of sample rows
val samples = spark.sql("select srcip,distip,requesthost,requesturl,requestheader," +
  "requestbodystring,requestmethod,responsecode,responsebody from offlineResult limit 10")
samples.show()
val rows = samples.collect()
for (row <- rows) {
  println(row(1), row(0), row(7))
}
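Positional access like `row(1)`, `row(0)`, `row(7)` silently breaks whenever the select list changes order; in Spark, name-based access via `row.getAs[String]("distip")` is more robust. A stdlib-only sketch of the idea (the `SampleRow` and `summarize` names are illustrative, not Spark API):

```scala
// Hypothetical stdlib-only model of a result row: fields looked up by
// column name instead of position, so reordering the select list is safe.
case class SampleRow(fields: Map[String, String]) {
  def get(name: String): String =
    fields.getOrElse(name, sys.error(s"no such column: $name"))
}

// Formats each row as "distip srcip responsecode", mirroring the
// println(row(1), row(0), row(7)) loop above but by name.
def summarize(rows: Seq[SampleRow]): Seq[String] =
  rows.map(r => s"${r.get("distip")} ${r.get("srcip")} ${r.get("responsecode")}")
```

With real Spark rows, the equivalent is `row.getAs[String]("distip")` inside the loop.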