返回顶部
首页 > 资讯 > 后端开发 > Python >RocketMQ设计之主从复制和读写分离
  • 368
分享到

RocketMQ设计之主从复制和读写分离

2024-04-02 19:04:59 368人浏览 薄情痞子

Python 官方文档:入门教程 => 点击学习

摘要

目录一、主从复制二、读写分离一、主从复制 RocketMQ为了提高消费的高可用性,避免Broker发生单点故障引起Broker上的消息无法及时消费,同时避免单个机器上硬盘坏损出现消费

一、主从复制

RocketMQ为了提高消费的高可用性,避免Broker发生单点故障引起Broker上的消息无法及时消费,同时避免单个机器上硬盘坏损出现消费数据丢失。

RocketMQ采用Broker数据主从复制机制,当消息发送到Master服务器后会将消息同步到Slave服务器,如果Master服务器宕机,消息消费者还可以继续从Slave拉取消息。

消息从Master服务器复制到Slave服务器上,有两种复制方式:同步复制SYNC_MASTER和异步复制ASYNC_MASTER

通过配置文件conf/broker.conf文件配置:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional infORMation regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     Http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language Governing permissions and
#  limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

对brokerRole参数进行设置:

同步复制:Master和Slave都写成功后才返回客户端写成功的状态。

  • 优点:Master服务器出现故障,Slave服务器上有全部数据的备份,很容易恢复到Master服务器。
  • 缺点:由于多了一个同步等待的步骤,增加数据写入延迟,降低系统吞吐量。

异步复制:仅Master服务器写成功即可返回给客户端写成功的状态。

  • 优点:没有同步等待的步骤,低延迟,高吞吐。
  • 缺点:如果Master服务器出现故障,有些数据可能未写入Slave服务器,未同步的数据可能丢失

实际应用中,需要结合业务场景,合理设置刷盘方式和主从复制方式。不建议使用同步刷盘方式,因为它频繁触发写磁盘操作,性能下降很明显。**通常把MasterSlave设置为异步刷盘,同步复制,保证数据不丢失。**这样即使一台服务器出故障,仍然可以保证数据不丢失。

二、读写分离

读写分离机制是高性能、高可用架构中常见的设计,例如Mysql实现读写分离机制,Client只能从Master服务器写数据,可以从Master服务器和Slave服务器都读数据。

RocketMQ的Consumer在拉取消息时,Broker会判断Master服务器的消息堆积量来决定Consumer是否从Slave服务器拉取消息消费。默认一开始从Master服务器拉群消息,如果Master服务器的消息堆积超过物理内存40%,则会返回给Consumer的消息结果并告知Consumer,下次从其他Slave服务器上拉取消息。

RocketMQ 有属于自己的一套读写分离逻辑,会判断主服务器的消息堆积量来决定消费者是否向从服务器拉取消息消费。

Consumer在向 Broker 发送消息拉取请求时,会根据筛选出来的消息队列,判定是从Master,还是从Slave拉取消息,默认是Master。

Broker 接收到消息消费者拉取请求,在获取本地堆积的消息量后,会计算服务器的消息堆积量是否大于物理内存的一定值,如果是,则标记下次从 Slave服务器拉取,计算 Slave服务器的 Broker Id,并响应给消费者。

Consumer在接收到 Broker的响应后,会把消息队列与建议下一次拉取节点的 Broker Id 关联起来,并缓存在内存中,以便下次拉取消息时,确定从哪个节点发送请求。

public class GetMessageResult {

    private final List<SelectMappedBufferResult> messageMapedList =
        new ArrayList<SelectMappedBufferResult>(100);
    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
    private GetMessageStatus status;
    private long nextBeginOffset;
    private long minOffset;
    private long maxOffset;
    private int bufferTotalSize = 0;
    // 标识是否通过Slave拉拉取消息
    private boolean suggestPullingFromSlave = false;
    private int msGCount4Commercial = 0;
}

