10. ADS Layer (Data Mart Layer)

Date: 2022-08-10

10.1 Concepts related to the metric (indicator) system

10.2 The structure of the ADS layer is as follows:

10.3 The table-creation statement in Hive is as follows:

CREATE TABLE IF NOT EXISTS ads.ads_rk_ccxx_xz_index_d(
    XZJD         string COMMENT 'township (sub-district)',
    sfyfc_num    BIGINT COMMENT 'number of people who own property',
    sfyfc_num_p  DOUBLE COMMENT 'proportion of the population owning property',
    sfygc_num    BIGINT COMMENT 'number of people who own a company',
    sfygc_num_p  DOUBLE COMMENT 'proportion of the population owning a company',
    sfygjj_num   BIGINT COMMENT 'number of people with a housing provident fund',
    sfygjj_num_p DOUBLE COMMENT 'proportion of the population with a housing provident fund',
    sfysb_num    BIGINT COMMENT 'number of people with social insurance',
    sfysb_num_p  DOUBLE COMMENT 'proportion of the population with social insurance'
)
PARTITIONED BY (ds string COMMENT 'partition')
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/daas/motl/ads/ads_rk_ccxx_xz_index_d';
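
To double-check the result of the DDL, DESCRIBE FORMATTED in a Hive or spark-sql session shows the columns, the partition field, and the HDFS location (just an optional sanity check):

DESCRIBE FORMATTED ads.ads_rk_ccxx_xz_index_d;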

10.4 Join the DWS layer with the DIM layer as required, compute the metrics, and produce the result:

insert overwrite table ads.ads_rk_ccxx_xz_index_d partition(ds=${ds})
select
    b.XZJD,
    -- sfyfc / sfygjj / sfysb are 0/1 flags, so sum() gives the number of people with each attribute
    sum(sfyfc) as sfyfc_num,
    round(sum(sfyfc)/count(1),4) as sfyfc_num_p,
    -- a person counts as "has a company" when either flag sfygc or sywgd is 1
    sum(case when sfygc=1 or sywgd=1 then 1 else 0 end) as sfygc_num,
    round(sum(case when sfygc=1 or sywgd=1 then 1 else 0 end) / count(1),4) as sfygc_num_p,
    sum(sfygjj) as sfygjj_num,
    round(sum(sfygjj)/count(1),4) as sfygjj_num_p,
    sum(sfysb) as sfysb_num,
    round(sum(sfysb)/count(1),4) as sfysb_num_p
from
    (
        -- property flags for the current partition
        select * from
            dws.dws_population_property_info_msk_d
        where ds=${ds}
    ) as a
        inner join
    (
        -- person-to-township mapping from the dimension layer
        select id,XZJD from
            dim.dim_user_info_d where ds=${ds}
    ) as b
    on a.id=b.id
group by b.XZJD;
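
Once the insert finishes, a quick sanity check on the freshly written partition might look like the following (the ds value is only a placeholder for whichever partition was loaded):

select XZJD, sfyfc_num, sfyfc_num_p, sfysb_num_p
from ads.ads_rk_ccxx_xz_index_d
where ds='20220810'
limit 10;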

10.5 The execution script is as follows:

#!/bin/bash

# partition to process
ds=$1

# load environment variables
source /etc/profile
# resolve the directory this script lives in
shell_home="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# switch to that directory
cd $shell_home

# run the job
# num-executors is usually 50-100 on a production cluster
spark-sql \
--master yarn-client \
--num-executors=1 \
--executor-cores=2 \
--executor-memory=4G \
--conf spark.sql.shuffle.partitions=2 \
-f ../dql/ads_rk_ccxx_xz_index_d.sql \
-d ds=$ds

The SQL file referenced above is located at /home/ads/dql/ads_rk_ccxx_xz_index_d.sql.

Test it yourself first.
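
A manual test run could look like this (the script file name and the partition value are illustrative, not fixed by the project):

sh ads_rk_ccxx_xz_index_d.sh 20220810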

10.6 Export the Hive table to MySQL

Question: why not query Hive directly?

Answer: the result data produced by these computations is not very large, but querying it through Hive still has high latency. Loading it into MySQL instead allows real-time queries and is much faster, and MySQL supports indexes, which speed up lookups even further.

Note: the partition field ds is not stored in the Hive table's data files (it only exists as the partition directory), so when syncing to MySQL we need to append the ds value as an extra constant column (see the constant column in the DataX reader configuration in 10.8).

10.7 Now create the table in MySQL:

DROP TABLE IF EXISTS `ads_rk_ccxx_xz_index_d`;
CREATE TABLE `ads_rk_ccxx_xz_index_d` (
  `xzjd` varchar(255) NOT NULL,
  `sfyfc_num` bigint(20) DEFAULT NULL,
  `sfyfc_num_p` double(20,4) DEFAULT NULL,
  `sfygc_num` bigint(20) DEFAULT NULL,
  `sfygc_num_p` double(20,4) DEFAULT NULL,
  `sfygjj_num` bigint(20) DEFAULT NULL,
  `sfygjj_num_p` double(20,4) DEFAULT NULL,
  `sfysb_num` bigint(20) DEFAULT NULL,
  `sfysb_num_p` double(20,4) DEFAULT NULL,
  `ds` varchar(255) NOT NULL,
  PRIMARY KEY (`xzjd`,`ds`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
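
With (xzjd, ds) as the primary key, the typical lookup, one township for one day, is an index hit; for example (both values are placeholders):

SELECT * FROM ads_rk_ccxx_xz_index_d WHERE xzjd = 'some-township' AND ds = '20220810';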

10.8 The DataX sync job configuration is as follows:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/daas/motl/ads/ads_rk_ccxx_xz_index_d/ds=${ds}",
            "defaultFS": "hdfs://master:9000",
            "column": [
              {
                "index": 0,
                "type": "string"
              },
              {
                "index": 1,
                "type": "long"
              },
              {
                "index": 2,
                "type": "DOUBLE"
              },
              {
                "index": 3,
                "type": "long"
              },
              {
                "index": 4,
                "type": "DOUBLE"
              },

              {
                "index": 5,
                "type": "long"
              },
              {
                "index": 6,
                "type": "DOUBLE"
              },
              {
                "index": 7,
                "type": "long"
              },
              {
                "index": 8,
                "type": "DOUBLE"
              },
              {
                "type": "string",
                "value": "${ds}"
              }
            ],
            "fileType": "text",
            "encoding": "UTF-8",
            "fieldDelimiter": "\t"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "replace",
            "username": "root",
            "password": "123456",
            "column": [
              "XZJD",
              "sfyfc_num",
              "sfyfc_num_p",
              "sfygc_num",
              "sfygc_num_p",
              "sfygjj_num",
              "sfygjj_num_p",
              "sfysb_num",
              "sfysb_num_p",
              "ds"
            ],

            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=utf-8",
                "table": [
                  "ads_rk_ccxx_xz_index_d"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

10.9 The execution script for the sync job is straightforward:

#!/bin/bash

# partition to sync
ds=$1

# load environment variables
source /etc/profile
# resolve the directory this script lives in
shell_home="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# switch to that directory
cd $shell_home
# run the DataX sync job for this table
datax.py -p "-Dds=${ds}" ../datax/ads_rk_ccxx_xz_index_d.json
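
After the DataX job completes, the load can be spot-checked on the MySQL side (the ds value is again only a placeholder):

SELECT ds, COUNT(*) AS cnt FROM ads_rk_ccxx_xz_index_d WHERE ds = '20220810' GROUP BY ds;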

10.10 The results are as follows:

Original article: https://www.cnblogs.com/atao-BigData/p/16573691.html