Flink 面试题

时间:2022-07-22
本文章向大家介绍Flink 面试题,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

基础概念考察

Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。

Flink 的特性包括:

  1. 架构模型

Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink 在运行时主要包含:Jobmanager、Taskmanager 和 Slot。

  1. 任务调度

Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图 DAG,Spark Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler。

Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager 进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度。

  1. 时间机制

Spark Streaming 支持的时间机制有限,只支持处理时间。Flink 支持了流处理程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持 watermark 机制来处理滞后数据。

  1. 容错机制

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。

Flink 则使用两阶段提交协议来解决这个问题。

自下而上,每一层分别代表:Deploy 层:该层主要涉及了 Flink 的部署模式,在上图中我们可以看出,Flink 支持包括 local、Standalone、Cluster、Cloud 等多种部署模式。Runtime 层:Runtime 层提供了支持 Flink 计算的核心实现,比如:支持分布式 Stream 处理、JobGraph 到 ExecutionGraph 的映射、调度等等,为上层 API 层提供基础服务。API 层:API 层主要实现了面向流(Stream)处理和批(Batch)处理 API,其中面向流处理对应 DataStream API,面向批处理对应 DataSet API,后续版本,Flink 有计划将 DataStream 和 DataSet API 进行统一。Libraries 层:该层称为 Flink 应用框架层,根据 API 层的划分,在 API 层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于 SQL-like 的操作(基于 Table 的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。

四、Flink集群的角色

TaskManager,JobManager,Client 三种角色。其中 JobManager 扮演着集群中的管理者 Master 的角色,它是整个集群的协调者,负责接收 Flink Job,协调检查点,Failover 故障恢复等,同时管理 Flink 集群中从节点 TaskManager。

TaskManager 是实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task,每个 TaskManager 负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向 JobManager 汇报。

Client 是 Flink 程序提交的客户端,当用户提交一个 Flink 程序时,会首先创建一个 Client,该 Client 首先会对用户提交的 Flink 程序进行预处理,并提交到 Flink 集群中处理,所以 Client 需要从用户提交的 Flink 程序配置中获取 JobManager 的地址,并建立到 JobManager 的连接,将 Flink Job 提交给 JobManager。

五、Task Slot 进行资源管理

TaskManager 是实际负责执行计算的 Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个 task 或多个 subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。

简单的说,TaskManager 会将自己节点上管理的资源分为不同的 Slot:固定大小的资源子集。这样就避免了不同 Job 的 Task 互相竞争内存资源,但是需要主要的是,Slot 只会做内存的隔离。没有做 CPU 的隔离。

GlobalPartitioner数据会被分发到下游算子的第一个实例中进行处理。

ShufflePartitioner数据会被随机分发到下游算子的每一个实例中进行处理。

RebalancePartitioner数据会被循环发送到下游的每一个实例中进行处理。

RescalePartitioner这种分区器会根据上下游算子的并行度,循环的方式输出到下游算子的每个实例。这里有点难以理解,假设上游并行度为 2,编号为 A 和 B。下游并行度为 4,编号为 1,2,3,4。那么 A 则把数据循环发送给 1 和 2,B 则把数据循环发送给 3 和 4。假设上游并行度为 4,编号为 A,B,C,D。下游并行度为 2,编号为 1,2。那么 A 和 B 则把数据发送给 1,C 和 D 则把数据发送给 2。

BroadcastPartitioner广播分区会将上游数据输出到下游算子的每个实例中。适合于大数据集和小数据集做 Jion 的场景。

ForwardPartitionerForwardPartitioner 用于将记录输出到下游本地的算子实例。它要求上下游算子并行度一样。简单的说,ForwardPartitioner 用来做数据的控制台打印。

KeyGroupStreamPartitionerHash 分区器。会将数据按 Key 的 Hash 值输出到下游算子实例中。

CustomPartitionerWrapper用户自定义分区器。需要用户自己实现 Partitioner

接口,来定义自己的分区逻辑

我们在实际生产环境中可以从四个不同层面设置并行度:

操作算子层面(Operator Level)

.map(new RollingAdditionMapper()).setParallelism(10) 将操作算子设置并行度

执行环境层面(Execution Environment Level)

$FLINK_HOME/bin/flink 的-p参数修改并行度

客户端层面(Client Level)

env.setParallelism(10)

系统层面(System Level)

全局配置在flink-conf.yaml文件中,parallelism.default,默认是1:可以设置默认值大一点

需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。

每个算子的一个并行度实例就是一个subtask-在这里为了区分暂时叫做substask。那么,带来很多问题,由于flink的taskmanager运行task的时候是每个task采用一个单独的线程,这就会带来很多线程切换开销,进而影响吞吐量。为了减轻这种情况,flink进行了优化,也即对subtask进行链式操作,链式操作结束之后得到的task,再作为一个调度执行单元,放到一个线程里执行。

这样做的好处主要有以下几点:

1.Flink 集群所需的taskslots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。

2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将基线的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks,比如keyby/window/apply操作就会均分到申请的所有slot里,这样slot的负载就均衡了。

parallelism 是指 taskmanager 实际使用的并发能力。

slot 是指 taskmanager 的并发执行能力.

Flink 实现了多种重启策略。

固定延迟重启策略(Fixed Delay Restart Strategy) 故障率重启策略(Failure Rate Restart Strategy) 没有重启策略(No Restart Strategy) Fallback 重启策略(Fallback Restart Strategy)

Flink 实现的分布式缓存和 Hadoop 有异曲同工之妙。目的是在本地读取文件,并把他放在 taskmanager 节点中,防止 task 重复拉取。

val env = ExecutionEnvironment.getExecutionEnvironment// register a file from HDFSenv.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")// register a local executable file (script, executable, ...)env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)// define your program and execute...val input: DataSet[String] = ...val result: DataSet[Integer] = input.map(new MyMapper())...env.execute()

Flink 是并行的,计算过程可能不在一个 Slot 中进行,那么有一种情况即:当我们需要访问同一份数据。那么 Flink 中的广播变量就是为了解决这种情况。

我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个 dataset 数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。

Flink中的窗口

Flink 支持两种划分窗口的方式,按照 time 和 count。如果根据时间划分窗口,那么它就是一个 time-window 如果根据数据划分窗口,那么它就是一个 count-window。

flink 支持窗口的两个重要属性(size 和 interval)

如果 size=interval,那么就会形成 tumbling-window(无重叠数据)如果 size>interval,那么就会形成 sliding-window(有重叠数据)

通过组合可以得出四种基本窗口:

time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5)) time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3)) count-tumbling-window 无重叠数据的数量窗口,设置方式举例:countWindow(5) count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)

