返回顶部
首页 > 资讯 > 数据库 >canal 使用详解
  • 174
分享到

canal 使用详解

mysql数据库CanalPoweredby金山文档 2023-09-04 05:09:29 174人浏览 薄情痞子
摘要

第1章 Canal 简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 Mysql 数据库增量日志解析,提供增量数据订阅和消费 工作原理 canal 模拟 mysq

第1章 Canal 简介

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 Mysql 数据库增量日志解析,提供增量数据订阅和消费

工作原理

  • canal 模拟 mysql slave 的交互协议,伪装自己为 Mysql slave ,向 MySQL master 发送 dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

第2章 Canal 快速开始

环境准备

主机环境:windows 11

数据库版本:MySQL-8数据库

Canal版本:canal.deployer-1.1.6

MySQL8准备

(1)检查MySQL 的binlog功能是否有开启

-- 是否开启binlogshow VARIABLES like 'log_bin';

(2)如果显示状态为OFF表示该功能未开启,开启binlog功能

1,修改 mysql 的配置文件 my.cnf**my.cnf末尾追加内容:#binlog文件名log-bin=mysql-bin#选择row模式binlog_fORMat=ROW#mysql实例id,不能和canal的slaveId重复server_id=12,windows 重启 mysql

(3)在mysql里面添加以下的相关用户和权限

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';GRANT SHOW VIEW, SELECT, REPLICATioN SLAVE, REPLICATION CLIENT ON *.* TO'canal'@'%';FLUSH PRIVILEGES;

Windows 安装Canal

下载地址

Canal git 地址:https://github.com/alibaba/canal/releases

解压及配置

解压canal.deployer-1.1.6.tar.gz,我们可以看到里面有四个文件夹:

Canal 启动配置

(1)打开配置文件conf/example/instance.properties

################################################### v1.0.26版本后会自动生成slaveId,所以可以不用配置# canal.instance.mysql.slaveId=0# enable gtid use true/falsecanal.instance.gtidon=false# 数据库地址canal.instance.master.address=127.0.0.1:3306# binlog日志名称canal.instance.master.journal.name=mysql-bin.000001# binlog偏移量canal.instance.master.position=913canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binloGCanal.instance.rds.accesskey=canal.instance.rds.secreTKEy=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=true#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassWord=canal#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# username/passwordcanal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regexcanal.instance.filter.regex=.*\\..*# table black regexcanal.instance.filter.black.regex=mysql\\.slave_.*# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# MQ configcanal.mq.topic=example# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*canal.mq.partition=0# hash partition config#canal.mq.enableDynamicQueuePartition=false#canal.mq.partitionsNum=3#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6#canal.mq.partitionHash=test.table:id^name,.*\\..*#################################################

知识点拓展:查看MySQL 的binlog日志名称 和binlog 偏移量

# 查看当前服务器使用的biglog文件及大小show binary logs;# 查看最新一个binlog日志文件名称和Positionshow master status;# 查看 binlog 日志列表show master logs;

(2)Canal 启动

切换至Canal项目bin 文件夹(D:\Canal\canal.deployer-1.1.6\bin),双击启动startup.bat

(3)查看Canal server 日志

切换至Canal项目logs/canal 文件夹

查看logs/canal/canal.log 日志内容

2023-02-06 15:45:55.188 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler2023-02-06 15:45:55.193 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations2023-02-06 15:45:55.198 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.2023-02-06 15:45:55.358 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.43.80(192.168.43.80):11111]2023-02-06 15:45:56.260 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

(4)查看Canal instance 日志

切换至Canal项目logs/example文件夹

2023-02-06 17:06:18.146 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2023-02-06 17:06:18.148 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$2023-02-06 17:06:18.148 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$2023-02-06 17:06:18.148 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....2023-02-06 17:06:18.201 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position2023-02-06 17:06:18.316 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position mysql-bin.000001:4:16756662120002023-02-06 17:06:18.809 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=,timestamp=1675666212000] cost : 608ms , the next step is binlog dump2023-02-06 17:06:33.829 [MultiStageCoprocessor-other-example-0] WARN  com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:1572023-02-06 17:06:48.835 [MultiStageCoprocessor-other-example-0] WARN  com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:157

(5)Canal 停止

直接关闭Canal 服务运行窗口即可。

第3章 Docker 安装Canal

第一步:查看本地镜像、检查Canal镜像和下载Canal 镜像

