返回顶部
首页 > 资讯 > 精选 >使用Redisson订阅数问题怎么解决
  • 346
分享到

使用Redisson订阅数问题怎么解决

2023-06-26 06:06:51 346人浏览 薄情痞子
摘要

本文小编为大家详细介绍“使用Redisson订阅数问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“使用Redisson订阅数问题怎么解决”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。一、前提最近在使用

本文小编为大家详细介绍“使用Redisson订阅数问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“使用Redisson订阅数问题怎么解决”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

    一、前提

    最近在使用分布式redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize 的大小不够,需要提高配置才能解决。

    二、源码分析

    下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:

    1、RedissonLock#lock() 方法

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {        long threadId = Thread.currentThread().getId();        // 尝试获取,如果ttl == null,则表示获取锁成功        Long ttl = tryAcquire(leaseTime, unit, threadId);        // lock acquired        if (ttl == null) {            return;        }        // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题        RFuture<RedissonLockEntry> future = subscribe(threadId);        if (interruptibly) {            commandExecutor.syncSubscriptionInterrupted(future);        } else {            commandExecutor.syncSubscription(future);        }        // 后面代码忽略        try {            // 无限循环获取锁,直到获取锁成功            // ...        } finally {            // 取消订阅锁释放事件            unsubscribe(future, threadId);        }}

    总结下主要逻辑:

    • 获取当前线程的线程id;

    • tryAquire尝试获取锁,并返回ttl

    • 如果ttl为空,则结束流程;否则进入后续逻辑;

    • this.subscribe(threadId)订阅当前线程,返回一个RFuture;

    • 如果在指定时间没有监听到,则会产生如上异常。

    • 订阅成功后, 通过while(true)循环,一直尝试获取锁

    • fially代码块,会解除订阅

    所以上述这情况问题应该出现在subscribe()方法中

    2、详细看下subscribe()方法

    protected RFuture<RedissonLockEntry> subscribe(long threadId) {    // entryName 格式:“id:name”;    // channelName 格式:“redisson_lock__channel:name”;    return pubSub.subscribe(getEntryName(), getChannelName());}

    RedissonLock#pubSub 是在RedissonLock构造函数中初始化的:

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {    // ....    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}

    而subscribeService在MasterSlaveConnectionManager的实现中又是通过如下方式构造的

    public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {    this(config, id);    this.config = cfg;    // 初始化    initTimer(cfg);    initSingleEntry();}protected void initTimer(MasterSlaveServersConfig config) {    int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};    Arrays.sort(timeouts);    int minTimeout = timeouts[0];    if (minTimeout % 100 != 0) {        minTimeout = (minTimeout % 100) / 2;    } else if (minTimeout == 100) {        minTimeout = 50;    } else {        minTimeout = 100;    }    timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);    connectionWatcher = new IdleConnectionWatcher(this, config);    // 初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:    subscribeService = new PublishSubscribeService(this, config);}

    PublishSubscribeService构造函数

    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {    super();    this.connectionManager = connectionManager;    this.config = config;    for (int i = 0; i < locks.length; i++) {        // 这里初始化了一组信号量,每个信号量的初始值为1        locks[i] = new AsyncSemaphore(1);    }}

    3、回到subscribe()方法主要逻辑还是交给了 LockPubSub#subscribe()里面

    private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();public RFuture<E> subscribe(String entryName, String channelName) {      // 从PublishSubscribeService获取对应的信号量。 相同的channelName获取的是同一个信号量     // public AsyncSemaphore getSemaphore(ChannelName channelName) {    //    return locks[Math.abs(channelName.hashCode() % locks.length)];    // }    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));    AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();        RPromise<E> newPromise = new RedissonPromise<E>() {        @Override        public boolean cancel(boolean mayInterruptIfRunning) {            return semaphore.remove(listenerHolder.get());        }    };    Runnable listener = new Runnable() {        @Override        public void run() {            //  如果存在RedissonLockEntry, 则直接利用已有的监听            E entry = entries.get(entryName);            if (entry != null) {                entry.acquire();                semaphore.release();                entry.getPromise().onComplete(new TransferListener<E>(newPromise));                return;            }            E value = createEntry(newPromise);            value.acquire();            E oldValue = entries.putIfAbsent(entryName, value);            if (oldValue != null) {                oldValue.acquire();                semaphore.release();                oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));                return;            }            // 创建监听,            RedisPubSubListener<Object> listener = createListener(channelName, value);            // 订阅监听            service.subscribe(LonGCodec.INSTANCE, channelName, semaphore, listener);        }    };    // 最终会执行listener.run方法    semaphore.acquire(listener);    listenerHolder.set(listener);    return newPromise;}

    AsyncSemaphore#acquire()方法

    public void acquire(Runnable listener) {    acquire(listener, 1);}public void acquire(Runnable listener, int permits) {    boolean run = false;    synchronized (this) {        // counter初始化值为1        if (counter < permits) {            // 如果不是第一次执行,则将listener加入到listeners集合中            listeners.add(new Entry(listener, permits));            return;        } else {            counter -= permits;            run = true;        }    }    // 第一次执行acquire, 才会执行listener.run()方法    if (run) {        listener.run();    }}

    梳理上述逻辑:

    从PublishSubscribeService获取对应的信号量, 相同的channelName获取的是同一个信号量
    2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
    3、如果已经存在RedissonLockEntry, 则利用已经订阅就行
    4、如果不存在RedissonLockEntry, 则会创建新的RedissonLockEntry,然后进行。

    从上面代码看,主要逻辑是交给了PublishSubscribeService#subscribe方法

    4、PublishSubscribeService#subscribe逻辑如下:

    private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {    RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();    // 主要逻辑入口, 这里要主要channelName每次都是新对象, 但内部覆写hashCode+equals。    subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);    return promise;}private void subscribe(Codec codec, ChannelName channelName,  RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {    PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);    if (connEntry != null) {        // 从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中        addListeners(channelName, promise, type, lock, connEntry, listeners);        return;    }    // 没有时,才是最重要的逻辑    freePubSubLock.acquire(new Runnable() {        @Override        public void run() {            if (promise.isDone()) {                lock.release();                freePubSubLock.release();                return;            }            // 从队列中取头部元素            PubSubConnectionEntry freeEntry = freePubSubConnections.peek();            if (freeEntry == null) {                // 第一次肯定是没有的需要建立                connect(codec, channelName, promise, type, lock, listeners);                return;            }            // 如果存在则尝试获取,如果remainFreeAmount小于0则抛出异常终止了。            int remainFreeAmount = freeEntry.tryAcquire();            if (remainFreeAmount == -1) {                throw new IllegalStateException();            }            PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);            if (oldEntry != null) {                freeEntry.release();                freePubSubLock.release();                addListeners(channelName, promise, type, lock, oldEntry, listeners);                return;            }            // 如果remainFreeAmount=0, 则从队列中移除            if (remainFreeAmount == 0) {                freePubSubConnections.poll();            }            freePubSubLock.release();            // 增加监听            RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);            ChannelFuture future;            if (PubSubType.PSUBSCRIBE == type) {                future = freeEntry.psubscribe(codec, channelName);            } else {                future = freeEntry.subscribe(codec, channelName);            }            future.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture future) throws Exception {                    if (!future.isSuccess()) {                        if (!promise.isDone()) {                            subscribeFuture.cancel(false);                        }                        return;                    }                    connectionManager.newTimeout(new TimerTask() {                        @Override                        public void run(Timeout timeout) throws Exception {                            subscribeFuture.cancel(false);                        }                    }, config.getTimeout(), TimeUnit.MILLISECONDS);                }            });        }    });}private void connect(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {    // 根据channelName计算出slot获取PubSubConnection    int slot = connectionManager.calcSlot(channelName.getName());    RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);    promise.onComplete((res, e) -> {        if (e != null) {            ((RPromise<RedisPubSubConnection>) connFuture).tryFailure(e);        }    });    connFuture.onComplete((conn, e) -> {        if (e != null) {            freePubSubLock.release();            lock.release();            promise.tryFailure(e);            return;        }        // 这里会从配置中读取subscriptionsPerConnection        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());        // 每获取一次,subscriptionsPerConnection就会减直到为0        int remainFreeAmount = entry.tryAcquire();        // 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);        if (oldEntry != null) {            releaseSubscribeConnection(slot, entry);            freePubSubLock.release();            addListeners(channelName, promise, type, lock, oldEntry, listeners);            return;        }        if (remainFreeAmount > 0) {            // 加入到队列中            freePubSubConnections.add(entry);        }        freePubSubLock.release();        RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);        // 这里真正的进行订阅(底层与redis交互)        ChannelFuture future;        if (PubSubType.PSUBSCRIBE == type) {            future = entry.psubscribe(codec, channelName);        } else {            future = entry.subscribe(codec, channelName);        }        future.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    if (!promise.isDone()) {                        subscribeFuture.cancel(false);                    }                    return;                }                connectionManager.newTimeout(new TimerTask() {                    @Override                    public void run(Timeout timeout) throws Exception {                        subscribeFuture.cancel(false);                    }                }, config.getTimeout(), TimeUnit.MILLISECONDS);            }        });    });}

    PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:

     public int tryAcquire() {    while (true) {        int value = subscribedChannelsAmount.get();        if (value == 0) {            return -1;        }        if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {            return value - 1;        }    }}

    梳理上述逻辑:

    还是进行重复判断, 根据channelName从name2PubSubConnection中获取,看是否存在已经订阅:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
    2、从队列freePubSubConnections中取公用的PubSubConnectionEntry, 如果没有就进入connect()方法

    1 会根据subscriptionsPerConnection创建PubSubConnectionEntry, 然后调用其tryAcquire()方法 - 每调用一次就会减1
    2.2 将新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后续重复使用;
    2.3 同时也将PubSubConnectionEntry放入队列freePubSubConnections中。- remainFreeAmount > 0
    2.4 后面就是进行底层的subscribe和addListener

    如果已经存在PubSubConnectionEntry,则利用已有的PubSubConnectionEntry进行tryAcquire;
    4、如果remainFreeAmount < 0 会抛出IllegalStateException异常;如果remainFreeAmount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接
    5、最后也是进行底层的subscribe和addListener;

    读到这里,这篇“使用Redisson订阅数问题怎么解决”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网精选频道。

    --结束END--

    本文标题: 使用Redisson订阅数问题怎么解决

    本文链接: https://lsjlt.com/news/307086.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    猜你喜欢
    • 使用Redisson订阅数问题怎么解决
      本文小编为大家详细介绍“使用Redisson订阅数问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“使用Redisson订阅数问题怎么解决”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。一、前提最近在使用...
      99+
      2023-06-26
    • 关于使用Redisson订阅数问题
      目录一、前提二、源码分析1、RedissonLock#lock() 方法2、详细看下subscribe()方法3、回到subscribe()方法主要逻辑还是交给了 LockPubSu...
      99+
      2024-04-02
    • Spring Cache怎么使用Redisson分布式锁解决缓存击穿问题
      本篇内容主要讲解“Spring Cache怎么使用Redisson分布式锁解决缓存击穿问题”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Spring Cache怎么使用Red...
      99+
      2023-06-30
    • 详解SpringCache使用Redisson分布式锁解决缓存击穿问题
      目录1 什么是缓存击穿2 为什么要使用分布式锁3 什么是Redisson4 Spring Boot集成Redisson4.1 添加maven依赖4.2 配置yml4.3 配置Redi...
      99+
      2024-04-02
    • SAP采购订单报错问题怎么解决
      今天小编给大家分享一下SAP采购订单报错问题怎么解决的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。问题:SAP MM 明明已...
      99+
      2023-06-05
    • java怎么解决订单状态扭转问题
      这篇文章主要讲解了“java怎么解决订单状态扭转问题”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java怎么解决订单状态扭转问题”吧!状态机机制状态机机制是一种常用的解决状态扭转问题的方法...
      99+
      2023-07-05
    • Django使用问题怎么解决
      本篇内容介绍了“Django使用问题怎么解决”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Django R...
      99+
      2024-04-02
    • 怎么使用github解决问题
      在当今软件开发行业中,Github已经成为了解决问题的一个重要工具。Github是一个面向开源及私有软件项目的托管平台,因为其丰富的特性得到了全球开发者的广泛喜爱。在利用Github解决问题时,需要注意以下几个方面。一、Github是什么G...
      99+
      2023-10-22
    • Vue使用swiper问题怎么解决
      本文小编为大家详细介绍“Vue使用swiper问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“Vue使用swiper问题怎么解决”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。一、下载指定版本swipe...
      99+
      2023-07-06
    • 怎么使用Puppeteer解决SEO问题
      这篇文章主要讲解了“怎么使用Puppeteer解决SEO问题”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么使用Puppeteer解决SEO问题”吧!引言在前端开发中,我们经常会遇到SEO...
      99+
      2023-07-05
    • PVE 虚拟机主机 更换国内源以及解决无效订阅的问题
      1、PVE更换国内源  注:本文以 pve 7.3.3 为例。 替换前建议先更新下证书,否则可能由于证书不可用导致 https 无法使用,进而无法下载所有软件。 apt install apt-transport-https ca-c...
      99+
      2023-08-31
      debian linux 服务器
    • Redis中的发布订阅和事务怎么使用
      本篇内容主要讲解“Redis中的发布订阅和事务怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Redis中的发布订阅和事务怎么使用”吧!发布订阅redis的发布订阅系统有点类似于我们生活中...
      99+
      2023-06-19
    • 亚马逊订阅服务器配置错误怎么解决
      1. 检查订阅服务器配置 首先,您需要检查您的订阅服务器配置是否正确。您可以通过以下步骤检查: 登录到您的 AWS 控制台。 转到 SNS 服务。 选择您的订阅服务器。 确保您的订阅服务器的端点 URL 是正确的。 确保您的订阅服务器的...
      99+
      2023-10-27
      亚马逊 服务器配置 错误
    • Java redisson读取不了数据怎么解决
      如果 Java 的 Redisson 无法读取数据,可能有以下几个问题和解决方法: 检查 Redis 服务是否正常运行:首先确保...
      99+
      2024-04-09
      java
    • 使用Java怎么解决跨域问题
      今天就跟大家聊聊有关使用Java怎么解决跨域问题,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。什么是跨域(CORS)跨域(CORS)是指不同域名之间相互访问。跨域,指的是浏览器不能执...
      99+
      2023-06-06
    • hibernate的orphanRemoval使用问题怎么解决
      在使用 Hibernate 的 orphanRemoval 属性时,可能会遇到一些问题。下面是一些常见问题的解决方法:1. 单向关联...
      99+
      2023-09-12
      hibernate
    • React中useEffect使用问题怎么解决
      本篇内容介绍了“React中useEffect使用问题怎么解决”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!前言最近看了一下 ant-des...
      99+
      2023-07-02
    • Elasticsearch使用常见问题怎么解决
      这篇文章主要介绍“Elasticsearch使用常见问题怎么解决”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Elasticsearch使用常见问题怎么解决”文章能帮助大家解决问题。一、和redis...
      99+
      2023-06-05
    • 使用jBuilder8出现问题怎么解决
      如果您在使用jBuilder8时遇到问题,可以尝试以下解决方案: 确保您的操作系统和Java版本与jBuilder8的要求兼容。...
      99+
      2023-10-28
      jBuilder8
    • thinkphp的事件绑定、监听和订阅怎么使用
      这篇文章主要介绍了thinkphp的事件绑定、监听和订阅怎么使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇thinkphp的事件绑定、监听和订阅怎么使用文章都会有所收获,下面我们一起来看看吧。事件是什么事件...
      99+
      2023-06-30
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作