Flink中的状态存储

Flink 在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。

Flink 提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。

Flink中的时间窗口

Flink 中的时间和其他流式计算系统的时间一样分为三类:事件时间,摄入时间,处理时间三种。

如果以 EventTime 为基准来定义时间窗口将形成 EventTimeWindow,要求消息本身就应该携带 EventTime。如果以 IngesingtTime 为基准来定义时间窗口将形成 IngestingTimeWindow,以 source 的 systemTime 为准。如果以 ProcessingTime 基准来定义时间窗口将形成 ProcessingTimeWindow,以 operator 的 systemTime 为准。

Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。一般来讲 Watermark 经常和 Window 一起被用来处理乱序事件。

通过watermark机制来处理out-of-order的问题,属于第一层防护,属于全局性的防护,通常说的乱序问题的解决办法,就是指这类;

通过窗口上的allowedLateness机制来处理out-of-order的问题,属于第二层防护,属于特定window operator的防护,late element的问题就是指这类。

TableEnvironment 是 Table API 和 SQL 集成的核心概念。

这个类主要用来:

在内部 catalog 中注册表 注册外部 catalog 执行 SQL 查询 注册用户定义(标量,表或聚合)函数 将 DataStream 或 DataSet 转换为表 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

一次完整的 SQL 解析过程如下:

用户使用对外提供 Stream SQL 的语法开发业务应用 用 calcite 对 StreamSQL 进行语法检验,语法检验通过后,转换成 calcite 的逻辑树节点;最终形成 calcite 的逻辑计划 采用 Flink 自定义的优化规则和 calcite 火山模型、启发式模型共同对逻辑树进行优化,生成最优的 Flink 物理计划 对物理计划采用 janino codegen 生成代码,生成用低阶 API DataStream 描述的流应用,提交到 Flink 平台执行

Flink 的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink 使用一个引擎支持了 DataSet API 和 DataStream API。

在一个 Flink Job 中,数据需要在不同的 task 中进行交换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲 buffer 中收集 records,然后再发送。Records 并不是一个一个被发送的,是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。

Flink 实现容错主要靠强大的 CheckPoint 机制和 State 机制。Checkpoint 负责定时制作分布式快照、对程序中的状态进行备份;State 用来存储计算过程中的中间状态。

Flink 的分布式快照是根据 Chandy-Lamport 算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。

核心思想是在 input source 端插入 barrier,控制 barrier 的同步来实现 snapshot 的备份和 exactly-once 语义。

Flink 通过实现两阶段提交和状态保存来实现端到端的一致性语义。分为以下几个步骤:

开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面 预提交(preCommit)将内存中缓存的数据写入文件并关闭 正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟 丢弃(abort)丢弃临时文件 若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。