# 查看本地镜像docker images# 检索kafka镜像docker search canal# 下载Kafka 镜像指定版本docker pull canal/canal-server:latest

第二步:docker 启动Canal

docker run --name canal -d canal/canal-server

知识点拓展:拷贝Canal 容器内部配置文件拷贝到外部

语法:docker cp [容器索引]:[内部路径] [外部路径]

实例:

docker cp canal:/home/admin/canal-server/conf/canal.properties /home/canaldocker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/canal

第三步:修改Canal 配置文件instance.properties

# 编辑配置文件vi  /home/canal/instance.properties

编辑内容:

################################################### v1.0.26版本后会自动生成slaveId,所以可以不用配置# canal.instance.mysql.slaveId=0# enable gtid use true/falsecanal.instance.gtidon=false# 数据库地址canal.instance.master.address=127.0.0.1:3306# binlog日志名称canal.instance.master.journal.name=mysql-bin.000001# binlog偏移量canal.instance.master.position=913canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=true#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# username/passwordcanal.instance.dbUsername=rootcanal.instance.dbPassword=123456canal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regexcanal.instance.filter.regex=.*\\..*# table black regexcanal.instance.filter.black.regex=mysql\\.slave_.*# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq configcanal.mq.topic=example# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*canal.mq.partition=0# hash partition config#canal.mq.enableDynamicQueuePartition=false#canal.mq.partitionsNum=3#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6#canal.mq.partitionHash=test.table:id^name,.*\\..*#################################################

第四步:基于Canal 外部配置文件,重新Canal 容器实例

docker run --name canal -p 11111:11111 -d -v /home/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /home/canal/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server

可选指令:

  • 关闭Canal 容器

docker stop canal
  • 移除Canal 容器

docker rm canal

第4章 基于Canal 和Kafka,实现MySQL的Binlog 近实时同步

搭建一套可以用的组件需要部署MySQL、ZooKeeper、Kafka和Canal四个中间件的实例。

Docker 安装MySQL

请参考:Docker 安装MySQL

Centos-7安装ZooKeeper

Canal和Kafka集群都依赖于Zookeeper做服务协调,为了方便管理,一般会独立部署Zookeeper服务或者Zookeeper集群。

midkr /data/zk# 创建数据目录midkr /data/zk/datacd /data/zkwget Http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gztar -zxvf apache-zookeeper-3.6.0-bin.tar.gzcd apache-zookeeper-3.6.0-bin/confcp zoo_sample.cfg zoo.cfg && vim zoo.cfg

把zoo.cfg文件中的dataDir设置为/data/zk/data,然后启动Zookeeper:

[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start/usr/bin/javaZooKeeper JMX enabled by defaultUsing config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfgStarting zookeeper ... STARTED

注意一点,要启动此版本的Zookeeper服务必须本地安装好jdk8+。启动的默认端口是2181,启动成功后的日志如下:

CentOS-7 安装Kafka

Kafka是一个高性能分布式消息队列中间件,它的部署依赖于Zookeeper。笔者在此选用2.4.0并且Scala版本为2.13的安装包:

mkdir /data/kafkamkdir /data/kafka/datawget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgztar -zxvf kafka_2.13-2.4.0.tgz

解压后/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中对应的zookeeper.connect=localhost:2181已经符合需要,不必修改,需要修改日志文件的存放目录log.dirs为/data/kafka/data。然后启动Kafka服务:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties

知识拓展:kafka 后台进行运行设置

Kafka启动后一旦退出控制台就会结束Kafka进程,可以添加-daemon参数用于控制Kafka进程后台不挂断运行。

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties

CentOS-7 安装Canal

CentOS 安装Canal 核心步骤

mkdir /data/canalcd /data/canal# 这里注意一点,GitHub在国内被墙,下载速度极慢,可以先用其他下载工具下载完再上传到服务器中wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gztar -zxvf canal.deployer-1.1.6.tar.gz

Canal 解压后目录说明:

- bin   # 运维脚本- conf  # 配置文件  canal_local.properties  # canal本地配置,一般不需要动  canal.properties        # canal服务配置  logback.xml             # logback日志配置  metrics                 # 度量统计配置  spring                  # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件  example                 # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹    instance.properties   # 实例配置,一般指单个数据库的配置- lib   # 服务依赖包- logs  # 日志文件输出目录

开发测试环境建议把logback.xml的日志级别修改为DEBUG方便定位问题。这里需要关注canal.properties和instance.properties两个配置文件。canal.properties文件中,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16这个配置项的注释,也就是启用此配置项,和实例解析器的线程数相关,不配置会表现为阻塞或者不进行解析。

  • canal.serverMode配置项指定为kafka,可选值有tcp、kafka和RocketMQ(master分支或者最新的的v1.1.5-alpha-1版本,可以选用RabbitMQ),默认是kafka。

  • canal.mq.servers配置需要指定为Kafka服务或者集群Broker的地址,这里配置为127.0.0.1:9092

canal.mq.servers在不同的canal.serverMode有不同的意义。kafka模式下,指Kafka服务或者集群Broker的地址,也就是bootstrap.serversrocketmq模式下,指NameServer列表rabbitmq模式下,指RabbitMQ服务的Host和Port

本文Kafka实例配置

找到canal.deployer-1.1.6/conf目录下的canal.properties配置文件:

# tcp, kafka, RocketMQ 这里选择kafka模式canal.serverMode = kafka# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况canal.instance.parser.parallelThreadSize = 16# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口canal.mq.servers = 127.0.0.1:9092# 配置instance,在conf目录下要有example同名的目录,可以配置多个canal.destinations = example

其他配置项可以参考下面两个官方Wiki的链接:

instance.properties一般指一个数据库实例的配置,Canal架构支持一个Canal服务实例,处理多个数据库实例的binlog异步解析。instance.properties需要修改的配置项主要包括:

  • canal.instance.mysql.slaveId需要配置一个和Master节点的服务ID完全不同的值,这里笔者配置为654321。

  • 配置数据源实例,包括地址、用户、密码和目标数据库:

  • canal.instance.master.address,这里指定为127.0.0.1:3306。

  • canal.instance.dbUsername,这里指定为canal。

  • canal.instance.dbPassword,这里指定为QWqw12!@。

  • 新增canal.instance.defaultDatabaseName,这里指定为test(需要在MySQL中建立一个test数据库,见前面的流程)。

  • Kafka相关配置,这里暂时使用静态topic和单个partition:

  • canal.mq.topic,这里指定为test,也就是解析完的binlog结构化数据会发送到Kafka的命名为testtopic

  • canal.mq.partition,这里指定为0。

本文MySQL8实例配置

配置instance,找到/conf/example/instance.properties配置文件:

## mySQL ServerId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0# position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canal.mq.topic=canaltopiC#单队列模式的分区下标canal.mq.partition=0

配置工作做好之后,可以启动Canal服务:

sh /data/canal/bin/startup.sh # 查看服务日志tail -100f /data/canal/logs/canal/canal# 查看实例日志  -- 一般情况下,关注实例日志即可tail -100f /data/canal/logs/example/example.log

启动正常后,见实例日志如下:

数据演示

在test数据库创建一个订单表

use `test`;CREATE TABLE `order`(    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',    order_id    VARCHAR(64)    NOT NULL COMMENT '订单ID',    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额',    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',    UNIQUE uniq_order_id (`order_id`)) COMMENT '订单表';INSERT INTO `order`(order_id, amount) VALUES ('20230207093012', 1999);UPDATE `order` SET amount = 2000 WHERE order_id = '20230207093012';DELETE  FROM `order` WHERE order_id = '20230207093012';

利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test

第5章 基于Canal 和RabbitMQ,实现MySQL的Binlog 近实时同步

Docker 安装MySQL

请参考:Docker 安装MySQL

Docker 安装RabbitMQ

请参考:Docker 安装RabbitMQ

RabbitMQ 增加交换机和队列

  • 添加交换机 canal_exchange

  • 添加队列 canal_queue

  • 队列绑定交换机

CentOS-7 安装Canal

centos-7 解压安装Canal与上 一章节一致,顾不在做详细描述***。本章节重点讲解Canal 配置RabbitMQ 参数配置。

Canal Server配置

需要配置的东西就两项,一个是监听数据库配置,另一个是 RabbitMQ 连接配置。

instance.properties

监听数据库配置

cd /example 目录下

canal.properties

配置 Canal 服务方式为 RabbitMQ 和连接配置

进入到conf文件,打开canal.properties

serverMode(服务模式)修改为rabbitMQ,默认TCP.

RabbitMQ 服务相关参数设置。

第6章 Canal api

快速开始

第一步:Maven 添加相关jar包依赖

    com.alibaba.otter    canal.client    1.1.0

第二步:编写main方法测试

import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {    // 创建链接    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),    11111), "example", "", "");    int batchSize = 1000;    int emptyCount = 0;    try {        connector.connect();        connector.subscribe(".*\\..*");        connector.rollback();        int totalEmptyCount = 120;        while (emptyCount < totalEmptyCount) {            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据            long batchId = message.getId();            int size = message.getEntries().size();            if (batchId == -1 || size == 0) {                emptyCount++;                System.out.println("empty count : " + emptyCount);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                }            } else {                emptyCount = 0;                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);                printEntry(message.getEntries());            }            connector.ack(batchId); // 提交确认            // connector.rollback(batchId); // 处理失败, 回滚数据        }        System.out.println("empty too many times, exit");    } finally {        connector.disconnect();    }}private static void printEntry(List entrys) {    for (Entry entry : entrys) {        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {            continue;        }        RowChange rowChage = null;        try {            rowChage = RowChange.parseFrom(entry.getStoreValue());        } catch (Exception e) {            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),           e);        }        EventType eventType = rowChage.getEventType();        System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",             entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),             entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),             eventType));        for (RowData rowData : rowChage.getRowDatasList()) {            if (eventType == EventType.DELETE) {                printColumn(rowData.getBeforeColumnsList());            } else if (eventType == EventType.INSERT) {                printColumn(rowData.getAfterColumnsList());            } else {                System.out.println("-------> before");                printColumn(rowData.getBeforeColumnsList());                System.out.println("-------> after");                printColumn(rowData.getAfterColumnsList());            }        }    }}private static void printColumn(List columns) {    for (Column column : columns) {        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());    }}}

第三步:启动Canal 服务

第四步: 运行Canal 客户端的main 方法,控制台输出如下信息:

empty count : 1empty count : 2empty count : 3empty count : 4

含义:数据库无变更记录。

第五步:模拟数据库变更操作

mysql> use test;Database changedmysql> CREATE TABLE `xdual` (    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,    ->   PRIMARY KEY (`ID`)    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;Query OK, 0 rows affected (0.06 sec)mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

再次查看Canal 客户端,控制台输出信息:

empty count : 1empty count : 2empty count : 3empty count : 4================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERTID : 4    update=trueX : 2013-02-05 23:29:46    update=true

Canal API 文档说明

Canal 类设计

大致分为几部分:

  • ClientIdentity
    canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)

  • CanalConnector
    SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalnodeAccessStrategy进行failover控制

  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.

  • ClientRunningMonitor/ClientRunningListener/ClientRunningData
    client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.

javadoc查看:

Canal server/client交互协议

具体的网络协议格式,可参见:CanalProtocol.proto

Canal get/ack/rollback协议

get/ack/rollback协议介绍:

  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
    a. batch id 唯一标识
    b. entries 具体的数据对象,可参见下面的数据介绍

  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
    a. 拿够batchSize条记录或者超过timeout时间
    b. timeout=0,阻塞等到足够的batchSize

  • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作

  • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.

流式api设计的好处:

  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)

  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性

  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取

  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cursor

  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

流式api带来的异步响应模型:

数据对象格式简单介绍:EntryProtocol.proto

Entry      Header          logfileName [binlog文件名]          logfileOffset [binlog position]          executeTime [binlog里记录变更发生的时间戳,精确到秒]          schemaName           tableName          eventType [insert/update/delete类型]      entryType   [事务头BEGIN/事务尾END/数据ROWDATA]      storeValue  [byte数据,可展开,对应的类型为RowChange]  RowChangeisDdl       [是否是ddl变更操作,比如create table/drop table]sql         [具体的ddl sql]rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]beforeColumns [Column类型的数组,变更前的数据字段]afterColumns [Column类型的数组,变更后的数据字段]ColumnindexsqlType     [jdbc type]name        [column name]isKey       [是否为主键]updated     [是否发生过变更]isNull      [值是否为null]value       [具体的内容,注意为string文本]

说明:

  • 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全

  • 可以提供ddl的变更语句

  • insert只有after columns, delete只有before columns,而update则会有before / after columns数据.

快速开始代码剖析

1. 创建Connector

a. 创建SimpleCanalConnector (直连ip,不支持server/client的failover机制)

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");

b. 创建ClusterCanalConnector (基于zookeeper获取canal server ip,支持server/client的failover机制)

CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");

c. 创建ClusterCanalConnector (基于固定canal server的地址,支持固定的server ip的failover机制,不支持client的failover机制

CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");

2. get/ack/rollback使用

3. RowData数据处理

第7章 Canal 适配器

基本说明

canal 1.1.1版本之后,提供了适配器功能,可将canal server的数据直接输出到目的地,不需要用户编写客户端。

温馨提示:特殊功能需求,还需要用户编写客户端实现

适配器整体结构

client-adapter分为适配器和启动器两部分,每个适配器会将自己所需的依赖打成一个包, 以SPI的方式让启动器动态加载。

启动器为 SpringBoot 项目, 支持canal-client启动的同时提供相关REST管理接口, 运行目录结构为:

- bin    restart.sh    startup.bat    startup.sh    stop.sh- lib   ...- plugin     client-adapter.logger-1.1.1-jar-with-dependencies.jar    client-adapter.HBase-1.1.1-jar-with-dependencies.jar    ...- conf    application.yml    - hbase        mytest_person2.yml- logs

以上目录结构最终会打包成 canal-adapter-*.tar.gz 压缩包

源码结构解析

launcher:启动器

logger:日志适配器

rdb:支持jdbc的关系型数据库适配器(mysql、oracle、postgress、sqlserver等)

hbase:hbase适配器

kudu:kudu适配器

Canal适配器之启动

Canal 适配器启动之配置文件application.yml

server:  port: 8081spring:  jackson:    date-format: yyyy-MM-dd HH:mm:ss    time-zone: GMT+8    default-property-inclusion: non_nullcanal.conf:  mode: tcp #tcp kafka rocketMQ rabbitMQ   #客户端消费模式,对应下面的consumerProperties  flatMessage: true                        #是否以JSON字符串传递数据,仅对mq生效  zookeeperHosts:                          #canal server集群部署时,创建curator客户端                                           #tcp mode需要在consumerProperties tcp中设置  syncBatchSize: 1000                      #每次同步的批数量  retries: -1  #重试次数,-1为无限次  timeout:  accessKey:  secretKey:  consumerProperties:    # canal tcp consumer                   #canal adapter连接的canal server    canal.tcp.server.host: 127.0.0.1:11111    canal.tcp.zookeeper.hosts:    canal.tcp.batch.size: 500    canal.tcp.username:    canal.tcp.password:    # kafka consumer                      #canal adapter连接的kafka    kafka.bootstrap.servers: 127.0.0.1:9092    kafka.enable.auto.commit: false    kafka.auto.commit.interval.ms: 1000    kafka.auto.offset.reset: latest    kafka.request.timeout.ms: 40000    kafka.session.timeout.ms: 30000    kafka.isolation.level: read_committed    kafka.max.poll.records: 1000    # rocketMQ consumer                  #canal adapter连接的rocketmq    rocketmq.namespace:    rocketmq.namesrv.addr: 127.0.0.1:9876    rocketmq.batch.size: 1000    rocketmq.enable.message.trace: false    rocketmq.customized.trace.topic:    rocketmq.access.channel:    rocketmq.subscribe.filter:    # rabbitMQ consumer                     #canal adapter连接的rabbitmq    rabbitmq.host:    rabbitmq.virtual.host:    rabbitmq.username:    rabbitmq.password:    rabbitmq.resource.ownerId:#  srcDataSources:#    defaultDS:#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true#      username: root#      password: 121212  canalAdapters:  - instance: example # canal instance Name or mq topic name                      # 注意:instance name、topic name不支持通配符匹配    groups:    - groupId: g1      #一份数据可被多个groupId消费                       #不同groupId并发执行,                       #同一groupId内的adapters顺序执行      outerAdapters:      - name: logger   #输出到日志#      - name: rdb     #输出到rdb(关系型数据库#        key: mysql1   #输出到mysql数据库#        properties:#          jdbc.driverClassName: com.mysql.jdbc.Driver#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true#          jdbc.username: root#          jdbc.password: 121212#          druid.stat.enable: false#          druid.stat.slowSqlMillis: 1000#      - name: rdb#        key: oracle1      #输出到oracle数据库#        properties:#          jdbc.driverClassName: oracle.jdbc.OracleDriver#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE#          jdbc.username: mytest#          jdbc.password: m121212#      - name: rdb#        key: postgres1     #输出到postgress数据库#        properties:#          jdbc.driverClassName: org.postgresql.Driver#          jdbc.url: jdbc:postgresql://localhost:5432/postgres#          jdbc.username: postgres#          jdbc.password: 121212#          threads: 1#          commitSize: 3000#      - name: hbase          #输出到hbase#        properties:#          hbase.zookeeper.quorum: 127.0.0.1#          hbase.zookeeper.property.clientPort: 2181#          zookeeper.znode.parent: /hbase#      - name: es            #输出到es#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode#        properties:#          mode: transport # or rest#          # security.auth: test:123456 #  only used for rest mode#          cluster.name: elasticsearch#      - name: kudu       #输出到kudu#        key: kudu#        properties:#          kudu.master.address: 127.0.0.1 # ',' split multi address#      - name: phoenix     #输出到phoenix#        key: phoenix#        properties:#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db#          jdbc.username:#          jdbc.password:

Canal适配器实例

业务需求:商品表新增商品数据实时同步ES搜索引擎。

解决方案:基于Canal监听MySQL-binlog 日志信息变化,通过Canal-Adapter 读取Canal 数据变更记录,同时写入ES搜索引擎。

1、软件版本

MySQL:8.xcanal:1.1.6adapter:1.1.6elasticsearch:7.4.2

2、MySQL开启binlog

[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
# 是否开启binlogshow variables like 'log_bin';# 结果log_bin    ON# binlog模式show variables like 'binlog_format';# 结果binlog_format    ROW
# 创建用户canal及密码设置CREATE USER canal IDENTIFIED BY 'canal';  # 赋权GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新生效FLUSH PRIVILEGES;

3、下载canal及adapter

# canal-serverhttps://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz# canal-adapterhttps://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.adapter-1.1.6.tar.gz

4、编辑canal配置文件

vi conf/example/instance.properties

此处只展示修改的配置

# 伪装成从库的slaveId,不能与MySQL重复canal.instance.mysql.slaveId=1234# 数据库的ip:端口canal.instance.master.address=127.0.0.1:3306# 数据库用户名密码canal.instance.dbUsername=canalcanal.instance.dbPassword=canal

5、启动canal

cd binsh startup.sh

如遇到如下报错

OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.The stack size specified is too small, Specify at least 384kError: Could not create the Java Virtual Machine.Error: A fatal exception has occurred. Program will exit.

解决办法:调整startup.sh脚本的-Xss参数

vi bin/startup.sh
# 我这里调整到-Xss512k
if [ -n "$str" ]; then
JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss512k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatinGoccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
else
JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "

6、编辑adapter配置文件

cd confvi application.yml

此处只展示修改的配置

canal.conf:  consumerProperties:      # 单机配置属性      # canal.tcp.server.host: 127.0.0.1:11111  # 此配置数据库信息与canal-server配置的数据库信息相同  srcDataSources:      defaultDs:        url: jbdc:mysql://127.0.0.1:3306/canal_test        username: canal        password: canal  # 配置 ES信息  canalAdapters:    groups:      outerAdapters:           - name: logger          - name: es7            hosts: http://127.0.0.1:9200            properties:                mode: rest                security.auth: es账号:es密码                cluster.name: es的名字

7、编辑es7 索引配置文件

cd conf/es7/cp mytest_user.yml canal_test_order.ymlrm biz_order.yml customer.yml mytest_user.ymlvi canal_test_order.ymldataSourceKey: defaultDSdestination: examplegroupId: g1esMapping:  _index: canal_test_order  _id: _id# 这个必须要加,源文件没有  _type: _doc  upsert: true#  pk: id  sql: "select        a.id as _id,        a.order_no as orderNo,        a.order_name as orderName        from t_order a"#  objFields:#    _labels: array:;  etlCondition: "where a.c_time>={}"  commitBatch: 3000

8、启动Canal adapter

cd binsh startup.sh

查看Adapter 日志记录出现如下错误时:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource    at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_322]    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_322]    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_322]    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_322]    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]

