Too old resource version 引起 Flink JobManager 崩溃的问题定位
问题背景
近期接到客户反馈,某地域的作业不定期的出现 JobManager 崩溃重启的问题。具体现象如下:
JobManager 在正常运行中,没有任何预兆地,突然报too old resource version
错误,紧接着容器就自动退出了:
2020-10-17 14:51:36.289 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 37 (type=CHECKPOINT) @ 1602917496275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 14:51:36.442 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 37 for job 7848c696590291978aa8279b3c09e5d7 (6958828 bytes in 167 ms).
2020-10-17 14:56:36.287 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 38 (type=CHECKPOINT) @ 1602917796275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 14:56:36.434 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 38 for job 7848c696590291978aa8279b3c09e5d7 (6943747 bytes in 159 ms).
2020-10-17 15:01:36.286 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39 (type=CHECKPOINT) @ 1602918096275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:01:36.461 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39 for job 7848c696590291978aa8279b3c09e5d7 (6929939 bytes in 185 ms).
2020-10-17 15:06:36.289 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 40 (type=CHECKPOINT) @ 1602918396275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:06:36.434 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 40 for job 7848c696590291978aa8279b3c09e5d7 (6916258 bytes in 159 ms).
2020-10-17 15:11:36.290 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 41 (type=CHECKPOINT) @ 1602918696275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:11:36.434 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 41 for job 7848c696590291978aa8279b3c09e5d7 (6960383 bytes in 159 ms).
2020-10-17 15:16:36.288 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 42 (type=CHECKPOINT) @ 1602918996275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:16:36.469 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 42 for job 7848c696590291978aa8279b3c09e5d7 (6969143 bytes in 194 ms).
2020-10-17 15:21:36.290 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 43 (type=CHECKPOINT) @ 1602919296275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:21:36.438 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 43 for job 7848c696590291978aa8279b3c09e5d7 (6982908 bytes in 162 ms).
2020-10-17 15:25:20.649 [OkHttp https://9.166.252.1/...] ERROR org.apache.flink.kubernetes.KubernetesResourceManager [] - Fatal error occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 5906722735 (5907278804)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.0.jar:1.11.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
2020-10-17 15:25:20.650 [OkHttp https://9.166.252.1/...] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 5906722735 (5907278804)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.0.jar:1.11.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
2020-10-17 15:25:20.661 [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
而查看所有 TaskManager 的日志,也没有发现任何异样。
该问题会触发 ResourceManager 对 JobManager 的重新初始化过程,作业也会从最近的一次 Checkpoint 恢复。但是如果没有配置 HA(High Availability,高可用)时,Flink 就无法正常恢复作业,造成运行中关键状态的丢失,这对线上业务是无法接受的。
原因排查
由于日志中关键的内容不多,我们决定先从 too old resource version 这个 Kubernetes 客户端的报错入手,分析异常发生的原因。
由于 Flink 的 Kubernetes 客户端使用 Fabric8,我们查到了其团队成员针对此问题的回复,简单概括如下:
每个 Kubernetes 资源都有自己的版本号,当客户端对 Pods 进行 watch(监听)操作时,有概率会出现
410 Gone
的 HTTP 状态码。这个状态表代表客户端记录的资源版本号(resourceVersion)太低,服务端不再接受它的请求。此时对于普通 watch 而言,需要自行处理该场景,客户端并没有对此做处理。
而 Flink 并没有妥善处理这种场景,而是粗暴地令 JobManager 关闭(随后会重新启动一个新的实例)来应对任何 KubernetesClientException 异常(详见 FLINK-15836)。
对于这个设计而言,固然有其合理成分,即遇到异常时 Fail Fast(尽快暴露问题),不至于小问题越拖越大。
但是我们认为,对于这种资源版本不够新的问题,并不属于故障,因此也不需要重启 JobManager 这么重的操作,只需要重新初始化一次 watcher,令其资源版本更新到最新即可。
毕竟,这种可恢复的异常,可能会在一个长期运行作业的运行周期内多次出现,平台方需要考虑到细粒度的容错,令客户的作业能够长期平稳运行。
不过至此我们仍然好奇,究竟是什么原因造成的客户端与服务端的资源版本不一致呢?
触发条件
对于这个 Too old resource version(HTTP Gone 410)的问题,我们已经知道了它的含义和潜在的应对策略,但是仍然需要找到触发条件以便对修复方案进行验证。对此我们尝试了不少方案,例如主动令 JobManager 的 JVM 较长时间停顿等等,但是难以触发同样的现象。
后来我们偶然间发现,重启 API Server 服务可以复现该问题,因为新启动的 API Server 会从 etcd 中获取当前最新 resourceVersion,如果客户端后续用保存的旧值请求的话,该现象就可以得到稳定复现,这给我们的修复和验证工作提供了极大的便利。
解决方案
有了上面的分析,可以在上述提到的onClose
方法中,对异常cause
的类型进行判定:如果它的错误码code
等于 410(HTTP Gone),则令 KubernetesResourceManager 走重新初始化 watch 的流程:
/**
* When “Fatal error occurred in ResourceManager.
* io.fabric8.kubernetes.client.KubernetesClientException:
* too old resource version”
* happens, re-watch the pods
*/
public void reWatchPods() {
podsWatch = kubeClient.watchPodsAndDoCallback(
KubernetesUtils.getTaskManagerLabels(clusterId),
this);
}
这个方法会令 Flink 的kubeClient
从服务器中获取当前最新的 pods 信息,然后重新注册 watch 回调,开启新的一轮监听。
容错效果
应用了上述修复方案后,重新制作 Flink 镜像并进行验证,可以看到 Too old resource version 问题得到复现,异常也被捕获并重新进行了 pod watcher 的初始化,JobManager 正常运行,没有发生崩溃等现象:
同时对该作业进行多次重启 API Server 操作,均可正常应对,Checkpoint 和 Savepoint 也可以继续进行。我们还在作业运行期间模拟单个和多个 TaskManager Pod 崩溃的场景,也可以正常地重新分配新的 Pod 并自动恢复作业,说明 Kubernetes Client 与服务端的后续通信都是正常的。
思考总结
回顾该问题的定位过程,其实不算特别顺利,大多数时间用在了思考和尝试如何复现问题方面。找到了复现方案,就可以把一个偶现的问题变成必现的问题,从而可以细致地从整个调用链分析问题的成因、后果、解决方案等方方面面。
另外此问题也显示了 Flink 的 Kubernetes 模块远非完美,仍然需要大家积极的发现、定位并解决各种运行时问题,为社区的发展贡献自己的力量。
- 2761: [JLOI2011]不重复数字(平衡树)
- 无图片字体icon
- 数据结构(三):栈与队列
- 3555: [Ctsc2014]企鹅QQ
- 3381: [Usaco2004 Open]Cave Cows 2 洞穴里的牛之二
- 3097: Hash Killer I
- 3390: [Usaco2004 Dec]Bad Cowtractors牛的报复
- 1684: [Usaco2005 Oct]Close Encounter
- 算法模板——Dinic最小费用最大流
- 算法模板——Dinic网络最大流 1
- SQL Server 使用全文索引进行页面搜索
- 2764: [JLOI2011]基因补全
- 1000: A+B Problem(NetWork Flow)
- 博弈论进阶之Multi-SG
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Kubernetes K8S之存储ConfigMap详解 通过目录创建通过文件创建通过命令行创建通过yaml文件创建当前存在的ConfigMap使用ConfigMap
- Material Components——Shape的处理
- pandas系列 - (一)明细数据汇总简单场景应用
- Spring系列 SpringMVC的请求与数据响应
- PHP代码审计03之实例化任意对象漏洞
- 最简单入门深度学习
- Redis 字典结构细谈
- 终于弄明白 i = i++和 i = ++i 了
- 更简易的机器学习-pycaret的安装和环境初始化
- 直观讲解一下 RPC 调用和 HTTP 调用的区别!
- pycaret之训练模型(创建模型、比较模型、微调模型)
- 什么是递归,通过这篇文章,让你彻底搞懂递归
- pycaret之集成模型(集成模型、混合模型、堆叠模型)
- pycaret模型分析之绘制模型结果
- pycaret模型分析