Flimk 如何做内存管理

Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink 大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。

理论上 Flink 的内存管理分为三部分:

Network Buffers:这个是在 TaskManager 启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是 32K,默认分配 2048 个,可以通过“taskmanager.network.numberOfBuffers”修改 Memory Manage pool:大量的 Memory Segment 块,用于运行时的算法(Sort/Join/Shuffle 等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和 lazy load,默认懒加载的方式。 User Code,这部分是除了 Memory Manager 之外的内存用于 User code 和 TaskManager 本身的数据结构。

link 摒弃了 Java 原生的序列化方法,以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。

TypeInformation 是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器。TypeInformation 支持以下几种类型:

BasicTypeInfo: 任意 Java 基本类型或 String 类型 BasicArrayTypeInfo: 任意 Java 基本类型数组或 String 数组 WritableTypeInfo: 任意 Hadoop Writable 接口的实现类 TupleTypeInfo: 任意的 Flink Tuple 类型(支持 Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的 Java Tuple 实现 CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples) PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java 对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法 GenericTypeInfo: 任意无法匹配之前几种类型的类

Flink中window 出现数据倾斜怎么解决?

window 产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:

在数据进入窗口前做预聚合 重新设计窗口聚合的 key

在 Flink 的后台任务管理中,我们可以看到 Flink 的哪个算子和 task 出现了反压。最主要的手段是资源调优和算子调优。资源调优即是对作业中的 Operator 的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State 的设置,checkpoint 的设置。

Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink 的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。

Operator Chains(算子链)这个概念你了解吗

为了更高效地分布式执行,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。

两个 operator chain 在一起的的条件:

上下游的并行度一致 下游节点的入度为 1 (也就是说下游节点没有来自其他节点的输入) 上下游节点都在同一个 slot group 中(下面会解释 slot group) 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是 ALWAYS) 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认是 HEAD) 两个节点间数据分区方式是 forward(参考理解数据流的分区) 用户没有禁用 chain

用户提交的 Flink Job 会被转化成一个 DAG 任务运行,分别是:StreamGraph、JobGraph、ExecutionGraph,Flink 中 JobManager 与 TaskManager,JobManager 与 Client 的交互是基于 Akka 工具包的,是通过消息驱动。整个 Flink Job 的提交还包含着 ActorSystem 的创建,JobManager 的启动,TaskManager 的启动和注册。

一个 Flink 任务的 DAG 生成计算图大致经历以下三个过程:

StreamGraph最接近代码所表达的逻辑层面的计算拓扑结构,按照用户代码的执行顺序向 StreamExecutionEnvironment 添加 StreamTransformation 构成流式图。 JobGraph从 StreamGraph 生成,将可以串联合并的节点进行合并,设置节点之间的边,安排资源共享 slot 槽位和放置相关联的节点,上传任务所需的文件,设置检查点配置等。相当于经过部分初始化和优化处理的任务图。 ExecutionGraph由 JobGraph 转换而来,包含了任务具体执行所需的内容,是最贴近底层实现的执行图。

JobManager 在集群中扮演角色

JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 TaskSlot 资源并命令 TaskManager 启动从客户端中获取的应用。

JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。

JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。

同时在任务执行的过程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完成 Checkpoint 操作,所有的 Checkpoint 协调过程都是在 Fink JobManager 中完成。

当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager 中的资源以供下一次提交任务使用。

JobManager 在集群中起什么作用?

JobManager 的职责主要是接收 Flink 作业,调度 Task,收集作业状态和管理 TaskManager。它包含一个 Actor,并且做如下操作:

RegisterTaskManager: 它由想要注册到 JobManager 的 TaskManager 发送。注册成功会通过 AcknowledgeRegistration 消息进行 Ack。 SubmitJob: 由提交作业到系统的 Client 发送。提交的信息是 JobGraph 形式的作业描述信息。 CancelJob: 请求取消指定 id 的作业。成功会返回 CancellationSuccess,否则返回 CancellationFailure。 UpdateTaskExecutionState: 由 TaskManager 发送,用来更新执行节点(ExecutionVertex)的状态。成功则返回 true,否则返回 false。 RequestNextInputSplit: TaskManager 上的 Task 请求下一个输入 split,成功则返回 NextInputSplit,否则返回 null。 JobStatusChanged: 它意味着作业的状态(RUNNING, CANCELING, FINISHED,等)发生变化。这个消息由 ExecutionGraph 发送。

TaskManager 在集群中扮演的角色

TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。

客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的 TaskManager 节点,然后启动并运行任务。

TaskManager 从 JobManager 接收需要部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。

