返回顶部
首页 > 资讯 > 数据库 >Canal——数据同步
  • 241
分享到

Canal——数据同步

数据库mysqlredis后端 2023-10-02 19:10:05 241人浏览 独家记忆
摘要

1.数据同步到数据库: 在介绍方案2之前我们先来介绍一下MySQL复制的原理,如下图所示: 主服务器操作数据,并将数据写入Bin log从服务器调用I/O线程读取主服务器的Bin log,并且写入到自己的Relay log中,再调用

1.数据同步到数据库

  • 在介绍方案2之前我们先来介绍一下MySQL复制的原理,如下图所示:
    • 服务器操作数据,并将数据写入Bin log
    • 从服务器调用I/O线程读取主服务器的Bin log,并且写入到自己的Relay log中,再调用sql线程从Relay log中解析数据,从而同步到自己的数据库

  • 方案2就是:
    • 上面Mysql的整个复制流程可以总结为一句话,那就是:从服务器读取主服务器Bin log中的数据,从而同步到自己的数据库中
    • 我们方案2也是如此,就是在概念上把主服务器改为mysql,把从服务器改为Redis而已(如下图所示),当MySQL中有数据写入时,我们就解析MySQL的Bin log,然后将解析出来的数据写入到Redis中,从而达到同步的效果

  • 例如下面是一个云数据库实例分析:
    • 云数据库与本地数据库是主从关系。云数据库作为主数据库主要提供写,本地数据库作为从数据库从主数据库中读取数据
    • 本地数据库读取到数据之后,解析Bin log,然后将数据写入写入同步到Redis中,然后客户端从Redis读数据

  • 这个技术方案的难点就在于: 如何解析MySQL的Bin Log。但是这需要对binlog文件以及MySQL有非常深入的理解,同时由于binlog存在Statement/Row/Mixedlevel多种形式,分析binlog实现同步的工作量是非常大的

Canal开源技术

  • canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)
  • 开源参考地址有:https://GitHub.com/liukelin/canal_mysql_NoSQL_sync
  • 工作原理(模仿MySQL复制):
    • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
    • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
    • canal解析binary log对象(原始为byte流)

  • 架构:
    • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
    • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
    • eventStore (数据存储)
    • metaManager (增量订阅&消费信息管理器)
    • server代表一个canal运行实例,对应于一个jvm
    • instance对应于一个数据队列 (1个server对应1..n个instance)
    • instance模块:

  • 大致的解析过程如下:
    • parse解析MySQL的Bin log,然后将数据放入到sink中
    • sink对数据进行过滤,加工,分发
    • store从sink中读取解析好的数据存储起来
    • 然后自己用设计代码将store中的数据同步写入Redis中就可以了
    • 其中parse/sink是框架封装好的,我们做的是store的数据读取那一步

  • 更多关于Cancl可以百度搜索
  • 下面是运行拓扑图

  • MySQL表的同步,采用责任链模式,每张表对应一个Filter 。例如zvsync中要用到的类设计如下:

  • 下面是具体化的zvsync中要用到的类 ,每当新增或者删除表时,直接进行增删就可以了

Canal的架构设计

Canal 伪装成 MySQL 从节点,mySQL master 会推送 binary log 给canal,canal读取 MySQL binlog的变更信息并生成消息,客户端订阅这些数据变更消息,处理并存储。只要开发一个 Canal客户端就可以解析出MySQL的操作,再将这些数据发送到大数据流计算处理引擎,即可以实现对 MySQL 实时处理。

我们一般使用canal时,只需要引入一个客户端,比如java类似这样:

com.alibaba.otter

canal.client

1.1.0

然后就可以订阅binlog消息了。

另外canal也支持直接把binlog消息发送到MQ,这样对多语言的支持更好一些。

canal自身帮我们做了很多事,这样我们自己写的客户端才能更简单,更专注于业务。下面就来看看canal内部的架构

说明:

server代表一个canal运行实例,对应于一个JVM

instance对应于一个数据队列

核心是instance模块,它包含:

  1. eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)

parser模块

parser模块用来订阅binlog事件,然后通过sink投递到store。parser模块底层依赖dbsync、driver模块。

eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)

  1. eventStore (数据存储)

  1. metaManager (增量订阅&消费信息管理器)

meta模块