解决办法,更改源码

下载canal-adapter源码

修改client-adapter/escore/pom.xml为

            com.alibaba            druid        改成                    com.alibaba            druid            provided        

重新打包编译生成client-adapter.es7x-1.1.5-jar-with-dependencies.jar

放入canal-adapter的plugin目录下,替换原jar

重新启动,Canal Adapter 日志如下

9、测试

在MySQL手动插入一条数据

adapter.log打印日志如下

2023-02-09 15:20:25.519 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":7,"order_no":1122,"order_name":"2211"}],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"}2023-02-09 15:20:25.520 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":7,"order_no":1122,"order_name":"2211"}],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"} Affected indexes: canal_test_order 

查看elasticsearch数据

# get 127.0.0.1:9200/canal_test_order/_search{    "took": 0,    "timed_out": false,    "_shards": {        "total": 1,        "successful": 1,        "skipped": 0,        "failed": 0    },    "hits": {        "total": {            "value": 1,            "relation": "eq"        },        "max_score": 1.0,        "hits": [            {                "_index": "canal_test_order",                "_type": "_doc",                "_id": "1",                "_score": 1.0,                "_source": {                    "orderNo": 111111,                    "orderName": "11111"                }            }        ]    }}

第8章 Canal 管理平台搭建

背景

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WEBUI操作界面,方便更多用户快速和安全的操作。

