用MapReduce分析Hbase将结果插入mysql中
时间:2022-05-07
本文章向大家介绍用MapReduce分析Hbase将结果插入mysql中,主要内容包括run方法的第一条命令processArgs处理参数、run方法的第二条命令、run方法的第三条命令TableMapReduceUtil.initTableMapperJob设置job执行参数、run方法的下一条命令job.setReducerClass、run方法的下一条命令job.setOutputFormatClass、run方法的下一个分支执行MR成功后,执行计算总用户、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
从HBASE读取清洗过的数据,写入到mysql的表中
NewInstallUserRunner.java 计算新增用户入口类
NewInstallUserRunner的所有属性方法
main方法:
public static void main(String[] args)
只有一个方法ToolRunner.run
org.apache.hadoop.util.ToolRunner.run(new Configuration(), new NewInstallUserRunner(), args);
入口类implements Tool接口
public class NewInstallUserRunner implements Tool
Tool定义run方法
int run(String [] args) throws Exception;
Tool 继承Configurable
public interface Tool extends Configurable
Configurable定义两个方法
/** Set the configuration to be used by this object. */
void setConf(Configuration conf);
/** Return the configuration used by this object. */
Configuration getConf();
所以入口类需要实现3个方法
int run(String [] args) throws Exception;
void setConf(Configuration conf);
Configuration getConf();
setConf方法实现
@Override
public void setConf(Configuration conf) {
conf.addResource("output-collector.xml");
conf.addResource("query-mapping.xml");
conf.addResource("transformer-env.xml");
conf.set("fs.defaultFS", "hdfs://master:8020");
conf.set("yarn.resourcemanager.hostname", "master");
conf.set("hbase.zookeeper.quorum", "master");
this.conf = HBaseConfiguration.create(conf);
}
定义output-collector的类,反射用的
output-collector.xml
<property>
<name>collector_new_install_user</name>
<value>com.sxt.transformer.mr.nu.StatsUserNewInstallUserCollector</value>
</property>
输出到mysql的时候,组成insert语句的
query-mapping.xml
<property>
<name>new_install_user</name>
<value>
INSERT INTO `stats_user`(
`platform_dimension_id`,
`date_dimension_id`,
`new_install_users`,
`created`)
VALUES(?, ?, ?, ?) ON DUPLICATE KEY UPDATE `new_install_users` = ?
</value>
</property>
输出到mysql的链接信息
transformer-env.xml
getConf方法实现
@Override
public Configuration getConf() {
return this.conf;
}
run方法的实现
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
// 处理参数
this.processArgs(conf, args);
// Job job = Job.getInstance(conf, "new_install_user");
System.out.println("KpiType.NEW_INSTALL_USER.toString()===="
+ KpiType.NEW_INSTALL_USER.name);
Job job = Job.getInstance(conf, KpiType.NEW_INSTALL_USER.name);
// job.setJarByClass(NewInstallUserRunner.class);
// 本地运行
TableMapReduceUtil.initTableMapperJob(initScans(job),
NewInstallUserMapper.class, StatsUserDimension.class,
TimeOutputValue.class, job, false);
// 集群运行:本地提交和打包(jar)提交
// TableMapReduceUtil.initTableMapperJob(initScans(job),
// NewInstallUserMapper.class, StatsUserDimension.class,
// TimeOutputValue.class, job);
job.setReducerClass(NewInstallUserReducer.class);
job.setOutputKeyClass(StatsUserDimension.class);
job.setOutputValueClass(MapWritableValue.class);
// job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TransformerOutputFormat.class);
if (job.waitForCompletion(true)) {
// 执行成功, 需要计算总用户
this.calculateTotalUsers(conf);
return 0;
} else {
return -1;
}
}
run方法的第一条命令processArgs处理参数
processArgs调用处理参数的方法
this.processArgs(conf, args);
run方法第一条 this.processArgs(conf, args); 执行结束, 返回结果默认是昨天,或者运行时加入-d yyyy-DD-mm 格式输入的日期。
run方法的第二条命令
run方法的第二条语句
Job job = Job.getInstance(conf, "new_install_user");
是不是应该这么写更好?
Job job = Job.getInstance(conf, KpiType.NEW_INSTALL_USER.name);
参数中用到的枚举方法
public enum KpiType {
NEW_INSTALL_USER("new_install_user"), // 统计新用户的kpi
BROWSER_NEW_INSTALL_USER("browser_new_install_user"), // 统计浏览器维度的新用户kpi
ACTIVE_USER("active_user"), // 统计活跃用户kpi
BROWSER_ACTIVE_USER("browser_active_user"), // 统计浏览器维度的活跃用户kpi
;
run方法的第三条命令TableMapReduceUtil.initTableMapperJob设置job执行参数
TableMapReduceUtil.initTableMapperJob(initScans(job),
NewInstallUserMapper.class,
StatsUserDimension.class,
TimeOutputValue.class, job,
false);
addDependencyJars 如果在服务器运行,需要设置为true 如果在本地运行 设置成false
/**
* Use this before submitting a Multi TableMap job.
* It will appropriately setup the job.
*
* @param scans The list of {@link Scan} objects to read from.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is carrying
* all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the
* configured job classes via the distributed cache (tmpjars).
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(List<Scan> scans,
Class<? extends TableMapper> mapper,
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Writable> outputValueClass, Job job,
boolean addDependencyJars) throws IOException {
initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
addDependencyJars, true);
}
第一个参数initScans(job)
http://www.jianshu.com/p/fd25d036d4dc
第二个参数NewInstallUserMapper.class
NewInstallUserMapper.class
http://www.jianshu.com/p/1493de43dbf7
第三、四个参数是输出的K,V
run方法的下一条命令job.setReducerClass
job.setReducerClass(NewInstallUserReducer.class); NewInstallUserReducer.class
run方法的下一条命令job.setOutputFormatClass
job.setOutputFormatClass(TransformerOutputFormat.class); TransformerOutputFormat.class
run方法的下一个分支执行MR成功后,执行计算总用户
if (job.waitForCompletion(true)) {
// 执行成功, 需要计算总用户
this.calculateTotalUsers(conf);
return 0;
} else {
return -1;
}
private void calculateTotalUsers(Configuration conf)
- 云本机应用程序成熟度的模型
- 如何利用ETW(Event Tracing for Windows)记录日志
- 如何利用ETW(Event Tracing for Windows)记录日志
- 如何利用ETW(Event Tracing for Windows)记录日志
- ASP.NET MVC以ValueProvider为核心的值提供系统: DictionaryValueProvider
- ASP.NET MVC如何实现自定义验证(服务端验证+客户端验证)
- .NET Core的文件系统[2]:FileProvider是个什么东西?
- Python多线程怎样优雅的响应中断异常
- .NET Core的文件系统[3]:由PhysicalFileProvider构建的物理文件系统
- .NET Core的文件系统[4]:由EmbeddedFileProvider构建的内嵌(资源)文件系统
- 学习July博文总结——支持向量机(SVM)的深入理解(下)
- 在gridview和datagrid里设置列宽
- ASP.NET MVC的Model元数据与Model模板:将”ListControl”引入ASP.NET MVC
- .NET Core的文件系统[5]:扩展文件系统构建一个简易版“云盘”
- MySQL 教程
- MySQL 安装
- MySQL 管理与配置
- MySQL PHP 语法
- MySQL 连接
- MySQL 创建数据库
- MySQL 删除数据库
- MySQL 选择数据库
- MySQL 数据类型
- MySQL 创建数据表
- MySQL 删除数据表
- MySQL 插入数据
- MySQL 查询数据
- MySQL where 子句
- MySQL UPDATE 查询
- MySQL DELETE 语句
- MySQL LIKE 子句
- mysql order by
- Mysql Join的使用
- MySQL NULL 值处理
- MySQL 正则表达式
- MySQL 事务
- MySQL ALTER命令
- MySQL 索引
- MySQL 临时表
- MySQL 复制表
- 查看MySQL 元数据
- MySQL 序列 AUTO_INCREMENT
- MySQL 处理重复数据
- MySQL 及 SQL 注入
- MySQL 导出数据
- MySQL 导入数据
- MYSQL 函数大全
- MySQL Group By 实例讲解
- MySQL Max()函数实例讲解
- mysql count函数实例
- MYSQL UNION和UNION ALL实例
- MySQL IN 用法
- MySQL between and 实例讲解
- ZooKeeper的十二连问,你顶得了嘛?
- 手把手教你,嘴对嘴传达----Apache虚拟主机配置与应用
- 手把手教你,嘴对嘴传达----Apache的访问控制
- spring框架应用系列二:component-scan自动扫描注册装配
- 手把手教你,嘴对嘴传达------Apache日志管理日志(rotatelogs分割工具、AWStats日志分析)
- 配合JAVA的AJAX使用
- 手把手教你,嘴对嘴传达------Apache网页优化
- jQuery通过Ajax实现请求后台接口数据
- Git常规操作
- 手把手教你,嘴对嘴传达 ----源码编译安装部署LAMP平台(LAMP平台与编译安装详解,Apache,MySQL与PHP源码编译安装,LAMP平台搭建论坛)
- Vue点击切换样式
- ElementUI引入到vue项目开发
- 手把手教你,嘴对嘴传达------Apache(安全优化防盗链、隐藏版本信息)
- spring框架应用系列三:切面编程(带参数)
- 排障集锦:九九八十一难之第六难!(98)Address already in use: AH00072: make_sock: could not bind to address ::80