在Hive上实现SCD
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wzy0623/article/details/51508931
一、问题提出 官方一直称Hive是Hadoop数据仓库解决方案。既然是数据仓库就离不开多维、CDC、SCD这些概念,于是尝试了一把在Hive上实现SCD1和SCD2。这有两个关键点,一个是行级更新,一个是生成代理键。行级更新hive本身就是支持的,但需要一些配置,还有一些限制。具体可参考http://blog.csdn.net/wzy0623/article/details/51483674。生成代理键在RDBMS上一般都用自增序列。Hive也有一些对自增序列的支持,本实验分别使用了窗口函数ROW_NUMBER()和hive自带的UDFRowSequence实现生成代理键。 二、软件版本 Hadoop 2.7.2 Hive 2.0.0 三、实验步骤 1. 准备初始数据文件a.txt,内容如下:
1,张三,US,CA
2,李四,US,CB
3,王五,CA,BB
4,赵六,CA,BC
5,老刘,AA,AA
2. 用ROW_NUMBER()方法实现初始装载和定期装载
(1)建立初始装载脚本init_row_number.sql,内容如下:
USE test;
-- 建立过渡表
DROP TABLE IF EXISTS tbl_stg;
CREATE TABLE tbl_stg (
id INT,
name STRING,
cty STRING,
st STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
-- 建立维度表
DROP TABLE IF EXISTS tbl_dim;
CREATE TABLE tbl_dim (
sk INT,
id INT,
name STRING,
cty STRING,
st STRING,
version INT,
effective_date DATE,
expiry_date DATE)
CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
-- 向过渡表加载初始数据
LOAD DATA LOCAL INPATH '/home/grid/BigDataDWTest/a.txt' INTO TABLE tbl_stg;
-- 向维度表装载初始数据
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY tbl_stg.id) + t2.sk_max,
tbl_stg.*,
1,
CAST('1900-01-01' AS DATE),
CAST('2200-01-01' AS DATE)
from tbl_stg CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
(2)执行初始装载
hive -S -f /home/grid/BigDataDWTest/init_row_number.sql
(3)修改数据文件a.txt,内容如下:
1,张,U,C
3,王五,CA,BB
4,赵六,AC,CB
5,刘,AA,AA
6,老杨,DD,DD
说明: 1. 新增了第6条数据 2. 删除了第2条数据 3. 修改了第1条数据的name列、cty列和st列(name列按SCD2处理,cty列和st列按SCD1处理) 4. 修改了第4条数据的cty列和st列(按SCD1处理) 5. 修改了第5条数据的name列(按SCD2处理)
(4)建立定期装载脚本scd_row_number.sql,内容如下:
USE test;
-- 设置日期变量
SET hivevar:pre_date = DATE_ADD(CURRENT_DATE(),-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);
-- 向过渡表加载更新后的数据
LOAD DATA LOCAL INPATH '/home/grid/BigDataDWTest/a.txt' OVERWRITE INTO TABLE tbl_stg;
-- 向维度表装载更新后的数据
-- 设置已删除记录和SCD2的过期
UPDATE tbl_dim
SET expiry_date = ${hivevar:pre_date}
WHERE sk IN
(SELECT a.sk FROM (
SELECT sk,id,name FROM tbl_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN tbl_stg b ON a.id=b.id
WHERE b.id IS NULL OR a.name<>b.name);
-- 处理SCD2新增行
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max,
t1.id,
t1.name,
t1.cty,
t1.st,
t1.version,
t1.effective_date,
t1.expiry_date
FROM
(
SELECT
t2.id id,
t2.name name,
t2.cty cty,
t2.st st,
t1.version + 1 version,
${hivevar:pre_date} effective_date,
${hivevar:max_date} expiry_date
FROM tbl_dim t1 INNER JOIN tbl_stg t2
ON t1.id=t2.id AND t1.name<>t2.name AND t1.expiry_date = ${hivevar:pre_date}
LEFT JOIN tbl_dim t3 ON T1.id = t3.id AND t3.expiry_date = ${hivevar:max_date}
WHERE t3.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
-- 处理SCD1
-- 因为hive的update还不支持子查询,所以这里使用了一个临时表存储需要更新的记录,用先delete再insert代替update
-- 因为SCD1本身就不保存历史数据,所以这里更新维度表里的所有cty或st改变的记录,而不是仅仅更新当前版本的记录
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT a.sk,a.id,a.name,b.cty,b.st,a.version,a.effective_date,a.expiry_date FROM tbl_dim a, tbl_stg b
WHERE a.id=b.id AND (a.cty <> b.cty OR a.st <> b.st);
DELETE FROM tbl_dim WHERE sk IN (SELECT sk FROM tmp);
INSERT INTO tbl_dim SELECT * FROM tmp;
-- 处理新增记录
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max,
t1.id,
t1.name,
t1.cty,
t1.st,
1,
${hivevar:pre_date},
${hivevar:max_date}
FROM
(
SELECT t1.* FROM tbl_stg t1 LEFT JOIN tbl_dim t2 ON t1.id = t2.id
WHERE t2.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
(5)执行定期装载
hive -S -f /home/grid/BigDataDWTest/scd_row_number.sql
查询维度表结果如图1所示。
select * from tbl_dim order by id,version;
图1
(6)再次执行定期装载,维度表的数据没有变化
hive -S -f /home/grid/BigDataDWTest/scd_row_number.sql
2. 用UDFRowSequence方法实现初始装载和定期装载 实验过程和ROW_NUMBER()方法基本一样,只是先要将hive-contrib-2.0.0.jar传到HDFS上,否则会报错。
hadoop dfs -put /home/grid/hive/lib/hive-contrib-2.0.0.jar /user
初始装载脚本init_UDFRowSequence.sql,内容如下:
USE test;
ADD JAR hdfs:///user/hive-contrib-2.0.0.jar;
CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
-- 建立过渡表
DROP TABLE IF EXISTS tbl_stg;
CREATE TABLE tbl_stg (
id INT,
name STRING,
cty STRING,
st STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
-- 建立维度表
DROP TABLE IF EXISTS tbl_dim;
CREATE TABLE tbl_dim (
sk INT,
id INT,
name STRING,
cty STRING,
st STRING,
version INT,
effective_date DATE,
expiry_date DATE)
CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
-- 向过渡表加载初始数据
LOAD DATA LOCAL INPATH '/home/grid/BigDataDWTest/a.txt' INTO TABLE tbl_stg;
-- 向维度表装载初始数据
INSERT INTO tbl_dim
SELECT
t2.sk_max + row_sequence(),
tbl_stg.*,
1,
CAST('1900-01-01' AS DATE),
CAST('2200-01-01' AS DATE)
from tbl_stg CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
定期装载脚本scd_UDFRowSequence.sql,内容如下:
USE test;
ADD JAR hdfs:///user/hive-contrib-2.0.0.jar;
CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
-- 设置日期变量
SET hivevar:pre_date = DATE_ADD(CURRENT_DATE(),-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);
-- 向过渡表加载更新后的数据
LOAD DATA LOCAL INPATH '/home/grid/BigDataDWTest/a.txt' OVERWRITE INTO TABLE tbl_stg;
-- 向维度表装载更新后的数据
-- 设置已删除记录和SCD2的过期
UPDATE tbl_dim
SET expiry_date = ${hivevar:pre_date}
WHERE sk IN
(SELECT a.sk FROM (
SELECT sk,id,name FROM tbl_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN tbl_stg b ON a.id=b.id
WHERE b.id IS NULL OR a.name<>b.name);
-- 处理SCD2新增行
INSERT INTO tbl_dim
SELECT
t2.sk_max + row_sequence(),
t1.id,
t1.name,
t1.cty,
t1.st,
t1.version,
t1.effective_date,
t1.expiry_date
FROM
(
SELECT
t2.id id,
t2.name name,
t2.cty cty,
t2.st st,
t1.version + 1 version,
${hivevar:pre_date} effective_date,
${hivevar:max_date} expiry_date
FROM tbl_dim t1 INNER JOIN tbl_stg t2
ON t1.id=t2.id AND t1.name<>t2.name AND t1.expiry_date = ${hivevar:pre_date}
LEFT JOIN tbl_dim t3 ON T1.id = t3.id AND t3.expiry_date = ${hivevar:max_date}
WHERE t3.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
-- 处理SCD1
-- 因为hive的update还不支持子查询,所以这里使用了一个临时表存储需要更新的记录,用先delete再insert代替update
-- 因为SCD1本身就不保存历史数据,所以这里更新维度表里的所有cty或st改变的记录,而不是仅仅更新当前版本的记录
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT a.sk,a.id,a.name,b.cty,b.st,a.version,a.effective_date,a.expiry_date FROM tbl_dim a, tbl_stg b
WHERE a.id=b.id AND (a.cty <> b.cty OR a.st <> b.st);
DELETE FROM tbl_dim WHERE sk IN (SELECT sk FROM tmp);
INSERT INTO tbl_dim SELECT * FROM tmp;
-- 处理新增记录
INSERT INTO tbl_dim
SELECT
t2.sk_max + row_sequence(),
t1.id,
t1.name,
t1.cty,
t1.st,
1,
${hivevar:pre_date},
${hivevar:max_date}
FROM
(
SELECT t1.* FROM tbl_stg t1 LEFT JOIN tbl_dim t2 ON t1.id = t2.id
WHERE t2.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
参考: http://blog.csdn.net/wzy0623/article/details/49616643 http://www.remay.com.br/blog/hdp-2-2-how-to-create-a-surrogate-key-on-hive/ http://www.aboutechnologies.com/2016/02/hive-auto-increment-column.html
- 关于dg broker的简单配置(r5笔记第99天)
- 三天速成 TensorFlow课件分享
- 干货 | 机器学习算法线上部署方法
- 用于快速开发 3D 数据处理软件的开源数据处理库 —— Open3D | Github 项目推荐
- 【java网络】IO编程
- 一周 Github Trending 热门项目,最全中华古诗词数据库 | Github 项目推荐
- 【线程池】线程池与工作队列
- 一个快速方便的图形化 Python 调试器 —— birdseye | Github 项目推荐
- 关于分区表的在线重定义(r5笔记第98天)
- 10个实用的但偏执的Java编程技术
- 看似诡异的tablespace online问题(r5笔记第95天)
- python2.7进行爬虫POI代码(划分小网格算法)
- 从 Encoder 到 Decoder 实现 Seq2Seq 模型
- python2.7进行爬虫百度POI代码(划分小网格算法)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Docsify 如何添加目录列表
- Dubbo日志链路追踪TraceId选型
- 重温C++的设计思想
- 设计一个网站(域名)的镜像
- LoRa终端设备ASR6505普通GPIO操作
- LoRa终端设备ASR6505之I2C通信
- 我在暴躁同事小张的胁迫下学会了Go的交叉编译和条件编译
- LoRa终端设备ASR6505之PingPong通信
- LoRa点对点通信,OLED显示(内附代码)
- 08 . Python3高阶函数之迭代器、装饰器
- redis源码之SDS
- 01 . Tomcat简介及多实例部署
- 学练结合,快速掌握Kubernetes Service
- arraylist linkedlist vector
- Java的HashMap和HashTable