返回顶部
首页 > 资讯 > 数据库 >如何实现从库MTS多线程并行回放
  • 769
分享到

如何实现从库MTS多线程并行回放

2024-04-02 19:04:59 769人浏览 独家记忆
摘要

今天就跟大家聊聊有关如何实现从库MTS多线程并行回放,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。从库MTS多线程并行回放重点描述一下MTS中检查点

今天就跟大家聊聊有关如何实现从库MTS多线程并行回放,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

从库MTS多线程并行回放

重点描述一下MTS中检查点的概念。在后面的第25节我们可以看到,MTS的异常恢复很多情况下需要依赖这个检查点,从检查点位置开始扫描relay log做恢复操作,但是在GTID AUTO_POSITioN MODE模式且设置了recovery_relay_log=1的情况下这种依赖将会弱化。

一、工作线程执行Event

实际上协调线程只是将Event分发到了工作线程的执行队列中。那么工作线程执行Event就需要从执行队列中拿出这些Event,然后进行执行。整个过程可以参考函数slave_worker_exec_job_group。因为这个流程比较简单,因此就不需要画图了,但是我们需要关注一些点如下:

(1)从执行队列中读取Event。注意这里如果执行队列中没有Event那么就进入空闲等待,也就是工作线程处于无事可做的状态,等待状态为‘Waiting for an event from Coordinator’。

(2)如果执行到XID_EVENT那么说明事务已经结束了那么需要完成内存信息更新操作。可参考Slave_worker::slave_worker_exec_event和Xid_apply_log_event::do_apply_event_worker函数。更新内存相关信息可参考函数commit_positions函数。下面是一些更新的信息,我们可以看到和slave_worker_info表中的信息基本一致,如下:

1、更新当前信息
strmake(group_relay_log_name, ptr_g->group_relay_log_name,
sizeof(group_relay_log_name) - 1);
group_relay_log_pos= ev->future_event_relay_log_pos;
set_group_master_log_pos(ev->common_header->log_pos);
set_group_master_log_name(c_rli->get_group_master_log_name());
2、将检查点信息进行写入:
strmake(checkpoint_relay_log_name, ptr_g-
>checkpoint_relay_log_name,sizeof(checkpoint_relay_log_name) - 1);
checkpoint_relay_log_pos= ptr_g->checkpoint_relay_log_pos;
strmake(checkpoint_master_log_name, ptr_g-
>checkpoint_log_name,sizeof(checkpoint_master_log_name) - 1);
checkpoint_master_log_pos= ptr_g->checkpoint_log_pos;
3、设置GAQ序号:
 checkpoint_seqno= ptr_g->checkpoint_seqno;
更新整个BITMAP,可能已经由检查点进行GAQ出队:
for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++) 
//重新设置位图 因为checkpoint已经 
{                                                                     
//ptr_g->shifted是GAQ中出队的事务个数
if (bitmap_is_set(&group_shifted, pos))                            
//这里就需要偏移掉出队的事务,恢复已经不需要了
bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
}
4、设置位图:
bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);
//在本次事务相应的位置设置为1

(3)如果执行到XID_EVENT那么说明事务已经结束了那么需要完成内存信息的持久化,即强制刷内存信息持久化到slave_worker_info表中(relay_log_info_repository设置为TABLE)。可参考函数commit_positions函数,如下:

if ((error= w->commit_positions(this, ptr_group,
w->is_transactional())))

(4)如果执行到XID_EVENT还需要进行事务的提交操作,也就是进行Innodb层事务的提交。

从上面我们可以看到MTS中每次事务的提交并不会更新slave_relay_log_info表,而是进行slave_worker_info表的更新,将最新的信息写入到slave_worker_info表中。
我们前面也说过sql线程已经蜕变为协调线程,那么slave_relay_log_info表什么时候更新呢?下面我们就能看到slave_relay_log_info表的更新实际上由协调线程在做完检查点之后更新。

二、MTS中检查点中的重要概念

总的说来MTS中的检查点是MTS进行异常恢复的起点。实际上就是代表到这个位置之前(包含自身)事务都是已经在从库执行过了,但之后的事务可能执行完成了也可能没有执行完成。检查点由协调线程进行。

(1)协调线程的GAQ队列

前面我们已经知道MTS中为每个工作线程维护了一个Event的分发队列。除此之外协调线程还维护了一个非常的重要的队列GAQ,它是一个环形队列。下面是源码中的定义:

  
  Slave_committed_queue *gaq;