核心接口为CanalMetaManager,实现了订阅&消费的机制,主要用于记录canal消费到的mysql binlog的位置。CanalMetaManager接口有几个实现类:

FileMixedMetaManager

MemoryMetaManager

MixedMetaManager

PeriodMixedMetaManager

ZooKeeperMetaManager

这些实现类之间有些会持有其它实现的引用来装饰自己的功能(装饰器模式),

CanalServer的两种实现方式

这两个实现代表了canal的两种应用模式,CanalServerWithNetty在canal独立部署场景发挥作用,开发者只需要实现cient,不同的应用通过canal client与canal server进行通信,canal client的请求统一由CanalServerWithNetty接受进行处理。

而通过CanalServerWithEmbeded,可以不需要独立部署canal,而是把canal嵌入到我们自己的服务里。但是这种对开发者的要求就比较高。

下面的图表示二者的关系,

CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理(4个模块 )

CanalInstance模块解析

1.CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理

  1. instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件

  1. 这幅图我们可以看出instance是怎么生成的。

CanalInstanceGenerator相当于一个工厂类,通过 destination 产生特定的 CanalInstance,它有两个实现:

ManagerCanalInstanceGenerator类,manager方式: 和你自己的内部WEB console/manager系统进行对接。(目前主要是公司内部使用)

springCanalInstanceGenerator类,spring方式:基于spring xml + properties进行定义,构建spring配置

  1. 具体使用哪个,是通过配置的 canal.properties文件
canal.instance.global.mode = spring

  1. 先来看下spring的版本实现,

  1. 源码里面给了几个默认的模板选择实现

  1. 然后部署的时候,我们可以通过在canal.properties配置文件中指定使用哪个文件:
canal.instance.global.spring.xml = classpath:spring/file-instance.xml  
  1. 几种实现不不同:
  • spring/memory-instance.xml
  • spring/file-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

这几个文件的主要区别是,metaManager 和eventParser 这两个配置有所不同,可能在内存、文件或zk进行存储。

  1. generate方法返回的是CanalInstanceWithSpring这个实现类,它继承自AbstractCanalInstance,并且实现了CanalInstance。这个类的实现只有几十行,之所以这么少是因为大部分的逻辑都已经通过spring的配置文件实现了,如下

  2. 具体类之间关系

  1. start方法和stop方法没什么可讲的,就是启停instance内部的组件。
  2. beforeStartEventParser和afterStartEventParser是eventParser启动的前置和后置操作

前者调用了startEventParserInternal,后者调用了stopEventParserInternal,

就是分别负责了CanalLogPositionManager和CanalHAController的启动停止工作。

  1. CanalLogPositionManager记录binlog最后一次解析成功位置,有不同的实现,可以保存在内存,zk等存在介质里。mysql在主从同步过程中,slave自己需要维护binlog的消费进度信息。而canal伪装成slave,因此也要维护这样的信息。
  2. CanalHAController主要是通过失败检测, 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库。它只有一个实现类HeartBeatHAController

  1. 失败转换的逻辑也很简单,定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机

  1. 心跳的逻辑在 com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.MysqlDetectingTimeTask 实现,是个定时器。

总结

总体来看,CanalInstance模块本身没有什么特别复杂的逻辑,它的核心处理都在parser、sink、store、metamanager等内部组件里

CANAL官方文档

基本说明

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

环境版本

  • 操作系统Centos release 6.6 (Final)
  • java版本: jdk1.8
  • canal 版本: 请下载最新的安装包,本文以当前v1.1.1 的canal.deployer-1.1.1.tar.gz为例
  • MySQL版本 :5.7.18
  • 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

一、 安装zookeeper

参考:Zookeeper QuickStart

二、安装MQ

三、 安装canal.server

3.1 下载压缩包

到官网地址(release)下载最新压缩包,请下载 canal.deployer-latest.tar.gz

3.2 将canal.deployer 复制到固定目录并解压

mkdir -p /usr/local/canal cp canal.deployer-1.1.6.tar.gz /usr/local/canal tar -zxvf canal.deployer-1.1.6.tar.gz

3.3 配置修改参数

a. 修改instance 配置文件 vi conf/example/instance.properties

