使用spark对hive表中的多列数据判重
时间:2022-05-02
本文章向大家介绍使用spark对hive表中的多列数据判重,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
本文处理的场景如下,hive表中的数据,对其中的多列进行判重deduplicate。
1、先解决依赖,spark相关的所有包,pom.xml
spark-hive是我们进行hive表spark处理的关键。
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.19</version>
</dependency>
</dependencies>
2、spark-client
package com.xiaoju.kangaroo.duplicate;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import java.io.Serializable;
public class SparkClient implements Serializable{
private SparkConf sparkConf;
private JavaSparkContext javaSparkContext;
public SparkClient() {
initSparkConf();
javaSparkContext = new JavaSparkContext(sparkConf);
}
public SQLContext getSQLContext() {
return new SQLContext(javaSparkContext);
}
public HiveContext getHiveContext() {
return new HiveContext(javaSparkContext);
}
private void initSparkConf() {
try {
String warehouseLocation = System.getProperty("user.dir");
sparkConf = new SparkConf()
.setAppName("duplicate")
.set("spark.sql.warehouse.dir", warehouseLocation)
.setMaster("yarn-client");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
3、判重流程
package com.xiaoju.kangaroo.duplicate;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SparkDuplicate implements Serializable {
private transient SparkClient sparkClient;
private transient HiveContext hiveContext;
private String db;
private String tb;
private String pt;
private String cols;
public SparkDuplicate(String db, String tb, String pt, String cols) {
this.db = db;
this.tb = tb;
this.pt = pt;
this.cols = cols;
this.sparkClient = new SparkClient();
this.hiveContext = sparkClient.getHiveContext();
}
public void duplicate() {
String partition = formatPartition(pt);
String query = String.format("select * from %s.%s where %s", db ,tb, partition);
System.out.println(query);
DataFrame rows = hiveContext.sql(query);
JavaRDD<Row> rdd = rows.toJavaRDD();
Map<String, Integer> repeatRetMap = rdd.flatMap(new FlatMapFunction<Row, String>() {
public Iterable<String> call(Row row) throws Exception {
HashMap<String, Object> rowMap = formatRowMap(row);
List<String> sList = new ArrayList<String>();
String[] colList = cols.split(",");
for (String col : colList) {
sList.add(col + "@" + rowMap.get(col));
}
return sList;
}
}).mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).map(new Function<Tuple2<String,Integer>, Map<String, Integer>>() {
public Map<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
Map<String, Integer> retMap = new HashMap<String, Integer>();
if (stringIntegerTuple2._2 > 1) {
retMap.put(stringIntegerTuple2._1, stringIntegerTuple2._2);
}
return retMap;
}
}).reduce(new Function2<Map<String, Integer>, Map<String, Integer>, Map<String, Integer>>() {
public Map<String, Integer> call(Map<String, Integer> stringIntegerMap, Map<String, Integer> stringIntegerMap2) throws Exception {
stringIntegerMap.putAll(stringIntegerMap2);
return stringIntegerMap;
}
});
for (Map.Entry<String, Integer> entry : repeatRetMap.entrySet()) {
if (entry.getValue() > 1) {
System.out.println("重复值为:" + entry.getKey() + ", 重复个数" + entry.getValue());
}
}
}
private String formatPartition(String partition) {
String format = "";
if (partition.startsWith("pt") || partition.startsWith("dt")) {
String[] items = partition.split("=");
for (int i = 0; i < items.length; i++) {
if (items[i].equals("pt") || items[i].equals("dt")) {
format += items[i];
} else {
format += "='" + items[i] + "'";
}
}
} else {
String[] keys;
if (partition.contains("w=")){
keys = new String[] {"year", "week"};
partition = partition.replace("w=", "");
}
else{
keys = new String[] {"year","month","day", "hour"};
}
String[] items = partition.split("/");
for(int i=0; i<items.length; i++) {
if (i == items.length-1) {
format += keys[i] + "='" + items[i] + "'";
} else {
format += keys[i] + "='" + items[i] + "' and ";
}
}
}
return format;
}
private HashMap<String, Object> formatRowMap(Row row){
HashMap<String, Object> rowMap = new HashMap<String, Object>();
try {
for (int i=0; i<row.schema().fields().length; i++) {
String colName = row.schema().fields()[i].name();
Object colValue = row.get(i);
rowMap.put(colName, colValue);
}
}catch (Exception ex) {
ex.printStackTrace();
}
return rowMap;
}
public static void main(String[] args) {
String db = args[0];
String tb = args[1];
String pt = args[2];
String cols = args[3];
SparkDuplicate sparkDuplicate = new SparkDuplicate(db, tb, pt, cols);
sparkDuplicate.duplicate();
}
}
4、运行方式
提交任务脚本
#!/bin/bash
source /etc/profile
source ~/.bash_profile
db=$1
table=$2
partition=$3
cols=$4
spark-submit
--queue=root.zhiliangbu_prod_datamonitor
--driver-memory 500M
--executor-memory 13G
--num-executors 50
spark-duplicate-1.0-SNAPSHOT-jar-with-dependencies.jar ${db} ${table} ${partition} ${cols}
运行:
sh run.sh gulfstream_ods g_order 2017/07/11 area,type
结果
重复值为:area@179, 重复个数225
重复值为:area@80, 重复个数7398
重复值为:area@82, 重复个数69823
重复值为:area@81, 重复个数98317
重复值为:area@84, 重复个数91775
重复值为:area@83, 重复个数72053
重复值为:area@180, 重复个数2362
重复值为:area@86, 重复个数264487
重复值为:area@181, 重复个数2927
重复值为:area@85, 重复个数230484
重复值为:area@88, 重复个数87527
重复值为:area@87, 重复个数74987
重复值为:area@89, 重复个数130297
重复值为:area@188, 重复个数24463
重复值为:area@189, 重复个数15699
重复值为:area@186, 重复个数13517
重复值为:area@187, 重复个数4774
重复值为:area@184, 重复个数5022
重复值为:area@185, 重复个数6737
重复值为:area@182, 重复个数12705
重复值为:area@183, 重复个数18961
重复值为:area@289, 重复个数20715
重复值为:area@168, 重复个数15179
重复值为:area@169, 重复个数1276
重复值为:area@91, 重复个数31664
重复值为:area@90, 重复个数61261
重复值为:area@93, 重复个数32496
重复值为:area@92, 重复个数55877
重复值为:area@95, 重复个数40933
重复值为:area@94, 重复个数32564
重复值为:area@290, 重复个数300
重复值为:area@97, 重复个数21405
重复值为:area@170, 重复个数37696
重复值为:area@291, 重复个数212
重复值为:area@96, 重复个数12442
重复值为:area@99, 重复个数2526
重复值为:area@98, 重复个数17456
重复值为:area@298, 重复个数12688
重复值为:area@177, 重复个数17285
重复值为:area@178, 重复个数11511
重复值为:area@299, 重复个数6622
重复值为:area@175, 重复个数9573
重复值为:area@296, 重复个数2416
重复值为:area@176, 重复个数8109
重复值为:area@297, 重复个数27915
重复值为:area@173, 重复个数58942
重复值为:area@294, 重复个数18842
重复值为:area@295, 重复个数3482
重复值为:area@174, 重复个数31452
重复值为:area@292, 重复个数11436
重复值为:area@171, 重复个数656
重复值为:area@172, 重复个数31557
重复值为:area@293, 重复个数1726
重复值为:type@1, 重复个数288479
重复值为:type@0, 重复个数21067365
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 前端路由实现原理
- 模拟虚拟dom生成实际dom
- Promise教程之产房里生孩子的故事
- 一个现实生活中的例子让你理解Promise的使用场景
- react 跨级组件传参方式 context方式的传参
- Excel文件导入导出操作
- nprogress路由切换添加进度条
- vue-json-excel导出excle表格
- Dom树 CSS树 渲染树(render树) 规则、原理
- Canvas圆形时钟
- 浅谈 Hooks
- Java进阶训练营 第一周JVM 预习笔记
- LeetCode905. 按奇偶排序数组 题解
- codeforces 1133D (map+精度控制)
- 只要十步,你就可以应用表达式树来优化动态调用