每次协调线程分发事务的时候都会将事务记录到GAQ队列中,因此GAQ中事务的顺序总是和relay log文件中事务的顺序一致的。检查点正是作用在GAQ队列上的,每次检查点的位置称为LWM,还记得上一节我叫大家先忽略的LWM吗?就是这个。源码中定义也正是如此,它在GAQ队列中进行维护。如下:

  
  Slave_job_group lwm;

在GAQ队列中还维护有一个叫做checkpoint_seqno的序号,它是最后一次检查点以来每个分配事务的序号,下面是源码中的定义:

uint checkpoint_seqno;  // counter of groups executed after the most recent CP

在协调线程读取到GTID_LOG_EVENT后为其分配序号,记做checkpoint_seqno,如下:

rli->checkpoint_seqno++;//增加seqno

当协调线程进行检查点的时候checkpoint_seqno序号会减去出队的事务数量,如下:

checkpoint_seqno= checkpoint_seqno - shift; //这里减去出队的事务

在MTS异常恢复的时候也会用到这个序号,每个工作线程会通过这个序号来确认本工作线程执行事务的上限,如下:

      for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt,
                 j= 0; i <= w->checkpoint_seqno; i++, j++)
            {
              if (bitmap_is_set(&w->group_executed, i))
//如果这一位 已经设置
              {
                DBUG_PRINT("mts", ("Setting bit %u.", j));
                bitmap_fast_test_and_set(groups, j);
//那么GTOUPS 这个 bitmap中应该设置,最终GTOUPS会包含全的需要恢复的事务
              }
            }

关于详细的异常恢复流程将在第25节描述。

(2)工作线程的Bitmap

有了GAQ队列和检查点就知道异常恢复开始的位置了。但是我们并不知道每一个工作线程都完成了哪些事务,哪些又没有执行完成,因此就不能确认哪些事务需要恢复。在MTS中并行回放事务的提交并不是按分发顺序的进行的,某些大事务(或者其他原因比堵塞)可能迟迟不能提交,而一些小事务却会很快提交完成。这些迟迟不能提交的事务就成为了所谓的’gap’,如果使用了GTID那么在查看已经执行GTID SET的时候可能出现一些‘空洞’,为了防止’gap’的发生通常需要设置参数slave_preserve_commit_order。下一节我们将会看到这种‘空洞’以及slave_preserve_commit_order的作用。但是如果要设置了slave_preserve_commit_order参数就需要开启从库记录binary log的功能,因此必须开启log_slave_updates参数。下面是源码的判断:

  if (opt_slave_preserve_commit_order && rli->opt_slave_parallel_workers > 0 &&
      opt_bin_log && opt_log_slave_updates)
    commit_order_mngr= new Commit_order_manager(rli->opt_slave_parallel_workers);
//order commit 管理器

这里先提前说一下MTS恢复的会有两个关键阶段:

  • 扫描阶段

通过扫描检查点以后的relay log。通过每个工作线程的Bitmap区分出哪些事务已经执行完成,哪些事务没有执行完成,并且汇总形成恢复Bitmap,同时得到需要恢复的事务总量。

  • 执行阶段

通过这个汇总的恢复Bitmap,将这些没有执行完成事务读取relay log再次执行。

这个Bitmap位图和GAQ中的事务一一对应。当执行XID_EVENT完成提交后这一位将会被设置为‘1’。

(3)协调线程信息的持久化

这个已经在前面提到过,实际上每次进行检查点的时候都需要将检查点的位置固化到slave_relay_log_info表中(relay_log_info_repository设置为TABLE)。因此slave_relay_log_info中存储的实际上不是实时的信息而是检查点的信息。下面就是slave_relay_log_info表的表结构:

Mysql> desc slave_relay_log_info;
+-------------------+---------------------+------+-----+---------+-------+
| Field             | Type                | Null | Key | Default | Extra |
+-------------------+---------------------+------+-----+---------+-------+
| Number_of_lines   | int(10) unsigned    | NO   |     | NULL    |       |
| Relay_log_name    | text                | NO   |     | NULL    |       |
| Relay_log_pos     | bigint(20) unsigned | NO   |     | NULL    |       |
| Master_log_name   | text                | NO   |     | NULL    |       |
| Master_log_pos    | bigint(20) unsigned | NO   |     | NULL    |       |
| Sql_delay         | int(11)             | NO   |     | NULL    |       |
| Number_of_workers | int(10) unsigned    | NO   |     | NULL    |       |
| Id                | int(10) unsigned    | NO   |     | NULL    |       |
| Channel_name      | char(64)            | NO   | PRI | NULL    |       |
+-------------------+---------------------+------+-----+---------+-------+