# 按需修改成自己的数据库信息 ################################################# ... canal.instance.master.address=192.168.1.20:3306 # username/passWord,数据库的用户名和密码 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # mq config canal.mq.topic=example # 针对库名或者表名发送动态topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart

b. 修改canal 配置文件vi /usr/local/canal/conf/canal.properties

# ... # 可选项: tcp(默认), kafka,RocketMQ,rabbitmq,pulsarmq canal.serverMode = kafka # ... # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat JSON格式对象 canal.mq.flatMessage = false

mq相关参数说明 (<=1.1.4版本)

参数名

参数说明

默认值

canal.mq.servers

kafka为bootstrap.servers
rocketMQ中为nameserver列表

127.0.0.1:6667

canal.mq.retries

发送失败重试次数

0

canal.mq.batchSize

kafka为ProducerConfig.BATCH_SIZE_CONFIG
rocketMQ无意义

16384

canal.mq.maxRequestSize

kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG
rocketMQ无意义

1048576

canal.mq.lingerMs

kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200
rocketMQ无意义

1

canal.mq.bufferMemory

kafka为ProducerConfig.BUFFER_MEMORY_CONFIG
rocketMQ无意义

33554432

canal.mq.acks

kafka为ProducerConfig.ACKS_CONFIG
rocketMQ无意义

all

canal.mq.kafka.kerberos.enable

kafka为ProducerConfig.ACKS_CONFIG
rocketMQ无意义

false

canal.mq.kafka.kerberos.krb5FilePath

kafka kerberos认证
rocketMQ无意义

../conf/kerberos/krb5.conf

canal.mq.kafka.kerberos.jaasFilePath

kafka kerberos认证
rocketMQ无意义

../conf/kerberos/jaas.conf

canal.mq.producerGroup

kafka无意义
rocketMQ为ProducerGroup名

Canal-Producer

canal.mq.accessChannel

kafka无意义
rocketMQ为channel模式,如果为aliyun则配置为cloud

local

---

---

---

canal.mq.vhost=

rabbitMQ配置

canal.mq.exchange=

rabbitMQ配置

canal.mq.username=

rabbitMQ配置

canal.mq.password=

rabbitMQ配置

canal.mq.aliyunuid=

rabbitMQ配置

---

---

---

canal.mq.canalBatchSize

获取canal数据的批次大小

50

canal.mq.canalGetTimeout

获取canal数据的超时时间

100

canal.mq.parallelThreadSize

mq数据转换并行处理的并发

8

canal.mq.flatMessage

是否为json格式
如果设置为false,对应MQ收到的消息为protobuf格式
需要通过CanalMessageDeserializer进行解码

false

---

---

---

canal.mq.topic

mq里的topic名

canal.mq.dynamicTopic

mq里的动态topic规则, 1.1.3版本支持

canal.mq.partition

单队列模式的分区下标,

1

canal.mq.partitionsNum

散列模式的分区数

canal.mq.partitionHash

散列规则定义
库名.表名 : 唯一主键,比如mytest.person: id
1.1.3版本支持新语法,见下文

mq相关参数说明 (>=1.1.5版本)

在1.1.5版本开始,引入了MQ Connector设计,因此参数配置做了部分调整

参数名

参数说明

默认值

canal.aliyun.accessKey

阿里云ak

canal.aliyun.secreTKEy

阿里云sk

canal.aliyun.uid

阿里云uid

canal.mq.flatMessage

是否为json格式
如果设置为false,对应MQ收到的消息为protobuf格式
需要通过CanalMessageDeserializer进行解码

false

canal.mq.canalBatchSize

获取canal数据的批次大小

50

canal.mq.canalGetTimeout

获取canal数据的超时时间

100

canal.mq.accessChannel = local

是否为阿里云模式,可选值local/cloud

local

canal.mq.database.hash

是否开启database混淆hash,确保不同库的数据可以均匀分散,如果关闭可以确保只按照业务字段做MQ分区计算

true

canal.mq.send.thread.size

MQ消息发送并行度

30

canal.mq.build.thread.size

MQ消息构建并行度

8

------

-----------

-------

kafka.bootstrap.servers

kafka服务端地址

127.0.0.1:9092

kafka.acks

kafka为ProducerConfig.ACKS_CONFIG

all

kafka.compression.type

压缩类型

none

kafka.batch.size

kafka为ProducerConfig.BATCH_SIZE_CONFIG