准备

canal-admin的限定依赖:

  1. MySQL,用于存储配置和节点等相关数据

  1. canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

部署

  1. 下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.6 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.admin-1.1.6.tar.gz
  1. 解压缩

mkdir /tmp/canal-admintar zxvf canal.admin-$version.tar.gz  -C /tmp/canal-admin

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x   6 agapple  staff   204B  8 31 15:37 bindrwxr-xr-x   8 agapple  staff   272B  8 31 15:37 confdrwxr-xr-x  90 agapple  staff   3.0K  8 31 15:37 libdrwxr-xr-x   2 agapple  staff    68B  8 31 15:26 logs
  1. 配置修改

vi conf/application.yml
server:  port: 8089spring:  jackson:    date-format: yyyy-MM-dd HH:mm:ss    time-zone: GMT+8spring.datasource:  address: 127.0.0.1:3306  database: canal_manager  username: canal  password: canal  driver-class-name: com.mysql.jdbc.Driver  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false  hikari:    maximum-pool-size: 30    minimum-idle: 1canal:  adminUser: admin  adminPasswd: admin
  1. 初始化元数据库

mysql -h127.1 -uroot -p# 导入初始化SQL> source conf/canal_manager.sql

a. 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下。

  1. 启动

sh bin/startup.sh
  1. 日志查看