与此同时show slave status中的某些信息也是检查点的内存信息。下面的信息将是来自检查点:

  • Relay_Log_File :最新一次检查点的relay log文件名。

  • Relay_Log_Pos :最新一次检查点的relay log位点。

  • Relay_Master_Log_File:最新一次检查点的主库binary log文件名。

  • Exec_Master_Log_Pos:最新一次检查点的主库binary log位点。

  • Seconds_Behind_Master:根据检查点指向事务的提交时间计算的延迟。

需要注意的是我们的GTID模块独立在这一套理论之外,在第3节我们讲GTID模块的初始化的时候我们就说过GTID模块的初始化是在从库信息初始化之前就完成了。因此在做MTS异常恢复的时候使用GTID AUTO_POSITION MODE模式将会变得更加简单和安全,细节将在第25节描述。

(4)工作线程信息的持久化

工作线程的信息就持久化在slave_worker_info 表中,前面我们描述工作线程执行Event注意点的时候已经做了相应的描述。执行XID_EVENT完成事务提交之后会将信息写入到slave_worker_info 表中(relay_log_info_repository设置为TABLE)。其中包括信息:

  • Relay_log_name:工作线程最后一个提交事务的relay log文件名。

  • Relay_log_pos:工作线程最后一个提交事务的relay log位点。

  • Master_log_name:工作线程最后一个提交事务的主库binary log文件名。

  • Master_log_pos:工作线程最后一个提交事务的主库binary log文件位点。

  • Checkpoint_relay_log_name:工作线程最后一个提交事务对应检查点的relay log文件名。

  • Checkpoint_relay_log_pos:工作线程最后一个提交事务对应检查点的relay log位点。

  • Checkpoint_master_log_name:工作线程最后一个提交事务对应检查点的主库binary log文件名。

  • Checkpoint_master_log_pos:工作线程最后一个提交事务对应检查点的主库binary log位点。

  • Checkpoint_seqno:工作线程最后一个提交事务对应checkpoint_seqno序号。

  • Checkpoint_group_size:工作线程的Bitmap字节数,约等于 GAQ队列大小/8,因为1个字节为8位。

  • Checkpoint_group_bitmap:工作线程对应的Bitmap位图信息。

关于Checkpoint_group_size的换算参考函数Slave_worker::write_info。

(5)两个参数

  • slave_checkpoint_group:GAQ队列大小。

  • slave_checkpoint_period:多久执行一次检查点,默认300毫秒。

(6)检查点执行的时机

  • 超过slave_checkpoint_period配置。可参考next_event函数如下:

if (rli->is_parallel_exec() && (opt_mts_checkpoint_period != 0 || force))
{
ulonglong period= static_cast<ulonglong>(opt_mts_checkpoint_period * 1000000ULL);
...
(void) mts_checkpoint_routine(rli, period, force, true);
...
      }
  • 达到GAQ队列已满,如下:

 //如果达到了 GAQ的大小 设置为force 强制checkpoint 
bool force= (rli->checkpoint_seqno > (rli->checkpoint_group - 1));
  • 正常stop slave。

(7)一个列子

通常有压力的情况下的slave_worker_info中的所有工作线程最大的Checkpoint_master_log_pos应该和slave_relay_log_info中的Master_log_pos 相等,因为这是最后一个检查点的位点信息,如下:

如何实现从库MTS多线程并行回放

三、MTS中的检查点的流程

这一部分将详细描述一下检查点的步骤,关于检查点可以参考函数mts_checkpoint_routine。

假设现在有7个事务是可以并行执行的,工作线程数量为4个。当前协调线程已经分发了5个,前面4个事务都已经执行完成,其中第5的一个事务是大事务。那么可能当前的状态图如下(图20-1,高清原图包含在文末原图中):

如何实现从库MTS多线程并行回放

前面4个事务每个工作线程都分到一个,最后一个大事务这里假设由工作线程2进行执行,图中用红色部分表示。

(1)判断是超过了slave_checkpoint_period设置的大小,如果超过需要进行检查点。

  if (!force && diff < period)
//是否需要进行检查点是否超过了slave_checkpoint_period的设置
  {
    
    DBUG_RETURN(FALSE);
  }

(2)扫描GAQ队列进行出队操作,直到第一个没有提交的事务为止。图中红色部分就是一个大事务,检查点只能停留在它之前。

cnt= rli->gaq->move_queue_head(&rli->workers); 
//work数组 返回出队的个数

move_queue_head部分代码如下:

    if (ptr_g->worker_id == MTS_WORKER_UNDEF ||
        my_atomic_load32(&ptr_g->done) == 0) 