16384

kafka.linger.ms

kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200

1

kafka.max.request.size

kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG

1048576

kafka.buffer.memory

kafka为ProducerConfig.BUFFER_MEMORY_CONFIG

33554432

kafka.max.in.flight.requests.per.connection

kafka为ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

1

kafka.retries

发送失败重试次数

0

kafka.kerberos.enable

kerberos认证

false

kafka.kerberos.krb5.file

kerberos认证

../conf/kerberos/krb5.conf

kafka.kerberos.jaas.file

kerberos认证

../conf/kerberos/jaas.conf

------

-----------

-------

rocketmq.producer.group

rocketMQ为ProducerGroup名

test

rocketmq.enable.message.trace

是否开启message trace

false

rocketmq.customized.trace.topic

message trace的topic

rocketmq.namespace

rocketmq的namespace

rocketmq.namesrv.addr

rocketmq的namesrv地址

127.0.0.1:9876

rocketmq.retry.times.when.send.failed

重试次数

0

rocketmq.vip.channel.enabled

rocketmq是否开启vip channel

false

rocketmq.tag

rocketmq的tag配置

空值

---

---

---

rabbitmq.host

rabbitMQ配置

rabbitmq.virtual.host

rabbitMQ配置

rabbitmq.exchange

rabbitMQ配置

rabbitmq.username

rabbitMQ配置

rabbitmq.password

rabbitMQ配置

rabbitmq.deliveryMode

rabbitMQ配置

---

---

---

pulsarmq.serverUrl

pulsarmq配置

pulsarmq.roleToken

pulsarmq配置

pulsarmq.topicTenantPrefix

pulsarmq配置

---

---

---

canal.mq.topic

mq里的topic名

canal.mq.dynamicTopic

mq里的动态topic规则, 1.1.3版本支持

canal.mq.partition

单队列模式的分区下标,

1

canal.mq.enableDynamicQueuePartition

动态获取MQ服务端的分区数,如果设置为true之后会自动根据topic获取分区数替换canal.mq.partitionsNum的定义,目前主要适用于RocketMQ

false

canal.mq.partitionsNum

散列模式的分区数

canal.mq.dynamicTopicPartitionNum

mq里的动态队列分区数,比如针对不同topic配置不同partitionsNum

canal.mq.partitionHash

散列规则定义
库名.表名 : 唯一主键,比如mytest.person: id
1.1.3版本支持新语法,见下文

canal.mq.dynamicTopic 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

  • 例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上
  • 例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上
  • 例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
  • 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
  • 例子4:testA:test\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
  • 例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值

大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

canal.mq.partitionHash 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test\\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.*\\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.*\\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
  • 例子6: test\\.test:id,.\\..* , 针对test的表按照id散列,其余的表按照table散列

注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

其他详细参数可参考Canal AdminGuide

mq顺序性问题

binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

  1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
  2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
  • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  1. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
  • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
  • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意

MQ发送性能数据

1.1.5版本可以在5k~50k左右,具体可参考:Canal-MQ-Performance

阿里云RocketMQ对接参数

# 配置ak/sk canal.aliyun.accessKey = XXX canal.aliyun.secretKey = XXX # 配置topic canal.mq.accessChannel = cloud canal.mq.servers = 内网接入点 canal.mq.producerGroup = GID_**group(在后台创建) canal.mq.namespace = rocketmq实例id canal.mq.topic=(在后台创建)

kafka ssl配置参数

# canal.properties配置文件 kafka.kerberos.enable = true kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf" kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

3.4 启动

cd /usr/local/canal/ sh bin/startup.sh

3.5 查看日志

a.查看 logs/canal/canal.log

vi logs/canal/canal.log

b. 查看instance的日志:

vi logs/example/example.log

3.6 关闭

cd /usr/local/canal/ sh bin/stop.sh

3.7 MQ数据消费

canal.client下有对应的MQ数据消费的样例工程,包含数据编解码的功能

Footer

© 2023 gitHub, Inc.

Footer navigation

Canal部分源码

1.通过 destination 产生特定的 CanalInstance

来源地址:https://blog.csdn.net/weixin_43241803/article/details/129562160

您可能感兴趣的文档:

--结束END--

本文标题: Canal——数据同步

本文链接: https://lsjlt.com/news/422780.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作