// 针对消息堆积量过大会切换到Slave进行查询。
// maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量。
// TOTAL_PHYSICAL_MEMORY_SIZE 表示当前系统物理内存,acceSSMessageInMemoryMaxRatio 的默认值为 40,
// 以上逻辑即可算出当前消息堆积量是否大于物理内存的 40%,如果大于则将 suggestPullingFromSlave 设置为 true。

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
  • 决定消费者是否向从服务器拉取消息消费的值存在 GetMessageResult 类中。
  • suggestPullingFromSlave的默认值为 false,即默认消费者不会消费从服务器,但它会在消费者发送消息拉取请求时,动态改变该值,Broker 接收、处理消费者拉取消息请求。
  • 针对本MessageQueue消息堆积量过大会切换到Slave进行查询,maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量,当前消息堆积量是否大于物理内存的 40%就会切换到Slave进行查询。
public class PullMessageResponseHeader implements CommandCustomHeader {
    // suggestWhichBrokerId标识从哪个broker进行查询
    private Long suggestWhichBrokerId;
    private Long nextBeginOffset;
    private Long minOffset;
    private Long maxOffset;
}


public class PullMessageProcessor implements NettyRequestProcessor {

    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

            // 建议从slave消费消息
            if (getMessageResult.isSuggestPullingFromSlave()) {
                // 从slave查询
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                // 从master查询
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    // 针对SLAVE需要判断是否可读,不可读的情况下读MASTER
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consume too slow ,redirect to another Machine
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
        }

        return response;
    }
}

PullMessageResponseHeadersuggestWhichBrokerId标识某个MessageQueue的消息从具体的brokerId进行查询。
针对Slave不可读的情况会设置为从MASTER_ID进行查询。

public class PullapiWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong> pullFromWhichnodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        // 处理MessageQueue对应拉取的brokerId
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

        // 省略相关代码

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        // 保存在pullFromWhichNodeTable对象中
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }
}

Consumer收到拉取响应回来的数据后,会将下次建议拉取的 brokerId缓存起来。

public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        // 查找MessageQueue应该从brokerName的哪个节点查询
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);

        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }


    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }

        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }

        return MixAll.MASTER_ID;
    }
}

Consumer拉取消息的时候会从 pullFromWhichNodeTable 中取出拉取 brokerId确定去具体的broker进行查询。

到此这篇关于RocketMQ设计之主从复制和读写分离的文章就介绍到这了,更多相关RocketMQ从复制和读写分离内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: RocketMQ设计之主从复制和读写分离

