spark读取Hive

时间:2022-07-22
本文章向大家介绍spark读取Hive,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

导入依赖

导入关键的依赖包

    // Scala standard library, reflection and compiler artifacts, all resolved at $scalaVersion.
    // NOTE(review): assuming this is a Gradle build script — the `compile` configuration was
    // removed in Gradle 7; newer builds would need `implementation` instead. Confirm the Gradle version.
    compile("org.scala-lang:scala-library:$scalaVersion")
    compile("org.scala-lang:scala-reflect:$scalaVersion")
    compile("org.scala-lang:scala-compiler:$scalaVersion")

    // Spark modules, all resolved at $sparkVersion. The `_2.11` artifact suffix is the Scala
    // binary version these artifacts were built for.
    // NOTE(review): $scalaVersion presumably has to be a 2.11.x release to match that suffix — confirm.
    compile("org.apache.spark:spark-sql_2.11:$sparkVersion")
    compile("org.apache.spark:spark-streaming_2.11:$sparkVersion")
    compile("org.apache.spark:spark-hive_2.11:$sparkVersion")
    compile("org.apache.spark:spark-hive-thriftserver_2.11:$sparkVersion")

启用Hive支持

// Absolute path of the "spark-warehouse" directory, resolved against the current working
// directory; passed to Spark below as spark.sql.warehouse.dir.
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

// Build (or reuse) a SparkSession with master "local[2]" that points at the Hive metastore
// given by hive.metastore.uris.
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .master("local[2]")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .config("hive.metastore.uris", "thrift://hdp2.nsrc.com:9083")
  // NOTE(review): the next four settings look intended to make table reads descend into
  // nested sub-directories — confirm which of them the deployed Hive/Spark versions honour.
  .config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
  .config("hive.input.dir.recursive", "true")
  .config("hive.mapred.supports.subdirectories", "true")
  .config("hive.supports.subdirectories", "true")
  .config("spark.driver.maxResultSize", "5g")
  // Enable Hive support so spark.sql can query tables registered in the metastore.
  .enableHiveSupport()
  .getOrCreate()

// Filter values for the query below: time window (inclusive on both ends) and source IP.
val startDay = "2019-03-08 00:00:00"
val endDay = "2019-03-10 23:59:59"
val srcIp = "10.28.137.84"

// Query Hive directly through Spark SQL; the result is a DataFrame.
// Each fragment carries its own separating space so that no two tokens can fuse at a
// fragment boundary.
// NOTE(review): the values are spliced into the SQL text as-is. That is fine for the constants
// above, but they must be validated/escaped if they ever come from user input.
val resultDf = spark.sql(
  "select * from http_origin" +
    s" where date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') >= '$startDay'" +
    s" and date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') <= '$endDay'" +
    s" and http_origin.srcip = '$srcIp'")

将Hive与HBase关联,可以作为查询HBase的一种方式

创建hive对应的映射语句

-- Defines the external Hive table httpsystem_dev on top of the HBase table named by
-- 'hbase.table.name' (also httpsystem_dev), using the HBase storage handler.
-- Hive column `id` is bound to the HBase row key (:key); every other Hive column is bound, in
-- declaration order, to the same-named qualifier in column family `0`.
-- The entries of 'hbase.columns.mapping' are written WITHOUT whitespace after the commas: the
-- Hive HBase integration treats such whitespace as part of the column name.
-- NOTE(review): 'serialization.format' and 'field.delim' are kept as the literal 't' found in the
-- original statement; this is possibly a tab escape ('\t') that lost its backslash — confirm.
CREATE EXTERNAL TABLE IF NOT EXISTS httpsystem_dev (
  id String,
  srcIp String,
  srcPort Int,
  distIp String,
  distPort Int,
  requestURL String,
  requestMethod String,
  requestUserAgent String,
  requestCookie String,
  responseServer String,
  responseCode Int,
  requestHeader String,
  requestContType String,
  responseCharset String,
  httpVersion String,
  requestHost String,
  requestBodyString String,
  requestParameterString String,
  responseContentType String,
  responseHeader String,
  responseBodyReference String,
  ML_rule_juge String,
  ML_rule_juge_id String,
  ML_type String,
  ML_juge_mal String,
  ML_juge_type String,
  DLCNN_rule_juge String,
  DLCNN_type String,
  DLCNN_juge_mal String,
  DLCNN_juge_type String
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  'serialization.format' = 't',
  'hbase.columns.mapping' = ':key,0:srcIp,0:srcPort,0:distIp,0:distPort,0:requestURL,0:requestMethod,0:requestUserAgent,0:requestCookie,0:responseServer,0:responseCode,0:requestHeader,0:requestContType,0:responseCharset,0:httpVersion,0:requestHost,0:requestBodyString,0:requestParameterString,0:responseContentType,0:responseHeader,0:responseBodyReference,0:ML_rule_juge,0:ML_rule_juge_id,0:ML_type,0:ML_juge_mal,0:ML_juge_type,0:DLCNN_rule_juge,0:DLCNN_type,0:DLCNN_juge_mal,0:DLCNN_juge_type',
  'field.delim' = 't'
)
TBLPROPERTIES ('hbase.table.name' = 'httpsystem_dev')

将结果以CSV格式保存到HDFS

// Target HDFS location for the export; "123" is appended as the final path segment.
val url: String = "hdfs://hdp1.nsrc.com:8020/user/http_system/offline_file/" + "123"

// Write resultDf as CSV without a header row, overwriting whatever already exists at `url`.
// NOTE(review): "com.databricks.spark.csv" is the name of the external spark-csv package; on
// Spark 2.x+ the built-in "csv" source should be equivalent — confirm before switching.
resultDf.write
  .format("com.databricks.spark.csv")
  .mode(SaveMode.Overwrite)
  .option("header", "false")
  .save(url)
//        //创建样例列表

创建临时视图并返回部分结果

       // Expose the query result to Spark SQL under the name "offlineResult".
       // createOrReplaceTempView is the replacement for registerTempTable, which has been
       // deprecated since Spark 2.0.
       resultDf.createOrReplaceTempView("offlineResult")

       // Project nine columns from the view and keep at most 10 rows.
       val samples = spark.sql(
         "select srcip,distip,requesthost,requesturl,requestheader," +
           "requestbodystring,requestmethod,responsecode,responsebody from offlineResult limit 10")
       samples.show()

       // Collect the sample rows to the driver and print, for each row, the tuple
       // (distip, srcip, responsecode) — positions 1, 0 and 7 of the select list above.
       val rows = samples.collect()
       rows.foreach { row =>
         // Explicit tuple: same output as the former println(a, b, c), without relying on
         // argument auto-tupling.
         println((row(1), row(0), row(7)))
       }