通过Spark生成HFile,并以BulkLoad方式将数据导入到HBase
在实际生产环境中,将计算和存储进行分离,是我们提高集群吞吐量、确保集群规模水平可扩展的主要方法之一,并且通过集群的扩容、性能的优化,确保在数据大幅增长时,存储不能称为系统的瓶颈。
具体到我们实际的项目需求中,有一个典型的场景,通常会将Hive中的部分数据,比如热数据,存入到HBase中,进行冷热分离处理。
我们采用Spark读取Hive表数据存入HBase中,这里主要有两种方式:
- 通过HBase的put API进行数据的批量写入
- 通过生成HFile文件,然后通过BulkLoad方式将数据存入HBase
HBase的原生put方式,通过HBase集群的region server向HBase插入数据,但是当数据量非常大时,region会进行split、compact等处理,并且这些处理非常占用计算资源和IO开销,影响性能和集群的稳定性。
HBase的数据最终是以HFile的形式存储到HDFS上的,如果我们能直接将数据生成为HFile文件,然后将HFile文件保存到HBase对应的表中,可以避免上述的很多问题,效率会相对更高。
本篇文章主要介绍如何使用Spark生成HFile文件,然后通过BulkLoad方式将数据导入到HBase中,并附批量put数据到HBase以及直接存入数据到HBase中的实际应用示例。
1. 生成HFile,BulkLoad导入
1.1 数据样例
{"id":"1","name":"jack","age":"18"}
{"id":"2","name":"mike","age":"19"}
{"id":"3","name":"kilos","age":"20"}
{"id":"4","name":"tom","age":"21"}
...
1.2 示例代码
/**
* @Author bigdatalearnshare
*/
object App {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkSession = SparkSession
.builder()
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.master("local[*]")
.getOrCreate()
val rowKeyField = "id"
val df = sparkSession.read.format("json").load("/people.json")
val fields = df.columns.filterNot(_ == "id").sorted
val data = df.rdd.map { row =>
val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)
val kvs = fields.map { field =>
new KeyValue(rowKey, Bytes.toBytes("hfile-fy"), Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
}
(new ImmutableBytesWritable(rowKey), kvs)
}.flatMapValues(x => x).sortByKey()
val hbaseConf = HBaseConfiguration.create(sparkSession.sessionState.newHadoopConf())
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hfile")
val connection = ConnectionFactory.createConnection(hbaseConf)
val tableName = TableName.valueOf("hfile")
//没有HBase表则创建
creteHTable(tableName, connection)
val table = connection.getTable(tableName)
try {
val regionLocator = connection.getRegionLocator(tableName)
val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
val savePath = "hdfs://linux-1:9000/hfile_save"
delHdfsPath(savePath, sparkSession)
job.getConfiguration.set("mapred.output.dir", savePath)
data.saveAsNewAPIHadoopDataset(job.getConfiguration)
val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
bulkLoader.doBulkLoad(new Path(savePath), connection.getAdmin, table, regionLocator)
} finally {
//WARN LoadIncrementalHFiles: Skipping non-directory hdfs://linux-1:9000/hfile_save/_SUCCESS 不影响,直接把文件移到HBASE对应HDFS地址了
table.close()
connection.close()
}
sparkSession.stop()
}
def creteHTable(tableName: TableName, connection: Connection): Unit = {
val admin = connection.getAdmin
if (!admin.tableExists(tableName)) {
val tableDescriptor = new HTableDescriptor(tableName)
tableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("hfile-fy")))
admin.createTable(tableDescriptor)
}
}
def delHdfsPath(path: String, sparkSession: SparkSession) {
val hdfs = FileSystem.get(sparkSession.sessionState.newHadoopConf())
val hdfsPath = new Path(path)
if (hdfs.exists(hdfsPath)) {
//val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
hdfs.delete(hdfsPath, true)
}
}
}
1.3 注意事项
上述示例代码可以根据实际业务需求作相应调整,但有一个问题需要特别注意:
通过Spark读取过来的数据生成HFile时,要确保HBase的主键、列族、列按照有序排列。否则,会抛出以下异常:
Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = 1/hfile-fy:age/1588230543677/Put/vlen=2/seqid=0, lastCell = 1/hfile-fy:name/1588230543677/Put/vlen=4/seqid=0
2. 批量put
2.1数据样例
val rowKeyField = "id"
val df = sparkSession.read.format("json").load("/stats.json")
val fields = df.columns.filterNot(_ == "id")
df.rdd.foreachPartition { partition =>
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "batch_put")
val conn = ConnectionFactory.createConnection(hbaseConf)
val table = conn.getTable(TableName.valueOf("batch_put"))
val res = partition.map { row =>
val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)
val put = new Put(rowKey)
val family = Bytes.toBytes("hfile-fy")
fields.foreach { field =>
put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
}
put
}.toList
Try(table.put(res)).getOrElse(table.close())
table.close()
conn.close()
}
在实际应用中,我们也可以将经常一起查询的数据拼接在一起存入一个列中,比如将上述的pv和uv拼接在一起使用,可以降低KeyValue带来的结构化开销。
3.saveAsNewAPIHadoopDataset
val hbaseConf = sparkSession.sessionState.newHadoopConf()
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "direct")
val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val rowKeyField = "id"
val df = sparkSession.read.format("json").load("/stats.json")
val fields = df.columns.filterNot(_ == "id")
df.rdd.map { row =>
val put = new Put(Bytes.toBytes(row.getAs(rowKeyField).toString))
val family = Bytes.toBytes("hfile-fy")
fields.foreach { field =>
put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
}
(new ImmutableBytesWritable(), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
以上主要介绍了3种利用Spark将数据导入HBase的方式。其中,通过生成HFile文件,然后以BulkLoad导入的方式更适合于大数据量的操作。
此外,如果我们在使用Spark(或者其他计算引擎)读取HBase表数据时,如果效率相对低,比如:Spark读取HBase时会根据region的数量生成对应数量的task,导致相同数据量下,会比直接读取Hive数据慢,也可以通过直接读取HFile的方式来处理。当然,实际应用还要结合具体的场景,涉及的技术等。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 关于页更改并加入一些在线服务
- Hadoop-2.6.0为基础的Hive安装
- Python 技术篇-opencv读取中文路径图片报错及解决办法
- Javaweb鼠标事件案例分析—鼠标移入移出表格颜色变化
- docker registry V2私有仓库搭建
- Python 路径问题:cv2.error: OpenCV(4.1.0)...size.width>0 && size.height>0 in function 'cv::imshow'. 原因与解决
- 算法案例分析—字符串模式匹配算法
- Docker-软件工程集装箱技术
- PyQt5 技术篇-获取电脑屏幕桌面的宽、高和分辨率
- 使用Python快速抠图
- 值得白嫖的数据库常用操作语句汇总(数据库、数据表、数据操作)
- JavaScript 技术篇-js正则表达式匹配字符串左右两边是否包含空格
- 初入编程吓破胆?那是你还不懂这些!(高能预警)
- 振兴杯试题功能设计(准备)
- PyQt5 技术篇-Dialog窗口增加?问号按钮