本文链接: https://lsjlt.com/news/143238.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • RocketMQ设计之主从复制和读写分离
    目录一、主从复制二、读写分离一、主从复制 RocketMQ为了提高消费的高可用性,避免Broker发生单点故障引起Broker上的消息无法及时消费,同时避免单个机器上硬盘坏损出现消费...
    99+
    2024-04-02
  • MySQL 主从复制与读写分离
             在实际生产环境中,如果对数据库的读写都在同一块数据库服务器中操作,无论是在安全性、高可用性,还是高并发等各个...
    99+
    2024-04-02
  • 搭建mysql的主从复制和读写分离
    搭建mysql的主从复制和读写分离   +--------+           &...
    99+
    2024-04-02
  • MYSQL的主从复制与读写分离
    在实际生产环境中,如果对数据库的读和写都在同一个数据库服务器中操作,无论是安全性,高可用性,还是高并发性等各个方面都是不能满足实际需求,因此,一般来说都是通过主从复制的方式来同步诗句,再通过读写分离来提升数...
    99+
    2024-04-02
  • MySQL(4)-AB主从复制与读写分离
        本篇博客介绍的是通过 主从复制(Master-Slave)的方式来同步数据,再通过读写分离(MySQL-Proxy)来提升数据库的并发负载能力实现mysql高可用性    首先 , 我们先了解AB主...
    99+
    2024-04-02
  • 数据库---mysql主从复制读写分离
    http://m.open-open.com/m/lib/view/1413274853450.html 原理及架构分析部署前准备下载好源码包存放位置要与脚本中对应  mysql-5.5...
    99+
    2024-04-02
  • 详解MySQL主从复制及读写分离
    目录前言一、相关概述二、读写分离三、MySQL主从复制实验部署四、MySQL读写分离实验前言 在企业实际应用中,成熟的业务通常数据量都比较大,而单台MySQL服务器在安全性、高可用性...
    99+
    2024-04-02
  • 利用Amoeba实现MySQL主从复制和读写分离
    在实际生产环境中,如果对数据库的读和写都在同一个数据库服务器中操作,无论是在安全性、高可用性,还是高并发等各个方面都是完全不能满足实际需求的,因此,一般来说都是通过主从复制(Master-Slave)的方式...
    99+
    2024-04-02
  • MySQL主从复制与读写分离的原理
    这篇文章主要为大家分享MySQL主从复制与读写分离的原理。文中还介绍了如何配置和验证主从复制与读写分离的实验,希望大家通过这篇文章能有所收获。MySQL主从复制与读写分离一、前言前面我们已经对MySQL数据...
    99+
    2024-04-02
  • MySQL数据库主从复制与读写分离
    目录一.主从复制主从复制三线程主从复制的过程:主从复制的策略:主从复制高延迟二.读写分离读写分离概念读写分离原因与场景总结一.主从复制        主从复制:在实际的生产中,为了解...
    99+
    2024-04-02
  • CentOS7上部署Mysql主从复制与读写分离
    在实际生产环境中,如果对数据库的读和写都在同一个数据库服务器中操作,无论是在安全性,高可用性,还是高并发等各方面都是完全不能满足实际需求的,因此一般来说都是通过主从复制(Master-Slave)的方式来同...
    99+
    2024-04-02
  • mysql主从复制及读写分离(附安装包)
    实验目的:在实际生产环境中,如果对数据库的读和写都在同一个数据库服务器中操作,无论在安全性、高可用性,还是高并发等各个方面都是完全不能满足实际需求的,因此,一般需要通过主从复制(master-slave)的...
    99+
    2024-04-02
  • amoeba实现mysql读写分离+主从复制架构
    一、环境系统:centos6.5mysql版本:mysql5.6master服务器:192.168.1.21slave服务器: 192.168.1.100master写 slave读二、实现mysql主从复...
    99+
    2024-04-02
  • Mysql主从复制与读写分离图文详解
    文章思维导图 为什么使用主从复制、读写分离 主从复制、读写分离一般是一起使用的。目的很简单,就是为了提高数据库的并发性能。 你想,假设是单机,读写都在一台MySQL上面完成,性...
    99+
    2024-04-02
  • 部署MongoDB复制集(主从复制、读写分离、高可用)
    MongoDB 复制集 复制集(Replica Sets)是额外的数据副本,是跨多个服务器同步数据的过程,复制集提供了冗余备份并提高了数据的可用性,通过复制集可以对硬件故障和中断的服务进行恢复。 Mon...
    99+
    2024-04-02
  • mysql主从复制读写分离与高可用配置
    一、说明 前面我们说了mysql的安装配置(并提供一键安装脚本),mysql语句使用以及备份恢复mysql数据;本次要介绍的是mysql的主从复制,读写分离;及高可用MHA;环境如下:master:Cent...
    99+
    2024-04-02
  • proxysql 主从复制读写分离配置过程记录
    1、环境信息 软件GitHub地址: https://github.com/sysown/proxysql/ 软件官网:https://proxysql.com/ 系统版本: [root@12c proxy...
    99+
    2024-04-02
  • MySQL中如何实现主从复制与读写分离
    MySQL中如何实现主从复制与读写分离,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。1,mysql的配置CentOS 5.x,6.0编译安装N...
    99+
    2024-04-02
  • MySQL中怎么实现主从复制及读写分离
    MySQL中怎么实现主从复制及读写分离?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一、相关概述主从复制:主数据库(Master)发送更新事件到从数据库(Slave),从数...
    99+
    2023-06-14
  • SpringBoot环境-MySQL主从复制,读写分离的实现
    目录 概述 环境 主从复制 读写分离 概述  记录在MySQL数据库中主从复制以及SpringBoot环境操作MySQL数据库读写分离的实现步骤。  背景 :因为我们在对数据库进行操作时,如果读写操作都由一台数据库承担的话压...
    99+
    2023-09-06
    mysql 数据库 java linux
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作