可以看出,Flink 的任务运行其实是采用多线程的方式,这和 MapReduce 多 JVM 进行的方式有很大的区别,Flink 能够极大提高 CPU 使用效率,在多个任务和 Task 之间通过 TaskSlot 方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有效管理。

TaskManager 在集群启动过程中起到什么作用?

TaskManager 的启动流程较为简单:启动类:org.apache.flink.runtime.taskmanager.TaskManager核心启动方法 : selectNetworkInterfaceAndRunTaskManager启动后直接向 JobManager 注册自己,注册完成后,进行部分模块的初始化

TaskManager 中最细粒度的资源是 Task slot,代表了一个固定大小的资源子集,每个 TaskManager 会将其所占有的资源平分给它的 slot。

通过调整 task slot 的数量,用户可以定义 task 之间是如何相互隔离的。每个 TaskManager 有一个 slot,也就意味着每个 task 运行在独立的 JVM 中。每个 TaskManager 有多个 slot 的话,也就是说多个 task 运行在同一个 JVM 中。

而在同一个 JVM 进程中的 task,可以共享 TCP 连接(基于多路复用)和心跳消息,可以减少数据的网络传输,也能共享一些数据结构,一定程度上减少了每个 task 的消耗。 每个 slot 可以接受单个 task,也可以接受多个连续 task 组成的 pipeline,如下图所示,FlatMap 函数占用一个 taskslot,而 key Agg 函数和 sink 函数共用一个 taskslot

Flink 为了避免 JVM 的固有缺陷例如 java 对象存储密度低,FGC 影响吞吐和响应等,实现了自主管理内存。MemorySegment 就是 Flink 的内存抽象。默认情况下,一个 MemorySegment 可以被看做是一个 32kb 大的内存块的抽象。这块内存既可以是 JVM 里的一个 byte[],也可以是堆外内存(DirectByteBuffer)。

在 MemorySegment 这个抽象之上,Flink 在数据从 operator 内的数据对象在向 TaskManager 上转移,预备被发给下个节点的过程中,使用的抽象或者说内存对象是 Buffer。

对接从 Java 对象转为 Buffer 的中间对象是另一个抽象 StreamRecord。

Flink 的容错机制的核心部分是制作分布式数据流和操作算子状态的一致性快照。 这些快照充当一致性 checkpoint,系统可以在发生故障时回滚。 Flink 用于制作这些快照的机制在“分布式数据流的轻量级异步快照”中进行了描述。 它受到分布式快照的标准 Chandy-Lamport 算法的启发,专门针对 Flink 的执行模型而定制。

barriers 在数据流源处被注入并行数据流中。快照 n 的 barriers 被插入的位置(我们称之为 Sn)是快照所包含的数据在数据源中最大位置。例如,在 Apache Kafka 中,此位置将是分区中最后一条记录的偏移量。 将该位置 Sn 报告给 checkpoint 协调器(Flink 的 JobManager)。

然后 barriers 向下游流动。当一个中间操作算子从其所有输入流中收到快照 n 的 barriers 时,它会为快照 n 发出 barriers 进入其所有输出流中。 一旦 sink 操作算子(流式 DAG 的末端)从其所有输入流接收到 barriers n,它就向 checkpoint 协调器确认快照 n 完成。在所有 sink 确认快照后,意味快照着已完成。

一旦完成快照 n,job 将永远不再向数据源请求 Sn 之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。

FlinkSQL 的是如何实现的?

构建抽象语法树的事情交给了 Calcite 去做。SQL query 会经过 Calcite 解析器转变成 SQL 节点树,通过验证后构建成 Calcite 的抽象语法树(也就是图中的 Logical Plan)。另一边,Table API 上的调用会构建成 Table API 的抽象语法树,并通过 Calcite 提供的 RelBuilder 转变成 Calcite 的抽象语法树。然后依次被转换成逻辑执行计划和物理执行计划。

在提交任务后会分发到各个 TaskManager 中运行,在运行时会使用 Janino 编译器编译代码后运行。

时区问题解决方案比较多吧,要想不伤筋动骨,主要介绍以下三种:

flink端不做处理。也即是在读取数据的时候加上8小时的offset。

使用udf等算子给时间戳加上8小时的offset。

sink内部做处理。

public class UTC2Local extends ScalarFunction { 
    public Timestamp eval(Timestamp s) {    
        long timestamp = s.getTime() + 28800000;    
        return new Timestamp(timestamp);    
    }   
    
}

tEnv.registerFunction("utc2local",new UTC2Local());

Table table1 = tEnv.sqlQuery("select count(number),utc2local(TUMBLE_END(proctime, INTERVAL '1' HOUR)) from res group by TUMBLE(proctime, INTERVAL '1' HOUR)");