Understanding State Restore in Flink's RocksDB State Backend

Date: 2022-07-23

When the RocksDB state backend is configured and the job restarts from a checkpoint, execution first enters the getRocksDBRestoreOperation method of RocksDBKeyedStateBackendBuilder:

// entry point for choosing the RocksDB restore operation
	private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
		int keyGroupPrefixBytes,
		CloseableRegistry cancelStreamRegistry,
		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
		RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
		if (restoreStateHandles.isEmpty()) {
			return new RocksDBNoneRestoreOperation<>(
				keyGroupRange,
				keyGroupPrefixBytes,
				numberOfTransferingThreads,
				cancelStreamRegistry,
				userCodeClassLoader,
				kvStateInformation,
				keySerializerProvider,
				instanceBasePath,
				instanceRocksDBPath,
				dbOptions,
				columnFamilyOptionsFactory,
				nativeMetricOptions,
				metricGroup,
				restoreStateHandles,
				ttlCompactFiltersManager);
		}
		KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
		if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
			return new RocksDBIncrementalRestoreOperation<>(
				operatorIdentifier,
				keyGroupRange,
				keyGroupPrefixBytes,
				numberOfTransferingThreads,
				cancelStreamRegistry,
				userCodeClassLoader,
				kvStateInformation,
				keySerializerProvider,
				instanceBasePath,
				instanceRocksDBPath,
				dbOptions,
				columnFamilyOptionsFactory,
				nativeMetricOptions,
				metricGroup,
				restoreStateHandles,
				ttlCompactFiltersManager);
		} else {
			return new RocksDBFullRestoreOperation<>(
				keyGroupRange,
				keyGroupPrefixBytes,
				numberOfTransferingThreads,
				cancelStreamRegistry,
				userCodeClassLoader,
				kvStateInformation,
				keySerializerProvider,
				instanceBasePath,
				instanceRocksDBPath,
				dbOptions,
				columnFamilyOptionsFactory,
				nativeMetricOptions,
				metricGroup,
				restoreStateHandles,
				ttlCompactFiltersManager);
		}
	}

When there is no state to restore, a RocksDBNoneRestoreOperation is created. When restoring from an incremental checkpoint, a RocksDBIncrementalRestoreOperation is used; for a full checkpoint, a RocksDBFullRestoreOperation. The following analysis takes RocksDBIncrementalRestoreOperation as the example.
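The branching above can be condensed into a few lines. The sketch below uses simplified stand-in handle types (not Flink's real classes) purely to illustrate the dispatch on the first handle's type:

```java
import java.util.Collection;
import java.util.List;

// Simplified stand-ins for Flink's keyed-state handle types.
interface KeyedStateHandle {}
class IncrementalKeyedStateHandle implements KeyedStateHandle {}
class SavepointKeyedStateHandle implements KeyedStateHandle {}

class RestoreDispatch {
    // Mirrors the branching in getRocksDBRestoreOperation: no handles means
    // nothing to restore; otherwise the type of the first handle decides
    // between incremental and full restore.
    static String chooseRestoreOperation(Collection<KeyedStateHandle> handles) {
        if (handles.isEmpty()) {
            return "RocksDBNoneRestoreOperation";
        }
        KeyedStateHandle first = handles.iterator().next();
        return (first instanceof IncrementalKeyedStateHandle)
                ? "RocksDBIncrementalRestoreOperation"
                : "RocksDBFullRestoreOperation";
    }
}
```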

@Override
	public RocksDBRestoreResult restore() throws Exception {

		if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
			return null;
		}

		final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();

		boolean isRescaling = (restoreStateHandles.size() > 1 ||
			!Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));

		if (isRescaling) {
			restoreWithRescaling(restoreStateHandles);
		} else {
			restoreWithoutRescaling(theFirstStateHandle);
		}
		return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
			nativeMetricMonitor, lastCompletedCheckpointId, backendUID, restoredSstFiles);
	}

The key method here is restore(). restoreStateHandles can be loosely understood as the set of handles pointing to the state that needs to be restored. When the job has been rescaled (more than one handle, or a single handle whose key-group range differs from the range now assigned to this subtask), restoreWithRescaling is executed; otherwise restoreWithoutRescaling handles the simple case.
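The rescaling check itself is small enough to sketch in isolation. In this hedged sketch, key-group ranges are modeled as plain int[]{start, end} pairs instead of Flink's KeyGroupRange class:

```java
import java.util.Arrays;
import java.util.List;

class RescalingCheck {
    // Mirrors the check in RocksDBIncrementalRestoreOperation#restore:
    // rescaling happened if handles came from more than one old subtask,
    // or the single handle's key-group range no longer matches the range
    // assigned to this subtask.
    static boolean isRescaling(List<int[]> handleRanges, int[] assignedRange) {
        return handleRanges.size() > 1
                || !Arrays.equals(handleRanges.get(0), assignedRange);
    }
}
```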

private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

		// Prepare for restore with rescaling
		KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
			restoreStateHandles, keyGroupRange);

		// Init base DB instance
		if (initialHandle != null) {
			restoreStateHandles.remove(initialHandle);
			initDBWithRescaling(initialHandle);
		} else {
			openDB();
		}

		// Transfer remaining key-groups from temporary instance into base DB
		byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
		RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

		byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
		RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

		for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

			if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
				throw new IllegalStateException("Unexpected state handle type, " +
					"expected " + IncrementalRemoteKeyedStateHandle.class +
					", but found " + rawStateHandle.getClass());
			}

			//local temporary directory for the downloaded state
			Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
			//first, download the state referenced by rawStateHandle into temporaryRestoreInstancePath, which serves as the data directory of a temporary RocksDB instance
			try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
				(IncrementalRemoteKeyedStateHandle) rawStateHandle,
				temporaryRestoreInstancePath);
				RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {

				List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
				List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

				// iterating only the requested descriptors automatically skips the default column family handle
				for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
					ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);

					ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
						null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
						.columnFamilyHandle;

					//copy the temporary RocksDB instance's data into the target RocksDB
					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {

						iterator.seek(startKeyGroupPrefixBytes);

						while (iterator.isValid()) {

							if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
								// insert data to rocksdb
								writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
							} else {
								// Since the iterator will visit the record according to the sorted order,
								// we can just break here.
								break;
							}

							iterator.next();
						}
					} // releases native iterator resources
				}
			} finally {
				cleanUpPathQuietly(temporaryRestoreInstancePath);
			}
		}
	}

To summarize: for each remaining state handle, the corresponding sstFiles and miscFiles are downloaded to a temporary path, a temporary RocksDB instance is opened on top of that directory, the data from the temporary instance is copied into the target RocksDB, and the temporary instance is then destroyed. As for why two RocksDB instances are needed: after rescaling, a downloaded handle may contain key-groups that fall outside this subtask's new key-group range, so its SST files cannot simply be adopted wholesale; the data has to be filtered by key-group prefix while it is copied into the base DB, and the temporary instance exists only to iterate over the downloaded files for that filtered copy.
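The key-group filtering during the copy relies on RocksDB's byte-wise key ordering: every key is prefixed with its key-group id written big-endian, so seeking to the serialized start group and breaking at the first key that is no longer before the serialized (endKeyGroup + 1) prefix visits exactly the assigned range. Below is a simplified sketch of the two helpers involved (loosely modeled on RocksDBKeySerializationUtils.serializeKeyGroup and RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes; the 2-byte prefix length in the test is an assumption, as Flink derives keyGroupPrefixBytes from the max parallelism):

```java
class KeyGroupPrefix {
    // Write keyGroup big-endian into the first bytes.length bytes,
    // in the spirit of RocksDBKeySerializationUtils.serializeKeyGroup.
    static void serializeKeyGroup(int keyGroup, byte[] bytes) {
        for (int i = bytes.length - 1; i >= 0; i--) {
            bytes[i] = (byte) (keyGroup & 0xFF);
            keyGroup >>>= 8;
        }
    }

    // Unsigned lexicographic "key < prefix" check. Because RocksDB iterates
    // keys in this same order, the restore copy loop can break at the first
    // key that is no longer before the stop prefix.
    static boolean beforeThePrefixBytes(byte[] key, byte[] prefix) {
        int len = Math.min(key.length, prefix.length);
        for (int i = 0; i < len; i++) {
            int cmp = (key[i] & 0xFF) - (prefix[i] & 0xFF);
            if (cmp != 0) {
                return cmp < 0;
            }
        }
        return key.length < prefix.length;
    }
}
```

With startKeyGroup = 5 and endKeyGroup = 7, the stop prefix is serialized from 8, so every key whose first two bytes encode groups 5..7 passes the check and everything from group 8 onward terminates the loop.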