通过线程池,从hbase中拿数据
时间:2019-11-25
本文章向大家介绍通过线程池,从hbase中拿数据,主要包括通过线程池,从hbase中拿数据使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1.线程池类HbasePool
package com.example.demospringboothbase.common; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class HbasePool { private Logger log = Logger.getLogger(HbasePool.class); //代理类主要用于获取连接 public class HbaseProxy { private String zk; private String zknode; private Connection connection; public HbaseProxy(String zk, String zknode) { this.zk = zk; this.zknode = zknode; init(); } public void init() { Configuration entries = HBaseConfiguration.create(); entries.set("hbase.zookeeper.quorum",zk); entries.set("zookeeper.znode.parent",zknode); try { this.connection = ConnectionFactory.createConnection(entries); } catch (IOException e) { log.error("获取连接失败!"); e.printStackTrace(); } } public Connection getConnection(){ return this.connection; } public void close(){ if(this.connection !=null){ try { this.connection.close(); } catch (IOException e) { log.error("链接关闭失败~"); e.printStackTrace(); } } } } public class HbasePoolFactary extends BasePooledObjectFactory<HbaseProxy>{ private String zk; private String zknode; public HbasePoolFactary(String zk, String zknode) { this.zk = zk; this.zknode = zknode; } @Override public HbaseProxy create() throws Exception { return new HbaseProxy(this.zk,this.zknode); } @Override public PooledObject<HbaseProxy> wrap(HbaseProxy hbaseProxy) { return new DefaultPooledObject<HbaseProxy>(hbaseProxy); } @Override public void destroyObject(PooledObject<HbaseProxy> p) throws Exception { HbaseProxy object = p.getObject(); object.close(); super.destroyObject(p); } } private static HbasePool pool; //开始编写我们的单例池子 private HbasePool(){} public static HbasePool getPool(){ if(pool ==null){ pool = new HbasePool(); } return pool; } //还得写一个构造池子的单例方法。用通用的池子对象来进行构造 private GenericObjectPool<HbaseProxy> gop; public GenericObjectPool<HbaseProxy> getGop(String zk,String zknode){ if(gop ==null){ HbasePoolFactary hbasePoolFactary = new HbasePoolFactary(zk, zknode); gop = new GenericObjectPool<HbaseProxy>(hbasePoolFactary); gop.setMaxTotal(10); } return gop; } }
2.通过get来拿自己hbase中的数据
这里将逻辑类和测试类写一块了。
package com.example.demospringboothbase.serverce;
import com.alibaba.fastjson.JSON;
import com.example.demospringboothbase.common.HbasePool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class test {
//从连接池中拿链接、
private HbasePool hbasePool = HbasePool.getPool();
//客户给定一个表名、rowkey、rowkey的规则、哪些列、列的规则、列簇
//输出的结果格式如下[{},{},{}]
public List<Map> resultByRowkey(String tableName, List<String> rowkey, String rowkeyAttr,
List<String> column, String columnAttr, String columnFamily) throws Exception {
//先定义一个List
ArrayList<Map> list = new ArrayList<>();
Table table = null;
//get操作是基于表名和rowkey来进行的
ArrayList<Get> gets = new ArrayList<>();
//这里将rowkey都放到gets中
for (String rk:rowkey){
Get get = null;
if (rowkeyAttr.equals("rowkey")){
get = new Get(rk.getBytes());
}
//在这里要指定列,因为只有指定列才会按照列输出,不指定列某人输出的是全部列
if (columnAttr.equals("column")){
for(String cl:column){
get.addColumn(columnFamily.getBytes(),cl.getBytes());
}
}
gets.add(get);
}
//和hbase取的联系
GenericObjectPool<HbasePool.HbaseProxy> gop = hbasePool.getGop("server3:2181", "/hbase-unsecure");
//从连接池中拿一个连接
HbasePool.HbaseProxy hbaseProxy = gop.borrowObject();
//指定表
table = hbaseProxy.getConnection().getTable(TableName.valueOf(tableName));
Result[] results = table.get(gets);
if (results!=null){
for (Result r:results){
HashMap map = new HashMap();
while (r.advance()){
Cell current = r.current();
String q = Bytes.toString(CellUtil.cloneQualifier(current));
String p = Bytes.toString(CellUtil.cloneValue(current));
map.put(q,p);
}
String rowkey1 = Bytes.toString(r.getRow());
map.put("rowkey",rowkey1);
list.add(map);
}
}else{
return list;
}
return list;
}
//测试是否成功
public static void main(String[] args) throws Exception {
test ceshi = new test();
ArrayList<String> rowkey = new ArrayList();
ArrayList<String> colum = new ArrayList();
rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222603");
// rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222802");
rowkey.add("0001b04bf9473458af40acb4c13f1476_20111230002114");
colum.add("click");
colum.add("url");
colum.add("serch");
List<Map> maps = ceshi.resultByRowkey("sogo3", rowkey, "rowkey", colum, "colum", "oo");
System.out.println(JSON.toJSONString(maps));
}
}
输出结果:[{"serch":"福彩3d单选一注法","rank":"10","rowkey":"000080fd3eaf6b381e33868ec6459c49_20111230222603","click":"5","url":"http://www.18888.com/read-htm-tid-6069520.html"},{"serch":"淫淫网","rank":"1","rowkey":"0001b04bf9473458af40acb4c13f1476_20111230002114","click":"1","url":"http://www.244uu.com/"}]
原文地址:https://www.cnblogs.com/shiji7/p/11927583.html
- 一文看懂ovirt的supervdsmd服务
- openstack如何扩展API之二:扩展原有核心API
- selenium+python自动化77-autoit文件上传
- selenium+python自动化78-autoit参数化与批量上传
- libvirt-内存分配和内存热插拔
- selenium+python自动化79-文件下载(SendKeys)
- selenium+python自动化80-文件下载(不弹询问框)
- libvirt-cpu分配和cpu热插拔
- 如何使用curl调试openstack的api
- selenium+python自动化81-报告优化
- Selenium+python自动化82-只截某个元素的图
- libvirt-TLS加密
- 在openstck中配置使用cloud-init
- libvirt-使用cgroup做资源分割控制
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Flink SQL 写入 Hive表的性能问题
- 文件系统索引extent 和 bitmap区别
- MySQL的Bugs
- 对复制实施主键约束
- MySQL如何管理客户端的连接?
- 鸿蒙 Ability 讲解(页面生命周期、后台服务、数据访问)
- MySQL如何管理客户端连接?线程池篇
- 让python装饰器不再晦涩难懂
- Android开发3年,九月份面试12家大厂跳槽成功,我有一些面试经验想分享给你们
- MySQL的防火墙功能
- Java中线程池的参数有几个?
- MySQL企业版备份工具MEB
- python生成器函数的应用场景举例---为copy过程添加进度条显示
- 短网址程序YOURLS安装及配置教程与设置中文
- MGR用哪个版本?5.7 vs 8.0