Hadoop: Writing a MapReduce Job That Reads Hive Schema Data via HCatalog
Date: 2022-05-02
This article shows how to write a Hadoop MapReduce job that uses HCatalog to read schema-aware data from a Hive database. It walks through a working example, covering practical tips, the key points involved, and things to watch out for, and should serve as a useful reference.
1、Driver
package com.kangaroo.hadoop.drive;

import java.util.Map;
import java.util.Properties;

import com.kangaroo.hadoop.mapper.AggregateMapper;
import com.kangaroo.hadoop.reducer.AggregateReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kangaroo.hadoop.utils.PropertiesUtil;

public class DriveMain extends Configured implements Tool {

    private static final Logger logger = LoggerFactory.getLogger(DriveMain.class);

    private Configuration conf;
    private PropertiesUtil propUtil;

    public DriveMain() {
        this.conf = new Configuration();
        this.propUtil = new PropertiesUtil("configure.properties");
    }

    @Override
    public int run(String[] args) throws Exception {
        try {
            logger.info("MapReduce Job Beginning.");
            // Positional arguments: database, table, partition path, field to aggregate on, HDFS output path.
            String dbName = args[0];
            String tableName = args[1];
            String partition = args[2];
            String sumField = args[3];
            String outPath = args[4];
            String partFilter = partitionFormat(partition);
            logger.info("[Params] dbName:{}, tableName:{}, partition:{}, sumField:{}, outPath:{}, partFilter:{}",
                    dbName, tableName, partition, sumField, outPath, partFilter);
            this.conf.set("sumField", sumField);
            this.setMapRedConfiguration();
            Job job = this.setJobConfiguration(this.conf);
            // Bind the job's input to the Hive table, restricted by the partition filter.
            HCatInputFormat.setInput(job, dbName, tableName, partFilter);
            logger.info("setInput successfully.");
            FileOutputFormat.setOutputPath(job, new Path(outPath));
            logger.info("setOutput successfully.");
            return job.waitForCompletion(true) ? 0 : 1;
        } catch (Exception ex) {
            logger.error(ex.getMessage());
            throw ex;
        }
    }

    private Job setJobConfiguration(Configuration conf) throws Exception {
        try {
            logger.info("enter setJobConfiguration");
            Job job = Job.getInstance(conf);
            job.setJarByClass(DriveMain.class);
            job.setInputFormatClass(HCatInputFormat.class);
            job.setMapperClass(AggregateMapper.class);
            job.setReducerClass(AggregateReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(1);
            logger.info("setJobConfiguration successfully.");
            return job;
        } catch (Exception ex) {
            logger.error("setJobConfiguration: " + ex.getMessage());
            throw new Exception(ex);
        }
    }

    private void setMapRedConfiguration() {
        try {
            Properties properties = propUtil.getProperties();
            logger.info("Load MapReduce Configuration Successfully.");
            // Copy every mapred* entry from configure.properties into the job Configuration.
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                if (entry.getKey().toString().startsWith("mapred")) {
                    conf.set(entry.getKey().toString(), entry.getValue().toString());
                    logger.info("[MR][Config] key:{}, value:{}", entry.getKey(), entry.getValue());
                }
            }
            logger.info("[MR][Config] Set MapReduce Configuration Successfully.");
        } catch (Exception e) {
            // Do not swallow configuration errors silently; at least log them.
            logger.error("setMapRedConfiguration: " + e.getMessage(), e);
        }
    }

    // Turns a partition path such as 2017/06/01/12 into a filter like
    // year='2017' and month='06' and day='01' and hour='12'.
    // A partition spec that already names pt/dt keys is passed through unchanged.
    private String partitionFormat(String partition) {
        String format = "";
        if (!partition.contains("pt") && !partition.contains("dt")) {
            String[] items = partition.split("/");
            String[] keys = {"year", "month", "day", "hour"};
            for (int i = 0; i < items.length; i++) {
                if (i == items.length - 1) {
                    format += keys[i] + "='" + items[i] + "'";
                } else {
                    format += keys[i] + "='" + items[i] + "' and ";
                }
            }
        } else {
            format = partition;
        }
        return format;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DriveMain(), args);
        System.exit(exitCode);
    }
}
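To make the five positional arguments concrete, here is a hypothetical submission command (the jar name, database, table, field and output path are placeholders, not from the original article; the Hive and HCatalog jars must also be on the classpath, e.g. via HADOOP_CLASSPATH or -libjars):

hadoop jar hcat-aggregate.jar com.kangaroo.hadoop.drive.DriveMain my_db my_table 2017/06/01/12 user_id /tmp/hcat_out

With these arguments, partitionFormat expands 2017/06/01/12 into the filter year='2017' and month='06' and day='01' and hour='12', which HCatInputFormat.setInput uses so that only that partition is read.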
2、Mapper
package com.kangaroo.hadoop.mapper;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@SuppressWarnings("rawtypes")
public class AggregateMapper extends Mapper<WritableComparable, HCatRecord, Text, Text> {

    private static final Logger logger = LoggerFactory.getLogger(AggregateMapper.class);

    private HCatSchema schema;
    private Text outKey;
    private Text outValue;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outKey = new Text();
        outValue = new Text();
        // Recover the table schema that HCatInputFormat.setInput stored in the job configuration.
        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
    }

    @Override
    protected void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException {
        String sumField = context.getConfiguration().get("sumField");
        Map<String, String> recordMap = new HashMap<String, String>();
        for (String fieldName : schema.getFieldNames()) {
            Object fieldValue = value.get(fieldName, schema);
            // Guard against NULL columns before calling toString().
            recordMap.put(fieldName, fieldValue == null ? "" : fieldValue.toString());
        }
        // Logging every field of every record floods the task logs; keep it at debug level.
        logger.debug("recordMap={}", recordMap);
        // Emit (sumField value, 1) per record; writing only in cleanup() would emit a single
        // pair per map task and lose all other records.
        outKey.set(recordMap.get(sumField));
        outValue.set("1");
        context.write(outKey, outValue);
    }
}
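The mapper walks every column in the table schema even though only sumField is used. If the table is wide, the driver can optionally project the input down to that one column with HCatInputFormat.setOutputSchema; a minimal sketch (my own addition, not part of the original article), to be placed in run() right after HCatInputFormat.setInput:

HCatSchema tableSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
HCatSchema projected = new HCatSchema(Arrays.asList(tableSchema.get(sumField)));
HCatInputFormat.setOutputSchema(job, projected);  // mappers now receive only sumField

This assumes sumField is a regular data column and requires importing java.util.Arrays and org.apache.hive.hcatalog.data.schema.HCatSchema in the driver.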
3、Reducer
package com.kangaroo.hadoop.reducer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class AggregateReducer extends Reducer<Text, Text, Text, Text> {

    protected static final Logger logger = LoggerFactory.getLogger(AggregateReducer.class);

    private Text outValue;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The Text object must be instantiated here; calling set() on a null field would NPE.
        outValue = new Text();
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Sum the "1" counts emitted by the mapper for this key.
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        outValue.set(String.valueOf(sum));
        // Write one (key, count) pair per distinct key; writing only in cleanup()
        // would drop every key except the last one processed.
        context.write(key, outValue);
    }
}
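Because the reduce step just adds up integer counts, partial sums merge safely, so the same class can also serve as a combiner to shrink shuffle traffic. An optional one-line addition to setJobConfiguration in the driver (not part of the original article):

job.setCombinerClass(AggregateReducer.class);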
4、PropertiesUtil
package com.kangaroo.hadoop.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {

    private String filePath;

    public PropertiesUtil() {
        this.filePath = "configure.properties";
    }

    public PropertiesUtil(String filePath) {
        this.filePath = filePath;
    }

    // Loads the properties file from the classpath and always closes the stream.
    public Properties getProperties() throws IOException {
        InputStream inStream = null;
        try {
            inStream = PropertiesUtil.class.getClassLoader().getResourceAsStream(this.filePath);
            if (inStream == null) {
                throw new IOException("Resource not found on classpath: " + this.filePath);
            }
            Properties prop = new Properties();
            prop.load(inStream);
            return prop;
        } finally {
            if (inStream != null) {
                inStream.close();
            }
        }
    }
}
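A minimal usage sketch (the key below is just one of the entries from the configuration in the next section):

PropertiesUtil propUtil = new PropertiesUtil("configure.properties");
Properties props = propUtil.getProperties();
String queue = props.getProperty("mapred.job.queue.name");  // e.g. root.XXX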
5、Configuration (configure.properties)
mapred.job.queue.name=root.XXX
mapred.jar=./XXX.jar
mapred.map.tasks=300
mapred.reduce.tasks=100
#mapred.map.capacity=1
#mapred.reduce.capacity=1
mapred.job.priority=HIGH
mapred.job.name=XXX
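These are the deprecated MRv1 property names; Hadoop 2.x still translates them, and since the modern names also begin with "mapred", the startsWith("mapred") filter in setMapRedConfiguration picks up either spelling. For reference, the current equivalents are:

mapreduce.job.queuename=root.XXX
mapreduce.job.jar=./XXX.jar
mapreduce.job.maps=300
mapreduce.job.reduces=100
mapreduce.job.priority=HIGH
mapreduce.job.name=XXX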