返回顶部
首页 > 资讯 > 后端开发 > Python >JavaAQS信号量Semaphore的使用
  • 397
分享到

JavaAQS信号量Semaphore的使用

Java信号量SemaphoreJavaSemaphoreJava信号量 2023-02-02 12:02:04 397人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

目录一.什么是Semaphore二.Semaphore的使用三.Semaphore源码分析一.什么是Semaphore Semaphore,俗称信号量,它是操作系统中PV操作的原语在

一.什么是Semaphore

Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。

Semaphore的功能非常强大,大小为1的信号量就类似于互斥,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。

PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
①S减1;
②若S减1后仍大于或等于0,则进程继续执行;
③若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
V操作的主要动作是:
①S加1;
②若相加后结果大于0,则进程继续执行;
③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。

二.Semaphore的使用

构造器

  public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
  public Semaphore(int permits, boolean fair) {
       sync = fair ? new FairSync(permits) : new NonfairSync(permits);
   }

permits 表示许可证的数量(资源数)

fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

常用方法

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)

  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

应用场景

可以用于做流量控制,特别是公用资源有限的应用场景

限流


    private static Semaphore semaphore = new Semaphore(5);
    
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
    public static void exec() {
        try {
            semaphore.acquire(1);
            // 模拟真实方法执行
            System.out.println("执行exec方法" );
            Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            semaphore.release(1);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        {
            for (; ; ) {
                Thread.sleep(100);
                // 模拟请求以10个/s的速度
                executor.execute(() -> exec());
            }
        }
    }

三.Semaphore源码分析

主要关注 Semaphore的加锁解锁(共享锁)逻辑实现,线程竞争锁失败入队阻塞逻辑和获取锁的线程释放锁唤醒阻塞线程竞争锁的逻辑实现

	public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            //尝试获取共享锁,大于等于0则直接获取锁成功,小于0时则共享锁阻塞
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared的实现

   final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // 当减一之后的值小于0 或者 
                // compareAndSetState成功,把state变为remaining,即将状态减一
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

入队阻塞

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //入队,创建节点 使用共享模式
        final node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            //获取当前节点的前躯节点
                final Node p = node.predecessor();
                //如果节点为head节点
                if (p == head) {
                	//阻塞动作比较重,通常会再尝试获取资源,没有获取到返回负数
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //判断是否可以阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

入队操作

private Node addWaiter(Node mode) {
		//构建节点,模式是共享模式
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
        	//设置前一节点为tail 
            node.prev = pred;
            //设置当前节点为尾节点
            if (compareAndSetTail(pred, node)) {
            // 前一节点的next为当前节点
                pred.next = node;
                return node;
            }
        }
        //创建队列
        enq(node);
        return node;
    }

创建队列,经典的for循环创建双向链表

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
            	//节点为空 则new一个节点 设置头节点
                if (compareAndSetHead(new Node()))
                //把这个节点的头节点赋值给尾节点
                    tail = head;
            } else {
            // 如果尾节点存在 就将该节点的前节点指向tail
                node.prev = t;
                //设置当前节点为tail
                if (compareAndSetTail(t, node)) {
                //前一个节点的next指向当前节点,入队操作就完成了
                    t.next = node;
                    return t;
                }
            }
        }
    }

设置waitStatus状态及获取waitStatus状态,waitStatus为-1时可以唤醒后续节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            
            return true;
        if (ws > 0) {
            
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

阻塞 调用LockSupport.park

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

释放锁的逻辑

	public void release() {
        sync.releaseShared(1);
    }
	public final boolean releaseShared(int arg) {
		//cas成功则进行释放共享锁
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

state状态+1操作,cas成功,返回true

		protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //头节点不为空并且不是尾节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //waitstatus为-1
                if (ws == Node.SIGNAL) {
                //将SIGNAL置为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;         
                    //唤醒   
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)                  
                break;
        }
    }

唤醒操作

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //ws小于0就将其设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        //当前节点的下一个节点为空或者ws大于0
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //s不为空 则进行唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

唤醒下一个线程之后,要把上一个节点移除队列

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //下一个线程进来,如果前置节点是头节点,则将前置节点出队
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //cas获取资源成功
                    if (r >= 0) {
                    	//出队操作
                        setHeadAndPropagate(node, r);
                        //将p.next移除
                        p.next = null;
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

出队操作

private void setHeadAndPropagate(Node node, int propagate) {
		//设置当前节点为head节点,前一节点的head属性被删除
        Node h = head; 
        setHead(node);
        //如果是传播属性
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
           //并且是共享模式,可以持续唤醒下一个,
           //只要资源数充足 就可以一直往下唤醒,提高并发量
            Node s = node.next;
            if (s == null || s.isshared())
                doReleaseShared();
        }
    }
 private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

至此,线程的阻塞唤醒核心逻辑就这么多,共享锁与独占锁的区别是可以唤醒后续的线程,如果资源数充足的话,可以一直往下唤醒,提高了并发量。

到此这篇关于Java AQS信号量Semaphore的使用的文章就介绍到这了,更多相关Java信号量Semaphore内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: JavaAQS信号量Semaphore的使用