//当前GROUP是否已经执行完成 如果没有执行完成就需要 停止本次检查点
      break; 

(3)更新内存和relay_log_info_repository表的信息为本次检查点指向的位置。

先更新内存信息,也就是我们show slave status中看到的信息:

  rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
  rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
  rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);

然后强制写入表slave_relay_log_info中:

error= rli->flush_info(TRUE); 
//将本次检查点信息 写入到relay_log_info_repository表中

(4)更新last_master_timestamp信息为检查点位置事务的XID_EVENT的timstamp值

这个值在第27节中会详细描述,它是计算Seconds_behind_master的一个因素:


ts= rli->gaq->empty()? 0 : reinterpret_cast<Slave_job_group*>(rli->gaq->head_queue())->ts;
//rli->gaq->head_queue 检查点位置的GROUP的时间
rli->reset_notified_checkpoint(cnt, ts, need_data_lock, true);
reset_notified_checkpoint函数中有:
last_master_timestamp= new_ts;

因此MTS中Seconds_behind_master的计算和检查点息息相关。

(5)最后还会将前面GAQ出队的事务数量进行统计,因为每个工作线程需要根据这个值来进行Bitmap位图的偏移。并且还会维护我们前面说的GAQ的checkpoint_seqno值。

这个操作也是在函数Relay_log_info::reset_notified_checkpoint中完成的,实际上很简单部分代码如下:

for (Slave_worker **it= workers.begin(); it != workers.end(); ++it)
//循环每个woker
w->bitmap_shifted= w->bitmap_shifted + shift; 
//每个worker线程都会增加 这个偏移量
checkpoint_seqno= checkpoint_seqno - shift; 
//这里减去 移动的个数

到这里整个检查点的基本操作就完成了。我们看到实际上步骤并不多,拿到Bitmap偏移量后每个工作线程就会在随后的第一个事务提交的时候进行位图的偏移,checkpoint_seqno 计数也会更新。

我们前面的假设环境中,如果触发了一次检查点,并且协调线程将后两个可以并行的事务发给了工作线程1和3进行处理并且处理完成。那么我们的图会变成如下(图20-2,高清原图包含在文末原图中):

如何实现从库MTS多线程并行回放

这张图中我用不同样色表示了不同线条,因为它们交叉比较多。GAQ中的红色事务就是我们假设的大事务它仍然没有执行完成,它也是我们所谓的‘gap’。如果这个时候mysql实例异常重启,那么这个红色‘gap’就是我们启动后需要找到的事务,方式就是通过Bitmap位图进行比对,后面说异常恢复的时候再详细讨论。如果是开启了GTID,这种‘gap’很容易就能观察到,下一节将进行测试
同时我们需要注意这个时候工作线程2并没有分发新的事务执行,因为工作线程2没有执行完大事务, 因此在slave_woker_info表中它的信息仍然显示为上一次提交事务的信息。而工作线程4因为没有分配到新的事务,因此slave_woker_info表中它的信息也显示为上一次提交事务的信息。因此在slave_woker_info中工作线程2和工作线程4的检查点信息、Bitmap信息、checkpoint_seqno都是老的信息。

好了到这里我已经说明了MTS中三个关键点

  • 协调线程是根据什么规则进行事务分发的。

  • 工作线程如何拿到分发的事务。

  • MTS中的检查点是如何进行的。

看完上述内容,你们对如何实现从库MTS多线程并行回放有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注编程网数据库频道,感谢大家的支持。

您可能感兴趣的文档:

--结束END--

本文标题: 如何实现从库MTS多线程并行回放

