flinkCDC - 功能验证记录 flink 与cdc 版本使用搭配:flink cdc参数说明原理分析(DBLog)无锁算法论文 mysql cdccdc api 动态加表flink
flink1.13.6 + flink Mysql cdc 1.4.0
flink 1.16.0 + flink mysql cdc 2.3.0
flink 1.16.0 + flink mysql cdc 2.4.0
flink 1.16.0 + flink postgresql cdc 2.3.0
flink 1.13.6 + flink mysql cdc 2.3.0 : 没有报错,没有数据,估计是兼容有问题
调整chunck大小 : scan.incremental.snapshot.chunk.size
2、设置cdc模式:scan.startup.mode【initial(默认)、latest-offset】
3、支持chunk key 列设置,默认是第一个字段:scan.incremental.snapshot.chunk.key-column
官网:https://ververica.GitHub.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
cdc mysql 全量快照阶段split sql :SELECT * FROM cdc_db
.tablename
WHERE id >= ? AND NOT (id = ?) AND id <= ?;
备注:id 是主键id
链接地址:Https://arxiv.org/pdf/2010.12597.pdf , 对此算法感兴趣的可以看这位大佬的分享:https://zhuanlan.zhihu.com/p/600303844
论文部分摘要理解:
全量阶段:
1、flink cdc 任务启动后按设置的chunk size切分数据,sql如下:
(sql:SELECT * FROM cdc_db
.tablename
WHERE id >= ? AND NOT (id = ?) AND id <= ?; )
2、同时会启动读取binlog任务,读取chunk对应的binlog,通过binlog对 select chunk的数据做合并操作,此操作是合并在期间执行了update、delete操作,保证insert-only
增量阶段:
1、不断追加数据
启动任务,复制checkpoint路径
2、新增监听的表到tableList(可以使用同一个jar包,在外部传参动态加表)
3、从checkpoint初重启任务即可
cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s
cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s
若更新字段包含(部分)主键字段,会先发送一条删除之前主键的记录。op = d , after = null ; 然后再发送一条新主键记录,op = c,且before = null 。
2、若仅更新非主键主键,只会发送一条记录,op = u , before = null。
主体代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 必须开启 checkpoint ,因为Flink Postgres CDC 只会在 checkpoint 完成的时候更新 Postgres slot 中的 LSN,否则磁盘使用率会一直很高 env.enableCheckpointing(1000); //监听 postgresql wal 日志 DebeziumSourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder() .hostname(host) .port(port) .username(userName) .passWord(passWord) .database(dbName) .tableList(tableList) .deserializer(new JSONDebeziumDeserializationSchema()) .slotName(slotName) .build(); DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction); dataStreamSource.print(">>>").setParallelism(1); env.execute();
AT_LEAST_ONCE 模型要配置 acks = 1
登录mysql并查询当前时区:show variables like “%time_zone%”;
执行以下命令修改时区:
set global time_zone = '+8:00'; ##修改mysql全局时区为北京时间,即我们所在的东8区set time_zone = '+8:00'; ##修改当前会话时区flush privileges; #立即生效
缺包,引入 debezium-connector-mysql-1.6.4.Final.jar包会报Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig,看 flink cdc 社区群反馈可能是
删除pom.xml文件flink-connector-mysql-cdc依赖报下的provided
包冲突原因。注释或删除jar:flink-table-planner-loader-1.16.0.jar
加大心跳间隔时间,默认是30s,‘heartbeat.interval’ = ‘60s’
来源地址:https://blog.csdn.net/a123147abc/article/details/131301937
--结束END--
本文标题: flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法
本文链接: https://lsjlt.com/news/429436.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0