一文搞懂Flink rocksdb中的数据恢复
时间:2022-07-23
本文章向大家介绍一文搞懂Flink rocksdb中的数据恢复,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
当我们设置 rocksdb state backend 时,并且从 checkpoint 重启时,首先进入 RocksDBKeyedStateBackendBuilder 的 getRocksDBRestoreOperation 方法
// rockdb restore 入口方法
private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
if (restoreStateHandles.isEmpty()) {
return new RocksDBNoneRestoreOperation<>(
keyGroupRange,
keyGroupPrefixBytes,
numberOfTransferingThreads,
cancelStreamRegistry,
userCodeClassLoader,
kvStateInformation,
keySerializerProvider,
instanceBasePath,
instanceRocksDBPath,
dbOptions,
columnFamilyOptionsFactory,
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager);
}
KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
return new RocksDBIncrementalRestoreOperation<>(
operatorIdentifier,
keyGroupRange,
keyGroupPrefixBytes,
numberOfTransferingThreads,
cancelStreamRegistry,
userCodeClassLoader,
kvStateInformation,
keySerializerProvider,
instanceBasePath,
instanceRocksDBPath,
dbOptions,
columnFamilyOptionsFactory,
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager);
} else {
return new RocksDBFullRestoreOperation<>(
keyGroupRange,
keyGroupPrefixBytes,
numberOfTransferingThreads,
cancelStreamRegistry,
userCodeClassLoader,
kvStateInformation,
keySerializerProvider,
instanceBasePath,
instanceRocksDBPath,
dbOptions,
columnFamilyOptionsFactory,
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager);
}
}
当没有什么 state 需要恢复时,会 new RocksDBNoneRestoreOperation ,当增量做 checkpoint ,恢复的时候 new RocksDBIncrementalRestoreOperation,全量的话 RocksDBFullRestoreOperation。 这里我们以 RocksDBIncrementalRestoreOperation 为例进行分析
@Override
public RocksDBRestoreResult restore() throws Exception {
if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
return null;
}
final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();
boolean isRescaling = (restoreStateHandles.size() > 1 ||
!Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));
if (isRescaling) {
restoreWithRescaling(restoreStateHandles);
} else {
restoreWithoutRescaling(theFirstStateHandle);
}
return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
nativeMetricMonitor, lastCompletedCheckpointId, backendUID, restoredSstFiles);
}
首先呢,最关键性的方法也就是 restore 方法,当进行 rescale 的时候会执行 restoreWithRescaling 方法,其中 restoreStateHandles 可以简单的理解为 需要 restore state 的引用
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
// Prepare for restore with rescaling
KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
restoreStateHandles, keyGroupRange);
// Init base DB instance
if (initialHandle != null) {
restoreStateHandles.remove(initialHandle);
initDBWithRescaling(initialHandle);
} else {
openDB();
}
// Transfer remaining key-groups from temporary instance into base DB
byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
throw new IllegalStateException("Unexpected state handle type, " +
"expected " + IncrementalRemoteKeyedStateHandle.class +
", but found " + rawStateHandle.getClass());
}
//本地的
Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
//首先呢会把 rawStateHandle 对应的 state 数据下载到 temporaryRestoreInstancePath 并且作为一个临时的 RocksDB 实例的数据目录
try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
(IncrementalRemoteKeyedStateHandle) rawStateHandle,
temporaryRestoreInstancePath);
RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {
List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
// iterating only the requested descriptors automatically skips the default column family handle
for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
.columnFamilyHandle;
//会把临时的 rockdb 实例的数据写入到 rocksdb 中
try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
iterator.seek(startKeyGroupPrefixBytes);
while (iterator.isValid()) {
if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
// insert data to rocksdb
writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
} else {
// Since the iterator will visit the record according to the sorted order,
// we can just break here.
break;
}
iterator.next();
}
} // releases native iterator resources
}
} finally {
cleanUpPathQuietly(temporaryRestoreInstancePath);
}
}
}
主要就是把对应 state 的 sstFiles、miscFiles 下载到 临时指定的路径中,然后基于这个临时目录启动一个临时的 rockdb,然后将临时的 rockdb 中的数据导入到最终要使用的 rockdb,最后将临时的 rockdb 销毁掉。至于它为什么要两个 rockdb ,我猜测可能会为了数据一致性,万一下载数据下载一半失败了,具体也不太清楚,就感觉有点奇怪。
- Centos7.2/7.3集群安装Kubernetes 1.8.4 + Dashboard
- PsySH——PHP交互式控制台
- 如何将 Text, XML, CSV 数据文件导入 MySQL
- hpv病毒基因研究调研
- 从0到1构建美团压测工具
- sqoop数据导入总结
- GATK best practice每个步骤耗时如何?
- cordova学习一 环境搭建
- ChIP-seq实战分析
- PHP中9大缓存技术总结
- servlet中request等中文乱码问题
- Ofbiz模块加载机制即创建独立模块(脱离热部署)
- ofbiz连接mysql并创建独立数据库
- Angular+servlet java实现前后端数据交互
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 搞它!!!Linux系统上DHCP服务器的配置(理论加实验,分分钟搞定!!!)
- 搞它!!!Linux——引导、排障及修复
- 排障集锦:九九八十一难之第一难!linux发现交换文件无法打开文件!
- 排障集锦:九九八十一难之第三难!搭建DNS错误:Host xxxx not found: 2(SERVFAIL)
- 搞它!!!linux远程控制 openssh
- 搞它!!!Linux构建远程YUM仓库与NFS共享存储服务
- 排障集锦:九九八十一难之第四难! yum下载软件发现已存在的 RPM 数据库问题,无法下载,
- 搞它!!!深入了解DNS域名解析服务,教你搭建一个属于自己的DNS服务器(正向解析、反向解析、泛域名解析、邮件交换解析、别名解析、分离解析,主从结构解析)
- Nginx Ingress 高并发实践
- 搞它!!!2020年了,你还不会PXE+kickstart 一键式部署安装系统么
- shell脚本快速入门系列—————— shell脚本编程规范
- shell脚本快速入门系列之------条件语句(if、case)
- 搞它!!!2020年了你还不会Cobbler自动装机么(装机步骤,优化内容详解,导入系统镜像步骤,cobbler-web管理认证方式
- 搞它!!!深入了解FTP文件传输服务
- 搞它!!!CentOS 7.6 安装和配置samba文件共享服务