本文链接: https://lsjlt.com/news/61541.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 如何实现从库MTS多线程并行回放
    今天就跟大家聊聊有关如何实现从库MTS多线程并行回放,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。从库MTS多线程并行回放重点描述一下MTS中检查点...
    99+
    2024-04-02
  • 从库 MTS 多线程并行回放(二)
    本节包含一个笔记,链接如下: https://www.jianshu.com/p/e920a6d33005 这一节会先描述 MTS 的工作线程执行 Event 的大概流程。然后重点描述一下 MTS 中检查点的概念。在后面的第 25 节我...
    99+
    2018-07-08
    从库 MTS 多线程并行回放(二)
  • 怎么进行从库MTS多线程并行回放
    今天就跟大家聊聊有关怎么进行从库MTS多线程并行回放,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一、综述与单SQL线程的回放不同,MTS包含多个工...
    99+
    2024-04-02
  • 技术分享 | 从库 MTS 多线程并行回放(一)
    本节包含分发调用流程请参考链接: https://www.jianshu.com/p/8706d7422d89 一、综述 与单 SQL 线程的回放不同,MTS 包含多个工作线程,原有的 SQL 线程蜕变为协调线程。SQL 协调线程同时还承...
    99+
    2014-09-28
    技术分享 | 从库 MTS 多线程并行回放(一)
  • 技术分享 | 从库 MTS 多线程并行回放(二)
    本节包含一个笔记如下: https://www.jianshu.com/p/e920a6d33005 这一节会先描述 MTS 的工作线程执行 Event 的大概流程。然后重点描述一下 MTS 中检查点的概念。在后面的第 25 节我们可以看...
    99+
    2020-09-11
    技术分享 | 从库 MTS 多线程并行回放(二)
  • java多线程中如何实现线程并发库
    本篇文章给大家分享的是有关java多线程中如何实现线程并发库,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。多线程之线程并发库原子性操作类java.util.concurrent...
    99+
    2023-06-19
  • JavaScript如何实现多线程运行库Nexus.js
    这篇文章主要为大家展示了“JavaScript如何实现多线程运行库Nexus.js”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“JavaScript如何实现多线...
    99+
    2024-04-02
  • python如何实现多线程并发抓取
    这篇文章主要介绍了python如何实现多线程并发抓取,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。多线程并发抓取单线程太慢的话,就需要多线程了,这里给个简单的线程池模板 这个...
    99+
    2023-06-17
  • java多线程并发执行怎么实现
    在Java中实现多线程的并发执行有多种方式,以下是其中的几种常见方法:1. 继承Thread类:创建一个继承自Thread类的子类,...
    99+
    2023-09-27
    java
  • c语言多线程并行怎么实现
    在C语言中,可以使用pthread库来实现多线程并行。1. 首先,需要包含pthread.h头文件。```c#include ```...
    99+
    2023-09-14
    c语言
  • java怎么实现多线程并发执行
    Java实现多线程并发执行的方式有两种:继承Thread类和实现Runnable接口。 继承Thread类: 定义一个类,继承...
    99+
    2023-10-25
    java
  • python怎么实现多线程并得到返回值
    这篇“python怎么实现多线程并得到返回值”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“python怎么实现多线程并得到返...
    99+
    2023-06-30
  • java多线程并发执行如何操作
    在Java中,可以使用多种方式来实现多线程的并发执行,以下是一些常见的操作方式: 继承Thread类:创建一个继承自Thread...
    99+
    2023-10-23
    java
  • 如何在C#项目中使用Task实现并行和多线程编程
    这期内容当中小编将会给大家带来有关如何在C#项目中使用Task实现并行和多线程编程,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。任务和线程的区别:任务是架构在线程之上的,也就是说任务最终还是要抛给线程去执...
    99+
    2023-06-06
  • Python中threading库如何实现线程锁与释放锁
    小编给大家分享一下Python中threading库如何实现线程锁与释放锁,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!控制资源访问前文提到threading库在...
    99+
    2023-06-15
  • java如何实现多线程的顺序执行
    场景 编写一个程序,启动三个线程,三个线程的name分别是A,B,C;,每个线程将自己的ID值在屏幕上打印5遍,打印顺序是ABCABC... 使用 synchronized 实现 ...
    99+
    2024-04-02
  • java多线程如何实现
    java实现多线程的方法:(推荐:java视频教程)方式一:继承Thread类的方式创建一个继承于Thread类的子类重写Thread类中的run():将此线程要执行的操作声明在run()创建Thread的子类的对象调用此对象的start(...
    99+
    2022-02-13
    java
  • java如何实现多线程
    Java多线程是Java高级特性之一,通过多线程,我们可以实现多任务同时协同工作,在一定情况下提升程序效率,但是Java多线程仍要慎重使用。 (推荐学习:java课程)首先第一点,Java多线程需要较高的编码技巧,一...
    99+
    2019-06-27
    java教程 java
  • PHP 多线程如何实现?
    php 多线程是指在一个进程中同时运行多个任务,通过创建独立运行的线程实现。php 中可以使用 pthreads 扩展模拟多线程行为,安装后可使用 thread 类创建和启动线程。例如,...
    99+
    2024-05-06
    php 多线程
  • redis如何实现多线程
    redis 通过巧妙地结合 reactor 模式、线程池和内部多线程机制实现了多线程,从而有效利用多核 cpu,提高吞吐量、优化资源利用,保持低延迟并增强扩展性,满足不同负载需求。 R...
    99+
    2024-06-12
    redis
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作