Flink,Dinky踩坑日记
1. Flink使用
v 初始化配置
Dlinky初始化需要使用数据库,下载包中有数据库文件(mysql),dlinky和flink存在版本问题,注意插件包中scala对应的版本序号。如果版本不对应,在执行时会报异常debzum
v MySql数据库配置:需要开启bin_log功能,先查看是否开启,on开启。
show variables like 'log_%'; -- 查看是否开启binlog
未开启的要在数据库配置文件中修改配置,修改完成后,重启mysql服务验证。
server_id=2
log_bin=mysql-bin
binlog_format=ROW
v PostGres配置
安装pgsql后,要开启远程访问,否则只允许本地访问。在pg的安装目录
D:\Program Files\PostgreSQL\14\data下,找到pg_hba.conf和文件进行修改,添加访问地址,修改完成即可远程访问。
host all all 0.0.0.0/0 md5
在相同目录下找到postgresql.conf文件,进行修改或者放开配置
listen_addresses = '*'
wal_level = logical # 设置为 logical,允许 WAL 日志记录逻辑解码所需的信息
archive_mode = on # enables archiving; off, on, or always
archive_command = '' # command to use to archive a logfile segment
max_wal_senders = 10 # 指定 WAL 的最大并发连接数的参数,确保 max_wal_senders 至少是逻辑复制槽数的两倍。例如,如果您的数据库总共使用 10 个复制槽,则该 max_wal_senders 值必须为 20 或更大。
max_replication_slots = 10 # 确保 max_replication_slots >= 使用 WAL 的 PostgreSQL 连接器的数量加上您的数据库使用的其他复制槽的数量
wal_keep_size = 10 # in megabytes; 0 disables
max_slot_wal_keep_size = 8 # in megabytes; -1 disables
wal_sender_timeout = 60s # in milliseconds; 0 disables
设置数据发布权限,所有表
select * from pg_publication;
update pg_publication set puballtables=true where pubname is not null;
select * from pg_publication_tables;
CREATE PUBLICATION dbz_publication FOR ALL TABLES
v Flink配置和启动
Flink启动前,要对配置文件进行修改
jobmanager.rpc.address: 192.168.198.12 #作业管理器远程访问地址
taskmanager.numberOfTaskSlots:6 #设置可用槽数量
v Dinky配置和启动
Dinky配置,安装或者解压完成后,要在dlink/config/文件夹下找到application.yml 配置文件修改数据库连接配置,也可以选择修改服务端口信息,修改完成后在dlink/包下使用命令进行操作:sh auto.sh [start/stop/restart/status]
v Flink任务
使用Dlinky创建任务时,每种数据库对应的配置都不同。
Pgsql:
WITH (
'connector' = 'postgres-cdc',
'hostname' = '172.30.96.179',
'port' = '5432',
'username' = 'postgres',
'password' = '123456',
'database-name' = 'pferp',
'schema-name' = 'public',
'table-name' = 'pub_entry_customer',
'debezium.plugin.name'='pgoutput',
'slot.name'='pub_entry_customer_slot',
'debezium.publication.autocreate.mode'='filtered'
);
Mysql:
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.198.128:3306/datacenter',
'table-name' = 'customer_entry_info',
'username'='root',
'password'='123456'
);
Oracle:
WITH (
'connector' = 'oracle-cdc',
'hostname' = '192.168.1.1',
'port' = '1521',
'username' = 'username',
'password' = ''username'',
'database-name' = 'INCATEST',
'schema-name' = 'NEWTEST',
'table-name' = 'PUB_ENTRY_CUSTOMER',
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true',
'debezium.database.tablename.case.insensitive'='false'
);
Pgsql作为数据仓:
WITH(
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://172.30.1.1:5432/database,
'table-name' = 'pub_entry_customer',
'username'='postgres',
'password'='123456'
);
当pgsql作为数据源进行复制时,要开启复制槽功能。可以使用sql语句先创建复制槽, 再在任务代码中指定复制槽(复制槽不能超过配置文件中配置的max_replication_slots数量)
ALTER ROLE test REPLICATION; --赋予指定用户流复制权限
使用输出插件plugin创建一个名为 slot_name的新逻辑(解码)复制槽。创建时需要指定逻辑复制插槽名称和输出插件:
SELECT pg_create_logical_replication_slot('slot_name', 'test_decoding'); -- 使用 test_decoding 输出插件
SELECT pg_create_logical_replication_slot('slot_name', 'pgoutput'); -- 使用 pgoutput 输出插件。
如果运行任务时,提示复制槽已经存在,使用删除复制槽语句删除复制槽
select pg_drop_replication_slot('复制槽名');
在pgsql数据库中,表字段日期时间类型使用TIMESTAMP,否则任务执行过程中可能会报错,提示int类型无法转为datetime类型。DECIMAL类型和numeric可以共用,bigint在pg中为int8,int在pg中为int4。
编写FlinkSql时,如果字段类型不同,可以使用cast(字段名 as INT|VARCHAR)as 新字段名;
如果目标数据库和源数据库中字段类型不一致,并且目标数据库中较短时,FlinkSQL任务可能存在运行一段时间后出现异常停止,服务器Flink出现错误,此时需要修改目标数据库中字段长度后重启Flink服务,再次运行Flink任务,查看运行情况情况
2. Flink错误解决
v org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
复制槽请求批量无法满足!无法在复制槽请求超时内分配所需的复制槽
解决:在数据库中创建复制槽,指定FlinkSQL语句中复制槽名字。操作见Flink任务。
v Debezium报错,检查flink和Dlink或者引用的jar包中版本是否对应
v 当多表聚合时如果发现最后聚合数据结果集不准确,检查FlinkSql是否把数据库主键全部标识,针对联合主键问题
v Could not acquire the minimum required resources.无法分配所需要的最小资源,增加flink配置文件中taskmanager.numberOfTaskSlots值,每个TaskManager提供的任务插槽数。每个插槽运行一个并行管道。
原文地址:https://www.cnblogs.com/yeyuzhu/p/17251225.html
- 揭开ps的神秘面纱——初步认识photoshop
- 地理坐标系与投影坐标系的区别
- ExtJs学习笔记(6)_可分页的GridPanel
- PowerDesinger联系的定义及使用
- Gis链接
- TortoiseSVN文件夹及文件图标不显示解决方法 TortoiseSVN文件夹及文件图标不显示解决方法
- 地图坐标
- PowerDesigner15连接Oracle失败的解决办法
- 地图校正方法心得
- 工作流参考模型点评
- 按图索骥:SQL中数据倾斜问题的处理思路与方法
- [方法“Boolean Contains(System.Guid)”不支持转换为 SQL]的解决办法
- DataBind的一些试验
- 继承HibernateDaoSupport时遇到的问题 使用注解为HibernateDaoSupport注入sessionFa
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- CentOS 7 解决丢失 nginx.pid
- $(function(){})和$(document).ready(function(){})
- javascript中的setTimeout() 方法和clearInterval() 方法和setInterval() 方法
- matlab生成数字1-n的列向量
- JavaScript中的匿名函数及函数的闭包
- ios下Safari无法触发click事件的处理
- 禁止网页右键、复制、另存为、查看源文件等功能
- ajax知识点
- 前端特效常用代码
- VBA编写Ribbon Custom UI编辑器02——编码转换
- web开发中常用的算法和函数
- nodejs创建线程问题
- 3分钟短文 | Laravel如何改造复杂的多表联合查询,这很高效!
- matlab中类的重载简析
- Js输入验证