TableMapReduceUtil使用

时间:2019-10-24
本文章向大家介绍TableMapReduceUtil使用,主要包括TableMapReduceUtil使用使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

标签:

  今天在从文件中读取数据在写入到hbase的时候,使用到了TableMapReduceUtil工具类,使用过程只需要简单的设置之后工具类会帮我们生成写入到HBase的任务,工作类封装了许多MapReduce写入到HBase的操作,无需我们自己再去设置,下面大致看看内部的实现机制,对TableMapReduceUtil有个比较深入的了解

  使用过程:map端生成了<ImmutableBytesWritable , put>的输出类型,key和value分别为key和put对象,然后使用如下设置

1 TableMapReduceUtil.initTableReducerJob("Test_MR_HBase", // output table
2                               null,       // reducer class
3                               job);       // TableMapReduceUtil是HBase提供的工具类,会自动设置mapreuce提交到hbase任务的各种配置,封装了操作
4 job.setNumReduceTasks(0);// 设置reduce过程,这里由map端的数据直接提交,不要使用reduce类,因而设置成null,并设置reduce的个数为0
5 FileInputFormat.addInputPath(job, new Path(args[0]));// 设置输入文件路径

  追踪方法的具体细节,查看具体的实现过程,发现最核心的任务设置的方法

 1 public static void initTableReducerJob(String table,
 2     Class<? extends TableReducer> reducer, Job job,
 3     Class partitioner, String quorumAddress, String serverClass,
 4     String serverImpl, boolean addDependencyJars) throws IOException {
 5 
 6     Configuration conf = job.getConfiguration();
 7     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
 8     job.setOutputFormatClass(TableOutputFormat.class);//可见输出使用的TableOutputFormat类,结果会通过网络请求到HBase集群
 9     if (reducer != null) job.setReducerClass(reducer);//设置reudce
10     conf.set(TableOutputFormat.OUTPUT_TABLE, table);//设置输出hbase的表
11     conf.setStrings("io.serializations", conf.get("io.serializations"),
12         MutationSerialization.class.getName(), ResultSerialization.class.getName());
13     // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
14     if (quorumAddress != null) {
15       // Calling this will validate the format
16       ZKUtil.transformClusterKey(quorumAddress);
17       conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
18     }
19     if (serverClass != null && serverImpl != null) {
20       conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
21       conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
22     }
23     job.setOutputKeyClass(ImmutableBytesWritable.class);//设置输出key的类型
24     job.setOutputValueClass(Writable.class);//输出的value类型
25     if (partitioner == HRegionPartitioner.class) {
26       job.setPartitionerClass(HRegionPartitioner.class);//设置partitioner算法,partition算法会决定map输出的结果输出到哪一个reduce上
27       int regions = MetaReader.getRegionCount(conf, table);//获得region的数量,并根据region的数量设置reduce的个数
28       if (job.getNumReduceTasks() > regions) {//将reudce的数目设置成region的个数
29         job.setNumReduceTasks(regions);
30       }
31     } else if (partitioner != null) {
32       job.setPartitionerClass(partitioner);
33     }
34 
35     if (addDependencyJars) {
36       addDependencyJars(job);
37     }
38 
39     initCredentials(job);
40   }
View Code

  值得注意的一点是工具类最终的实现是通过HBase的put方法通过网络请求提交数据,在大批量写入时需要考虑对hbase带来的负载

  driver程序如下:

 1 private int excuteHFile(String[] args) throws Exception {
 2         Configuration conf = HBaseConfiguration.create();// 任务的配置设置,configuration是一个任务的配置对象,封装了任务的配置信息
 3 
 4         Job job = Job.getInstance(conf, "HFile bulk load test");// 生成一个新的任务对象并设置dirver类
 5 
 6         job.setJarByClass(HfileToHBaseDriver.class);
 7         job.setMapperClass(HfileToHBaseMapper.class); // 设置任务的map类和 ,map类输出结果是ImmutableBytesWritable和put类型
 8 
 9         TableMapReduceUtil.initTableReducerJob("Test_MR_HBase", // output table
10 
11                 null, // reducer class
12 
13                 job);// TableMapReduceUtil是HBase提供的工具类,会自动设置mapreuce提交到hbase任务的各种配置,封装了操作,只需要简单的设置即可
14 
15         job.setNumReduceTasks(0);// 设置reduce过程,这里由map端的数据直接提交,不要使用reduce类,因而设置成null,并设置reduce的个数为0
16 
17         FileInputFormat.addInputPath(job, new Path(args[0]));// 设置输入文件路径
18 
19         return (job.waitForCompletion(true) ? 0 : -1);
20     }
View Code

原文地址:https://www.cnblogs.com/ninglinglong/p/11732706.html