一文搞懂 Flink Kafka Consumer 类两阶段提交
时间:2022-07-23
本文章向大家介绍一文搞懂 Flink Kafka Consumer 类两阶段提交,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
由 一文搞懂 checkpoint 全过程,我们可以知道当 executeCheckpointing 的时候会执行 AsyncCheckpointRunnable
@Override
public void run() {
......
reportCompletedSnapshotStates(
jobManagerTaskOperatorSubtaskStates,
localTaskOperatorSubtaskStates,
asyncDurationMillis);
......
}
reportCompletedSnapshotStates 最终会通过 taskStateManager.reportTaskStateSnapshots 一层层的报告给 jobMaster 的 acknowledgeCheckpoint
// ack checkpoint 说是 JobManager 确认 checkpoint 实际上还是调用了 checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
// 待所有的 ack task 执行完成,才会 notifyCHeckpointComplete 这也算是两阶段提交
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState);
if (checkpointCoordinator != null) {
getRpcService().execute(() -> {
try {
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
} catch (Throwable t) {
log.warn("Error while processing checkpoint acknowledgement message", t);
}
});
......
}
最终调用的是 checkpointCoordinator.receiveAcknowledgeMessage 方法
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
......
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
// 待所有的 ack task 执行完成,才会 notifyCheckpointComplete 这也算是两阶段提交
// flink task 是横向的,即每个 operator chain 的所有 subTask 都 acknowCheckpoint, 这个 operator chain 才会进行 notifyCheckpointComplete
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
// 待所有的 tasks 都 ack 时,completePendingCheckpoint
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE:
LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
break;
case UNKNOWN:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the task's execution attempt id was unknown. Discarding " +
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
break;
case DISCARDED:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the pending checkpoint had been discarded. Discarding the " +
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
return true;
......
关键是 checkpoint.acknowledgeTask 以及 checkpoint.isFullyAcknowledged() 方法。当每次调用 checkpoint.acknowledgeTask 方法时
//将本次 ack 的 task 从 notYetAcknowledgedTasks 中移除
final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
将该 executionAttemptId( 也就是 subTask id ) 从 notYetAcknowledgedTasks 中移除,而当 notYetAcknowledgedTasks 为 empty ,即全部确认,即 checkpoint.isFullyAcknowledged()==true, 进行 notifyCheckpointComplete。具体时间如何 notifyCheckpointComplete 的可以参考 一文搞懂 checkpoint 全过程
- 利用虚拟硬盘(把内存当作硬盘)来提高数据库的效率(目前只针对SQL Server 2000)可以提高很多
- 分页解决方案 之 分页算法——Pager_SQL的思路和使用方法
- 让你的笔记本更快一点——我的笔记本的性能测试和虚拟硬盘(把内存当成硬盘)的使用感觉
- 分页解决方案 之 数据访问函数库——另类的思路、另类的写法,造就了不一样的发展道路。
- 分页解决方案 之 QuickPager的使用方法(在UserControl里面使用分页控件的方法)
- 分页解决方案 之 QuickPager的使用方法(URL分页、自动获取数据)
- 分页解决方案 之 QuickPager的使用方法(PostBack分页、自定义获取数据)
- QuickPager asp.net 分页控件、表单控件等自定义控件下载 和介绍 【2009.09.07更新】
- 分页解决方案 之 QuickPager的使用方法(PostBack分页、自动获取数据)
- 【自然框架】之鼠标点功能现(二):表单控件的“应用”—— 代码?只写需要的!
- 基于Docker环境中源码部署容器Nginx
- 使用Ansible playbooks快速构建etcd集群
- 使用系统内置script和scriptreplay命令来记录操作记录
- 【机器学习】我在面试机器学习、大数据岗位时遇到的各种问题
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 【一天一大 lee】反转字符串 (难度:简单) - Day20201008
- 【一天一大 lee】四数之和 (难度:中等) - Day20201005
- 【一天一大 lee】两数相加 (难度:中等) - Day20201004
- 历经14天自定义3个注解解决项目的3个Swagger难题
- 【一天一大 lee】秋叶收藏集 (难度:中等) - Day20201001
- 【译文】Facebook工程师谈运维工作的未来
- 【一天一大 lee】环形链表II (难度:中等) - Day20201010
- 【西法带你学算法】一次搞定前缀和
- 一文快速入门分库分表中间件 Sharding-JDBC (必修课)
- 求求你别再用System.out.println 了!!
- 为什么阿里巴巴Java开发手册中强制要求超大整数禁止使用Long类型返回?
- 独家 | 教你用Python来计算偏差-方差权衡
- 使用 KinD 加速 CI/CD 流水线
- 使用 Docker 加速开发工作流
- Kubernetes CRD 自定义控制器