vi logs/admin.log2023-02-09 15:43:38.162 [main] INFO  o.s.boot.web.embedded.Tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)2023-02-09 15:43:38.180 [main] INFO  org.apache.coyote.http11.Http11NIOProtocol - Initializing ProtocolHandler ["http-nio-8089"]2023-02-09 15:43:38.191 [main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]2023-02-09 15:43:38.194 [main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29....2023-02-09 15:43:39.789 [main] INFO  o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler2023-02-09 15:43:39.825 [main] INFO  o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

  1. 关闭

sh bin/stop.sh
  1. canal-server端配置

使用canal_local.properties的配置覆盖canal.properties

# reGISter ipcanal.register.ip =# canal admin configcanal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441# admin auto registercanal.admin.register.auto = truecanal.admin.register.cluster =

启动admin-server即可。

第9章 Canal 管理平台操作指南

请参考:Canal-Admin-指南

来源地址:https://blog.csdn.net/zhouzhiwengang/article/details/128900318

您可能感兴趣的文档:

--结束END--

本文标题: canal 使用详解

本文链接: https://lsjlt.com/news/392971.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • canal 使用详解
    第1章 Canal 简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 工作原理 canal 模拟 MySQ...
    99+
    2023-09-04
    mysql 数据库 Canal Powered by 金山文档
  • SpringBoot整合Canal方法详解
    目录pom.xml 添加 canal.client 依赖业务功能处理简单连接程序单次获取数据循环获取数据解析 storeValue 值不同的类型进行不同的处理一次性获取多条数据ack...
    99+
    2022-12-21
    SpringBoot整合Canal SpringBoot Canal
  • Cannal初体验:使用Docker安装Canal详细流程说明
    文章目录 Cannal安装步骤1.通过docker创建一个新网络2.mysql相关2.1 开启mysql主从配置1、修改配置文件2、重启mysql 2.2 新建canal用户并设置权限...
    99+
    2023-10-01
    docker spring boot mysql linux
  • mac环境canal+mysql+kafka的安装及使用
    为了实现 实时同步数据,在mac环境搭建了canal,mysql,kafka的一套流程使用canal加mysql加kafka的方式传递数据mysql 数据源头canal模仿slave冲mysql取数据。。是一个管道kafka 将canal获...
    99+
    2023-06-05
  • canal使用说明:MySQL、Redis实时数据同步
    1. canal简介 canal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等 ...
    99+
    2023-09-08
    mysql redis 数据库
  • springboot 整合canal实现示例解析
    目录前言环境准备一、快速搭建canal服务搭建步骤1、服务器使用docker快速安装一个mysql并开启binlog日志2、上传canal安装包并解压3、进入到第二步解压后的文件目录...
    99+
    2024-04-02
  • 怎么使用canal+Kafka进行数据库同步操作
    这篇文章主要介绍了怎么使用canal+Kafka进行数据库同步操作的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用canal+Kafka进行数据库同步操作文章都会有所收获,下面我们一起来看看吧。平时工作中...
    99+
    2023-06-27
  • Express.JS使用详解
    安装了node(下载)之后, 在你的机器上创建一个目录,开始你的第一个应用程序。 $ mkdir hello-world 在这个目录中你将定义应用程序“包”,这和任何其他node的包没有什么不同。文...
    99+
    2022-06-04
    详解 Express JS
  • xtrabackup使用详解
    原理:    在InnoDB内部会维护一个redo日志文件,我们也可以叫做事务日志文件。事务日志会存储每一个InnoDB表数据的记录修改。当InnoDB启动时,Inn...
    99+
    2024-04-02
  • Mysqlusing使用详解
    目录示例第一种第二种第三种示例 在平时,我们做关联表查询的时候一般是这样的 select * from 表1 inner join 表2 on 表1.相同的列=表2.相同的...
    99+
    2024-04-02
  • Docker使用详解
    Docker Docker基础篇 目标: 掌握Docker基础知识,能够理解Docker镜像与容器的概念 完成Docker安装与启动 掌握Docker镜像与容器相关命令 掌握Tomcat Ng...
    99+
    2023-09-03
    docker 运维 容器 java
  • iptables使用详解
    前言 最近买了一个VPS,并在上面搭了DOCKER,然后再DOCKER中安装Mysql。但只要将网络端口映射到宿主机上,那么外部网络就可以直接访问该数据。属实吓人。为此,我们需要使用防火墙。 说到防火...
    99+
    2023-09-07
    linux ubuntu 服务器
  • pytest使用详解
    pytest使用详解 1.pytest简介2.pytest的命名规则2.1 python的命名规则2.2 pytest的命名规则 3.pytest的运行方式3.1 主函数运行3.2 命令行...
    99+
    2023-08-31
    pytest python 测试用例
  • mshtml使用详解
    mshtml是Microsoft HTML解析器,它是由Microsoft开发的一组ActiveX控件,用于解析和操作HTML文档。...
    99+
    2023-09-13
    mshtml
  • Arthas 使用详解
    前言   Arthas 是一款线上监控诊断产品,通过全局视角实时查看应用 load、内存、gc、线程的状态信息,并能在不修改应用代码的情况下,对业务问题进行诊断,包括查看方法调用的出入参、异常,监测方法执行耗时,类加载信息等,大大提升线上问...
    99+
    2023-09-05
    arthas Arthas使用详解 Arthas详细使用 arthas 使用 arthas详解
  • SpringCloudGateway使用详解
    目录Spring Cloud Gateway使用安装配置断言过滤器熔断器总结Spring Cloud Gateway使用 Spring Cloud Gateway是一个基于Sprin...
    99+
    2023-05-16
    Java SpringCloud SpringCloud Gateway使用
  • CompletableFuture使用详解
    一、简介 1.1 概述 在上一篇文章《CompletionService使用与源码分析》中,已经介绍过了Future的局限性,它没法直接对多个任务进行链式、组合等处理,需要借助并发工具类才能完成,实现逻辑比较复杂。 而Completable...
    99+
    2023-08-16
    java Future
  • Window.ShowModalDialog使用详解
    Window.ShowModalDialog是浏览器窗口对象的一个方法,它可以在一个模态对话框中打开一个URL。模态对话框意味着在对...
    99+
    2023-08-22
    使用
  • Canal配置文件的作用是什么
    Canal配置文件的作用是指定Canal实例的相关配置信息,用于指导Canal实例的运行行为。具体作用包括: 数据源配置:配置C...
    99+
    2023-10-22
    Canal
  • 详解MySqlBulkLoader的使用
    目录一、mysqlBulkLoader的使用二、MySqlBulkLoader使用过程中出现的问题1、Mysql数据库不支持加载本地文件数据2、数据库和项目是分别放在不同服务器上mysql数据库:最近要写一个服务,跨库数...
    99+
    2022-07-14
    MySqlBulkLoader使用 MySqlBulkLoader
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作