Understanding the Flink Kafka Consumer's Two-Phase-Commit-Like Behavior in One Article

Date: 2022-07-23

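As covered in "Understanding the Full Checkpoint Process in One Article", executeCheckpointing runs an AsyncCheckpointRunnable, whose run() method reports the completed snapshot states: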

@Override
public void run() {
	......
	reportCompletedSnapshotStates(
		jobManagerTaskOperatorSubtaskStates,
		localTaskOperatorSubtaskStates,
		asyncDurationMillis);
	......
}

reportCompletedSnapshotStates goes through taskStateManager.reportTaskStateSnapshots, and the report is passed up layer by layer until it reaches acknowledgeCheckpoint on the JobMaster:

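// acknowledgeCheckpoint: nominally the JobManager acknowledging the checkpoint, but in practice it just calls checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
	// notifyCheckpointComplete happens only after every task has acknowledged, which resembles a two-phase commit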
	public void acknowledgeCheckpoint(
			final JobID jobID,
			final ExecutionAttemptID executionAttemptID,
			final long checkpointId,
			final CheckpointMetrics checkpointMetrics,
			final TaskStateSnapshot checkpointState) {

		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
		final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
			jobID,
			executionAttemptID,
			checkpointId,
			checkpointMetrics,
			checkpointState);

		if (checkpointCoordinator != null) {
			getRpcService().execute(() -> {
				try {
					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
				} catch (Throwable t) {
					log.warn("Error while processing checkpoint acknowledgement message", t);
				}
			});
		......
	}

What ultimately gets called is checkpointCoordinator.receiveAcknowledgeMessage:

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
		......
		synchronized (lock) {
			// we need to check inside the lock for being shutdown as well, otherwise we
			// get races and invalid error log messages
			if (shutdown) {
				return false;
			}

			final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

			if (checkpoint != null && !checkpoint.isDiscarded()) {

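				// notifyCheckpointComplete happens only after every task has acknowledged, which resembles a two-phase commit
				// acknowledgement is tracked per subtask: every subtask of every operator chain that the checkpoint is waiting for must call acknowledgeCheckpoint before notifyCheckpointComplete is issued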
				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
					case SUCCESS:
						LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
							checkpointId, message.getTaskExecutionId(), message.getJob());

						// once all tasks have acknowledged, complete the pending checkpoint
						if (checkpoint.isFullyAcknowledged()) {
							completePendingCheckpoint(checkpoint);
						}
						break;
					case DUPLICATE:
						LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
						break;
					case UNKNOWN:
						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
								"because the task's execution attempt id was unknown. Discarding " +
								"the state handle to avoid lingering state.", message.getCheckpointId(),
							message.getTaskExecutionId(), message.getJob());

						discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

						break;
					case DISCARDED:
						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
							"because the pending checkpoint had been discarded. Discarding the " +
								"state handle tp avoid lingering state.",
							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());

						discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
				}

				return true;
				......

The key calls are checkpoint.acknowledgeTask and checkpoint.isFullyAcknowledged(). Each call to checkpoint.acknowledgeTask runs the following:

// remove the task acknowledged by this call from notYetAcknowledgedTasks
			final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);

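This removes the executionAttemptId (the ID of a subtask's execution attempt, so effectively the subtask ID) from notYetAcknowledgedTasks. Once notYetAcknowledgedTasks is empty, every task has acknowledged, checkpoint.isFullyAcknowledged() returns true, and notifyCheckpointComplete is carried out. For the details of how notifyCheckpointComplete is performed, see "Understanding the Full Checkpoint Process in One Article".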
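
The acknowledgement bookkeeping described above can be illustrated with a small standalone sketch. It is not Flink's code: PendingCheckpointSketch, CoordinatorSketch, and the string task IDs are hypothetical stand-ins, and the DUPLICATE/UNKNOWN distinction is a simplified guess at how the cases in the switch might be told apart. It only shows the general idea: phase one collects acknowledgements by removing each task from a "not yet acknowledged" set, and phase two notifies all tasks once that set is empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class AckTrackingSketch {

    /** Outcomes of a single acknowledgement, named after the cases in the switch above. */
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    /** Hypothetical stand-in for a pending checkpoint; NOT Flink's PendingCheckpoint. */
    static final class PendingCheckpointSketch {
        private final Set<String> notYetAcknowledgedTasks;
        private final Set<String> acknowledgedTasks = new HashSet<>();
        private boolean discarded;

        PendingCheckpointSketch(List<String> tasksToWaitFor) {
            this.notYetAcknowledgedTasks = new HashSet<>(tasksToWaitFor);
        }

        AckResult acknowledgeTask(String executionAttemptId) {
            if (discarded) {
                return AckResult.DISCARDED;
            }
            // Remove the acknowledging task from the set of outstanding tasks.
            if (notYetAcknowledgedTasks.remove(executionAttemptId)) {
                acknowledgedTasks.add(executionAttemptId);
                return AckResult.SUCCESS;
            }
            // Not outstanding: either it acknowledged earlier, or it was never expected.
            return acknowledgedTasks.contains(executionAttemptId)
                    ? AckResult.DUPLICATE
                    : AckResult.UNKNOWN;
        }

        boolean isFullyAcknowledged() {
            return !discarded && notYetAcknowledgedTasks.isEmpty();
        }

        void discard() {
            discarded = true;
        }
    }

    /** Hypothetical coordinator: phase 1 collects acks, phase 2 notifies every task. */
    static final class CoordinatorSketch {
        private final List<String> allTasks;
        private final PendingCheckpointSketch pending;
        // Records which tasks would have received notifyCheckpointComplete.
        final List<String> notifiedTasks = new ArrayList<>();

        CoordinatorSketch(List<String> tasks) {
            this.allTasks = new ArrayList<>(tasks);
            this.pending = new PendingCheckpointSketch(tasks);
        }

        AckResult receiveAcknowledgeMessage(String executionAttemptId) {
            AckResult result = pending.acknowledgeTask(executionAttemptId);
            // Only the ack that empties the outstanding set triggers the notification.
            if (result == AckResult.SUCCESS && pending.isFullyAcknowledged()) {
                notifiedTasks.addAll(allTasks);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator =
                new CoordinatorSketch(Arrays.asList("subtask-0", "subtask-1"));
        System.out.println(coordinator.receiveAcknowledgeMessage("subtask-0"));
        System.out.println("notified so far: " + coordinator.notifiedTasks);
        System.out.println(coordinator.receiveAcknowledgeMessage("subtask-1"));
        System.out.println("notified so far: " + coordinator.notifiedTasks);
    }
}
```

In this sketch nothing is notified until the last outstanding subtask acknowledges, which is the "prepare, then commit" shape that makes the flow resemble a two-phase commit.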