本文链接: https://lsjlt.com/news/193941.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • JavaAQS信号量Semaphore的使用
    目录一.什么是Semaphore二.Semaphore的使用三.Semaphore源码分析一.什么是Semaphore Semaphore,俗称信号量,它是操作系统中PV操作的原语在...
    99+
    2023-02-02
    Java信号量Semaphore Java Semaphore Java信号量
  • 什么是信号量Semaphore
    这篇文章将为大家详细讲解有关什么是信号量Semaphore,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。本质就是 信号量模型,模型图如下:其中的 计数器 和...
    99+
    2024-04-02
  • 详解Java信号量Semaphore的原理及使用
    目录1.Semaphore的概述2.Semaphore的原理2.1 基本结构2.2 可中断获取信号量2.3 不可中断获取信号量2.4 超时可中断获取信号量2.5 尝试获取信号量2.6...
    99+
    2024-04-02
  • 一文读懂go中semaphore(信号量)源码
    目录运行时信号量机制 semaphore前言作用是什么几个主要的方法如何实现acquireSudogreleaseSudogsemaphorepoll_runtime_Sema...
    99+
    2022-06-07
    semaphore GO 信号量 源码
  • 分析Java并发编程之信号量Semaphore
    目录一、认识Semaphore1.1、Semaphore 的使用场景1.2、Semaphore 使用1.3、Semaphore 信号量的模型二、Semaphore 深入理解2.1、S...
    99+
    2024-04-02
  • 怎么看待Linux 多线程中的信号量Semaphore
    今天就跟大家聊聊有关怎么看待Linux 多线程中的信号量Semaphore,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。理解 Semaphore,从一个好的翻译开始Semaphore...
    99+
    2023-06-15
  • AQS同步组件Semaphore信号量案例剖析
    目录基本概念作用和使用场景源码分析构造函数常用方法使用案例acquire()获取单个许可acquire(int permits)获取多个许可tryAcquire()获取许可tryAc...
    99+
    2022-11-13
    AQS同步组件Semaphore AQS Semaphore信号量
  • Java多线程如何使用Semaphore实现信号灯
    这篇文章主要讲解了“Java多线程如何使用Semaphore实现信号灯”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java多线程如何使用Semaphore实现信号灯”吧!前言:Semaph...
    99+
    2023-06-25
  • Java并发编程之Semaphore(信号量)详解及实例
    Java并发编程之Semaphore(信号量)详解及实例概述通常情况下,可能有多个线程同时访问数目很少的资源,如客户端建立了若干个线程同时访问同一数据库,这势必会造成服务端资源被耗尽的地步,那么怎样能够有效的来控制不可预知的接入量呢?及在同...
    99+
    2023-05-31
    java 发编程 semaphore
  • Java并发编程中Semaphore计数信号量的示例分析
    这篇文章主要为大家展示了“Java并发编程中Semaphore计数信号量的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Java并发编程中Semaphore计数信号量的示例分析”这篇文章...
    99+
    2023-05-31
    java semaphore
  • JavaAQS中闭锁CountDownLatch的使用
    目录一. 简介二. 使用三. 应用场景四. 底层原理五. CountDownLatch与Thread.join的区别一. 简介 CountDownLatch(闭锁)是一个同步协助类,...
    99+
    2023-02-02
    Java CountDownLatch Java 闭锁CountDownLatch Java 闭锁
  • JavaAQS中ReentrantReadWriteLock读写锁的使用
    目录一. 简介二. 接口及实现类三.使用四. 应用场景五. 锁降级六.源码解析七.总结一. 简介 为什么会使用读写锁? 日常大多数见到的对共享资源有读和写的操作,写操作并没有读操作那...
    99+
    2023-02-02
    Java 读写锁ReentrantReadWriteLock Java 读写锁 Java ReentrantReadWriteLock
  • swoole中信号量怎么使用
    这篇文章主要介绍了swoole中信号量怎么使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇swoole中信号量怎么使用文章都会有所收获,下面我们一起来看看吧。在swoole中,信号量主要用来保护共享资源,使得...
    99+
    2023-06-29
  • JavaAQS中CyclicBarrier回环栅栏的使用
    目录一. 简介二. CyclicBarrier的使用三. CyclicBarrier的应用场景四. CyclicBarrier与CountDownLatch的区别五. CyclicB...
    99+
    2023-02-02
    Java AQS中CyclicBarrier Java CyclicBarrier Java回环栅栏
  • Java中的Semaphore如何使用
    目录简介简述实现原理方法介绍案例分析适用场景简介 semaphore中文意思既是信号量,它的主要功能就是用来控制某个资源同时被访问的线程数。 为了控制某块资源的并发访问量时,可以使用...
    99+
    2024-04-02
  • Redisson分布式信号量RSemaphore如何使用
    本文小编为大家详细介绍“Redisson分布式信号量RSemaphore如何使用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Redisson分布式信号量RSemaphore如何使用”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一...
    99+
    2023-07-05
  • Redisson分布式信号量RSemaphore的使用超详细讲解
    目录一、RSemaphore的使用二、RSemaphore设置许可数量三、RSemaphore的加锁流程四、RSemaphore的解锁流程本篇文章基于redisson-3.17.6版...
    99+
    2023-02-11
    Redisson分布式信号量RSemaphore Redisson分布式信号量 Redisson RSemaphore
  • python中的信号通信 blinker的使用小结
    目录信号:官方介绍:blinker 使用命名信号匿名信号组播信号接收方订阅主题装饰器用法可订阅主题的装饰器检查信号是否有接收者检查订阅者是否订阅了某个信号基于blinker的Flas...
    99+
    2024-04-02
  • Java中信号量模型的实际应用
    本篇内容介绍了“Java中信号量模型的实际应用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Java信号量模型的工作方式如下:线程在运行的过...
    99+
    2023-06-17
  • FreeRTOS信号量API函数怎么用
    这篇文章主要介绍“FreeRTOS信号量API函数怎么用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“FreeRTOS信号量API函数怎么用”文章能帮助大家解决问题。    &n...
    99+
    2023-06-29
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作