前言 最近公司要搞搜索,需要把Mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周
最近公司要搞搜索,需要把Mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间……
我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过flink sql-client实现的,但这有个问题,当fink集群重启,JOB就没有了,没有办法通过savePointing来恢复。所以还是记录下。
直接上代码:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(3000); env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/savepointings"); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql("CREATE TABLE orders (\n" + " order_id INT,\n" + " order_date TIMESTAMP(0),\n" + " customer_name STRING,\n" + " price DECIMAL(10, 5),\n" + " product_id INT,\n" + " order_status BOOLEAN,\n" + " PRIMARY KEY (order_id) NOT ENFORCED\n" + " ) WITH (\n" + " 'connector' = 'mysql-cdc',\n" + " 'hostname' = 'localhost',\n" + " 'port' = '3306',\n" + " 'username' = 'root',\n" + " 'passWord' = '123456',\n" + " 'database-name' = 'mydb',\n" + " 'table-name' = 'orders'\n" + " );").await(); tableEnv.executeSql("CREATE TABLE products (\n" + " id INT,\n" + " name STRING,\n" + " description STRING,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + " ) WITH (\n" + " 'connector' = 'mysql-cdc',\n" + " 'hostname' = 'localhost',\n" + " 'port' = '3306',\n" + " 'username' = 'root',\n" + " 'password' = '123456',\n" + " 'database-name' = 'mydb',\n" + " 'table-name' = 'products'\n" + " );").await(); tableEnv.executeSql("CREATE TABLE enriched_orders (\n" + " order_id INT,\n" + " order_date TIMESTAMP(0),\n" + " customer_name STRING,\n" + " price DECIMAL(10, 5),\n" + " product_id INT,\n" + " order_status BOOLEAN,\n" + " product_name STRING,\n" + " product_description STRING,\n" + " PRIMARY KEY (order_id) NOT ENFORCED\n" + " ) WITH (\n" + " 'connector' = 'elasticsearch-7',\n" + " 'hosts' = 'Http://localhost:9200',\n" + " 'index' = 'enriched_orders_lhc'\n" + " );"); tableEnv.executeSql("INSERT INTO enriched_orders\n" + " SELECT o.*, p.name, p.description\n" + " FROM orders AS o\n" + " LEFT JOIN products AS p ON o.product_id = p.id"); env.execute("Mysql to ES"); }
来源地址:https://blog.csdn.net/lhcnicholas/article/details/129854091
--结束END--
本文标题: 使用Flink CDC将Mysql中的数据实时同步到ES
本文链接: https://lsjlt.com/news/388456.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0