1. 准备工作 1.1 确认 Mysql binlog 模式 确认 mysql 数据库的 binlog 模式是否为 ROW。可以在 Mysql 命令行中执行以下语句确认: SHOW GLOBAL VARIABLES LIKE 'binlog
确认 mysql 数据库的 binlog 模式是否为 ROW
。可以在 Mysql 命令行中执行以下语句确认:
SHOW GLOBAL VARIABLES LIKE 'binlog_fORMat';
如果返回结果中的 Value
字段为 ROW
,则说明 binlog 模式为 ROW
。
下载并安装 Flink,可以参考官方文档进行安装。
在 Flink 的配置文件 flink-conf.yaml
中添加 MySQL 数据库连接信息,例如:
# MySQL connection configurationmysql.server-id: 12345mysql.hostname: localhostmysql.port: 3306mysql.username: rootmysql.passWord: 123456mysql.database-name: test
在 Flink 的 CDC Job 配置文件 mysql-cdc.properties
中添加以下配置:
# Flink CDC Job Configurationname: mysql-cdc-jobflink.parallelism: 1flink.checkpoint.interval: 60000flink.checkpoint.mode: EXACTLY_ONCE# MySQL CDC Source Configurationdebezium.transforms: unwrapdebezium.transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordStatedatabase.hostname: localhostdatabase.port: 3306database.user: rootdatabase.password: 123456database.history.kafka.bootstrap.servers: localhost:9092database.history.kafka.topic: mysql-cdc-historydatabase.server.id: 12345database.server.name: testdatabase.whitelist: test.user
其中,name
为 CDC Job 的名称,flink.parallelism
为 Flink 的并行度,flink.checkpoint.interval
为 Flink 的 Checkpoint 时间间隔,flink.checkpoint.mode
为 Checkpoint 模式,此处设置为 EXACTLY_ONCE
。
debezium.transforms
为 Debezium 转换器的名称,此处设置为 unwrap
。database.hostname
、database.port
、database.user
、database.password
分别为 MySQL 数据库的连接信息。database.history.kafka.bootstrap.servers
为 Kafka 的地址信息,database.history.kafka.topic
为 CDC 历史数据记录的 Kafka Topic。database.server.id
为 MySQL 的 Server ID,database.server.name
为 CDC Source 的名称,database.whitelist
为需要进行同步的 MySQL 表的名称。
首先,需要在本地或云端创建 MySQL 数据库,并添加一个具有读写权限的用户。下面是一个创建名为 test_db
的数据库以及名为 flink_cdc_user
的用户的示例 SQL 代码:
CREATE DATABASE test_db;CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'password';GRANT ALL PRIVILEGES ON test_db.* TO 'flink_cdc_user'@'%';
启动一个 Flink 集群以便运行 CDC 应用程序。可以使用 Flink 自带的 bin/start-cluster.sh
脚本启动 Flink 集群。确保 Flink 集群在运行时已经包含了 Kafka 和 MySQL 的依赖项。
在 MySQL 中,首先需要创建需要进行 CDC 的表和 CDC 表。CDC 表是一个系统表,它存储了需要捕获的更改数据。可以通过以下代码创建一个名为 test_table
的表以及与之关联的 CDC 表
CREATE TABLE test_db.test_table ( id INT PRIMARY KEY, name VARCHAR(30), age INT, email VARCHAR(50)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE test_db.test_table_cdc ( `database` VARCHAR(100), `table` VARCHAR(100), `type` VARCHAR(10), `ts` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3), `before` JSON, `after` jsON) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
接下来,需要编写一个 Flink CDC 应用程序,以将 MySQL 表更改推送到 Kafka 主题中。可以使用 Flink 的 flink-connector-jdbc
库和 flink-connector-kafka
库来实现此目的。
以下是一个基本的 Flink CDC 应用程序的代码示例:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test-group"); JdbcSource source = JdbcSource.builder() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/test_db") .setUsername("flink_cdc_user") .setPassword("password") .setQuery("SELECT id, name, age, email FROM test_table") .setRowTypeInfo(Types.ROW(Types.INT, Types.STRING, Types.INT, Types.STRING)) .setFetchSize(1000) .build(); DataStream stream = env.addSource(source);
以下是一个简单的示例运行及结果:
$ bin/flink run -c com.example.MyCDCJob ./my-cdc-job.jar --database.server=mysql.example.com --database.port=3306 --database.name=mydb --database.username=myuser --database.password=mypassword --table.name=mytable --debezium.plugin.name=mysql --debezium.plugin.property.version=1.3.1.Final
[INFO] Starting CDC process for table: mytable.[INFO] Initializing CDC source...[INFO] CDC source successfully initialized.[INFO] Starting CDC source...[INFO] CDC source successfully started.[INFO] Adding CDC source to Flink job topology...[INFO] CDC source successfully added to Flink job topology.[INFO] Starting Flink job...[INFO] Flink job started successfully.[INFO] Change data for table: mytable.[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 25}.[INFO] Record key: {"id": 2}, record value: {"id": 2, "name": "Bob", "age": 30}.[INFO] Record key: {"id": 3}, record value: {"id": 3, "name": "Charlie", "age": 35}.[INFO] Change data for table: mytable.[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 27}.
可以看到,当有数据变更时,Flink CDC Job 会输出变更的表名、记录的主键以及变更的数据。例如,在这个示例中,有一行记录的年龄字段从25变成了27。
来源地址:https://blog.csdn.net/lhyandlwl/article/details/129998737
--结束END--
本文标题: Flink CDC 最佳实践(以 MySQL 为例)
本文链接: https://lsjlt.com/news/410257.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0