spark操作Hbase表
时间:2022-07-22
本文章向大家介绍spark操作Hbase表,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1. 创建conf和table
var tableName = "httpsystem_dev"
val conf= HBaseConfiguration.create()
//设置要查询的表
conf.set(TableInputFormat.INPUT_TABLE,tableName)
val table = new HTable(conf,tableName)
2. 通过SparkAPI读取数据
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConfiguration, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
返回的数据是一个ImmutableBytesWritable,和一个result组成的二元组,result就是一个列表
3. 通过扫描设置相查询数据
var scan = new Scan()
scan.addFamily(Bytes.toBytes("0"))
scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_rule_juge_id"))
scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_mal"))
scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_type"))
scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_mal"))
scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_type"))
//spark读取hbase转换rdd
var proto = ProtobufUtil.toScan(scan)
var scanToString = Base64.encodeBytes(proto.toByteArray)
hbaseConfiguration.set(TableInputFormat.SCAN, scanToString)
4. 将RDD转换为Df
//rdd返回df
var rdd = hbaseRDD.map(new org.apache.spark.api.java.function.Function[(ImmutableBytesWritable, Result), Row] {
override def call(v1: (ImmutableBytesWritable, Result)): Row = {
var result: Result = v1._2
var rowkey: String = Bytes.toString(result.getRow)
var ML_juge_type: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_type")))
var ML_rule_juge_id: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_rule_juge_id")))
var ML_juge_mal: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_mal")))
var DLCNN_juge_type: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_type")))
var DLCNN_juge_mal: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_mal")))
RowFactory.create(rowkey, ML_rule_juge_id, ML_juge_mal, ML_juge_type, DLCNN_juge_mal, DLCNN_juge_type)
}
})
//创建df
var df = sparkSession.createDataFrame(rdd, HttpParingSchema.struct)
5.数据的写入
val put = new Put(Bytes.toBytes("rowKey"))
put.add("cf","q","value")
批量写入
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log") v
al data = rdd.map(_.split("t")).map{x=>(x(0)+x(1),x(2))}
val result = data.foreachPartition{x => {
val conf= HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE,"data");
conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");
conf.set("hbase.zookeeper.property.clientPort","2181");
conf.addResource("/home/hadoop/data/lib/hbase-site.xml");
val table = new HTable(conf,"data");
table.setAutoFlush(false,false);
table.setWriteBufferSize(3*1024*1024);
x.foreach{y => { var put= new Put(Bytes.toBytes(y._1));
put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)
};
table.flushCommits}}}
6.使用Bulkload插入数据
val conf = HBaseConfiguration.create();
val tableName = "data1" val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));
(new ImmutableBytesWritable(kv.getKey),kv)}}
rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)
- 47. 访问MySql数据库实现增删改查 | 厚土Go学习笔记
- system表空间不足的问题分析(二) (r8笔记第5天)
- golang基于redis lua封装的优先级去重队列
- python基础知识——内置数据结构(元组)
- python基础知识——控制语句
- python基础知识——基本语法
- 11g主库归档自动删除的小问题分析 (r8笔记第1天)
- JavaWeb02-CSS,JS(Java真正的全栈开发)
- 数据处理——One-Hot Encoding
- JavaWeb20-文件上传;下载(Java真正的全栈开发)
- 转--每周一个GoLang设计模式之组合模式
- 简单易学的机器学习算法——Softmax Regression
- JavaWeb19-Listener ; Filter
- dataguard归档路径的问题(r7笔记第99天)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PHP生成指定范围内的N个不重复的随机数
- 实例讲解通过PHP创建数据库
- PHP的mysqli_select_db()函数讲解
- PHP的PDO事务与自动提交
- 使用pytorch实现论文中的unet网络
- Python如何优雅删除字符列表空字符及None元素
- php语法检查的方法总结
- PHP实现浏览器格式化显示XML的方法示例
- Laravel框架基于中间件实现禁止未登录用户访问页面功能示例
- PHP的mysqli_stmt_init()函数讲解
- PHP内置函数生成随机数实例
- PHPStudy下如何为Apache安装SSL证书的方法步骤
- PHP的mysqli_thread_id()函数讲解
- thinkPHP框架中layer.js的封装与使用方法示例
- OpenCV+python实现实时目标检测功能