返回顶部
首页 > 资讯 > 后端开发 > Python >详解Java七大阻塞队列之SynchronousQueue
  • 186
分享到

详解Java七大阻塞队列之SynchronousQueue

2024-04-02 19:04:59 186人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

目录分析其实SynchronousQueue 是一个特别有意思的阻塞队列,就我个人理解来说,它很重要的特点就是没有容量。 直接看一个例子: package dongguabai.

其实SynchronousQueue 是一个特别有意思的阻塞队列,就我个人理解来说,它很重要的特点就是没有容量。

直接看一个例子:


package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;


public class TestSynchronousQueue {

    public static void main(String[] args) {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        boolean add = synchronousQueue.add("1");
        System.out.println(add);
    }
}

代码很简单,就是往 SynchronousQueue 里放了一个元素,程序却抛异常了:


Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at dongguabai.test.juc.test.TestSynchronousQueue.main(TestSynchronousQueue.java:14)

而异常原因是队列满了。刚刚使用的是 SynchronousQueue#add 方法,现在来看看 SynchronousQueue#put 方法:


    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        synchronousQueue.put("1");
        System.out.println("----");
    }

看到 InterruptedException 其实就能猜出这个方法肯定会阻塞当前线程

通过这两个例子,也就解释了 SynchronousQueue 队列是没有容量的,也就是说在往 SynchronousQueue 中添加元素之前,得先向 SynchronousQueue 中取出元素,这句话听着很别扭,那可以换个角度猜想其实现原理,调用取出方法的时候设置了一个“已经有线程在等待取出”的标识,线程等待,然后添加元素的时候,先看这个标识,如果有线程在等待取出,则添加成功,反之则抛出异常或者阻塞。

分析

接下来从 SynchronousQueue#put 方法开始进行分析:


    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

可以发现是调用的 Transferer#transfer 方法,这个 Transferer 是在构造 SynchronousQueue 的时候初始化的:


    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

SynchronousQueue 有两种模式,公平与非公平,默认是非公平,非公平使用的就是 TransferStack,是基于单向链表做的:


 static final class Snode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
   ...
 }

那么重点就是 SynchronousQueue.TransferStack#transfer 方法了,从方法名都可以看出这是用来做数据交换的,但是这个方法有好几十行,里面各种 Node 指针搞来搞去,这个地方我觉得没必要过于纠结细节,老规矩,抓大放小,而且队列这种,很方便进行 Debug 调试。

再理一下思路:

  • 今天研究的是阻塞队列,关注阻塞的话,更应该关系的是 takeput 方法;
  • Transferer 是一个抽象类,只有一个 transfer 方法,即 takeput 共用,那就肯定是基于入参进行功能的区分;
  • takeput 方法底层都调用的 SynchronousQueue.TransferStack#transfer 方法;

将上面 SynchronousQueue#put 使用的例子修改一下,再加一个线程take


package dongguabai.test.juc.test;

import java.util.Date;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;


public class TestSynchronousQueue {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()->{
            System.out.println(new Date().toLocaleString()+"::"+Thread.currentThread().getName()+"-put了数据:"+"1");

            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        System.out.println("----");
        new Thread(()->{
            Object take = null;
            try {
                take = synchronousQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new Date().toLocaleString()+"::"+Thread.currentThread().getName()+"-take到了数据:"+take);
        }).start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("结束...");
    }
}

整个程序结束,并且输出:

----
2021-9-2 0:58:55::Thread-0-put了数据:1
2021-9-2 0:58:55::Thread-1-take到了数据:1
结束...

也就是说当一个线程在 put 的时候,如果有线程 take ,那么 put 线程可以正常运行,不会被阻塞。

基于这个例子,再结合上文的猜想,也就是说核心点就是找到 put 的时候现在已经有线程在 take 的标识,或者 take 的时候已经有线程在 put,这个标识不一定是变量,结合 AQS 的原理来看,很可能是根据链表中的 Node 进行判断。

接下来看 SynchronousQueue.put 方法:


    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

它底层也是调用的 SynchronousQueue.TransferStack#transfer 方法,但是传入参数是当前 put 的元素、false 和 0。再回过头看 SynchronousQueue.TransferStack#transfer 方法:


E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
  					//这里的参数e就是要put的元素,显然不为null,也就是说是DATA模式,根据注释,DATA模式就说明当前线程是producer
            int mode = (e == null) ? REQUEST : DATA;  

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        //因为第一次put那么h肯定为null,这里入参timed为false,所以会到这里,执行awaitFulfill方法,根据名称可以猜想出是一个阻塞方法
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                   ....
        }

这里首先会构造一个 SNode,然后执行 casHead 函数,其实最终栈结构就是:

head->put_e

就是 head 会指向 put 的元素对应的 SNode

然后会执行 awaitFulfill 方法:


SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;    //自旋机制
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this); //阻塞
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

最终还是会使用 LockSupport 进行阻塞,等待唤醒。

已经大致过了一遍流程了,细节方面就不再纠结了,那么假如再put 一个元素呢,其实结合源码已经可以分析出此时栈的结果为:

head-->put_e_1-->put_e

避免分析出错,写个 Debug 的代码验证一下:


package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;


public class DebugPut2E {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()-> {
            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()-> {
            try {
                synchronousQueue.put("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

SynchronousQueue.TransferStack#awaitFulfill 方法的 LockSupport.park(this); 处打上断点,运行上面的代码,再看看现在的 head

在这里插入图片描述

的确与分析的一致。

也就是先进后出。再看 take 方法:


    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

调用的 SynchronousQueue.TransferStack#transfer 方法,但是传入参数是 nullfalse 和 0。

偷个懒就不分析源码了,直接 Debug 走一遍,代码如下:


package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;


public class DebugTake {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()-> {
            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-put-1").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()-> {
            try {
                synchronousQueue.put("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-put-2").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()->{
            try {
                Object take = synchronousQueue.take();
                System.out.println("======take:"+take);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-Take").start();
    }
}

SynchronousQueue#take 方法中打上断点,运行上面的代码:

在这里插入图片描述

这里的 s 就是 headm 就是栈顶的元素,也是最近一次 put 的元素。说白了 take 就是取的栈顶的元素,最后再匹配一下,符合条件就直接取出来。take 之后 head 为:

在这里插入图片描述

栈的结构为:

head-->put_e

最后再把整个流程梳理一遍:

执行 put 操作的时候,每次压入栈顶;take 的时候就取栈顶的元素,即先进后出;这也就实现了非公平;

至于公平模式,结合 TransferStack 的实现,可以猜测实现就是 put 的时候放入队列,take 的时候从队列头部开始取,先进先出。

那么这个队列设计的优势使用场景在哪里呢?个人感觉它的优势就是完全不会产生对队列中数据的争抢,因为说白了队列是空的,从某种程度上来说消费速率是很快的。

至于使用场景,我这边的确没有想到比较好的使用场景。结合组内同学的使用来看,他选择使用这个队列的原因是因为它不会在内存中生成任务队列,当服务宕机后不用担心内存中任务的丢失(非优雅停机的情况)。经过讨论后发现即使使用了 SynchronousQueue 也无法有效的避免任务丢失,但这的确是一个思路,没准以后在其他场景中用得上。

到此这篇关于详解Java七大阻塞队列之SynchronousQueue的文章就介绍到这了,更多相关Java阻塞队列 SynchronousQueue内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: 详解Java七大阻塞队列之SynchronousQueue

本文链接: https://lsjlt.com/news/134627.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 详解Java七大阻塞队列之SynchronousQueue
    目录分析其实SynchronousQueue 是一个特别有意思的阻塞队列,就我个人理解来说,它很重要的特点就是没有容量。 直接看一个例子: package dongguabai....
    99+
    2024-04-02
  • Java 阻塞队列BlockingQueue详解
    目录一. 前言二. 认识BlockingQueue三.BlockingQueue的核心方法:四.常见BlockingQueue五. 小结一. 前言 在新增的Concurrent包中,...
    99+
    2024-04-02
  • Java阻塞队列BlockingQueue详解
    目录队列的类型数据结构阻塞队列 BlockingQueue常见的阻塞队列BlockingQueue APIArrayBlockingQueue 源码简解生产者消费者模式延迟队列 De...
    99+
    2024-04-02
  • 详解Java中的阻塞队列
    什么是阻塞队列 在数据结构中,队列遵循FIFO(先进先出)原则。在java中,Queue接口定义了定义了基本行为,由子类完成实现,常见的队列有ArrayDeque、LinkedLis...
    99+
    2024-04-02
  • Java并发编程之阻塞队列(BlockingQueue)详解
    目录队列阻塞队列ArrayBlockingQueue重要属性构造方法添加元素add(e)offer(e)put(e)offer(e,time,unit)移除元素take()deque...
    99+
    2024-04-02
  • Java多线程案例之阻塞队列详解
    目录一.阻塞队列介绍1.1阻塞队列特性1.2阻塞队列的优点二.生产者消费者模型2.1阻塞队列对生产者的优化三.标准库中的阻塞队列3.1Java提供阻塞队列实现的标准类3.2Block...
    99+
    2022-11-13
    Java多线程阻塞队列 Java 阻塞队列 Java多线程
  • Java并发编程之阻塞队列深入详解
    目录1. 什么是阻塞队列2. 阻塞队列的代码使用3. 生产者消费者模型(1)应用一:解耦合(2)应用二:削峰填谷(3)相关代码4.阻塞队列和生产者消费者模型功能的实现1. 什么是阻塞...
    99+
    2024-04-02
  • java 中 阻塞队列BlockingQueue详解及实例
    java 中 阻塞队列BlockingQueue详解及实例BlockingQueue很好的解决了多线程中数据的传输,首先BlockingQueue是一个接口,它大致有四个实现类,这是一个很特殊的队列,如果BlockQueue是空的,从Blo...
    99+
    2023-05-31
    java 阻塞队列 blockingqueue
  • Java多线程案例之阻塞队列
    文章目录 一. 认识阻塞队列1. 什么是阻塞队列2. 生产者消费者模型3. 标准库中阻塞队列类 二. 基于循环队列实现的简单阻塞队列1. 循环队列的简单实现2. 阻塞队列的简单实现 ...
    99+
    2023-09-01
    java 面试 阻塞队列 生产者消费者模型 多线程
  • 详解Java阻塞队列(BlockingQueue)的实现原理
    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取...
    99+
    2023-05-31
    java 阻塞队列 ava
  • Java面试必备之AQS阻塞队列和条件队列
    一.AQS入队规则 我们仔细分析一下AQS是如何维护阻塞队列的,在独占方式获取资源的时候,是怎么将竞争锁失败的线程丢到阻塞队列中的呢? 我们看看acquire方法,这里首先会调用子类...
    99+
    2024-04-02
  • 阻塞队列之如何理解LinkedBlockingQueue源码
    本篇内容介绍了“阻塞队列之如何理解LinkedBlockingQueue源码”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读...
    99+
    2024-04-02
  • Java多线程之多种锁和阻塞队列
    目录一、悲观锁和乐观锁1.1. 乐观锁1.2. 悲观锁二、公平锁和非公平锁三、可重入锁(递归锁)四、自旋锁五、独占锁(写)/共享锁(读)六、什么是阻塞队列?七、阻塞队列(Blocki...
    99+
    2024-04-02
  • Java常见的阻塞队列总结
    Java阻塞队列 阻塞队列和普通队列主要区别在阻塞二字: 阻塞添加:队列已满时,添加元素线程会阻塞,直到队列不满时才唤醒线程执行添加操作 阻塞删除:队列元素为空时,...
    99+
    2024-04-02
  • Java阻塞队列BlockingQueue怎么使用
    本篇内容介绍了“Java阻塞队列BlockingQueue怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一. 前言在新增的Concu...
    99+
    2023-07-02
  • Java阻塞队列BlockingQueue怎么实现
    这篇文章主要讲解了“Java阻塞队列BlockingQueue怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java阻塞队列BlockingQueue怎么实现”吧!BlockingQ...
    99+
    2023-06-02
  • 如何在Java中实现阻塞队列
    如何在Java中实现阻塞队列?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Java阻塞队列阻塞队列和普通队列主要区别在阻塞二字:阻塞添加:队列已满时,添加元素线...
    99+
    2023-06-15
  • Java阻塞队列BlockingQueue基础与使用
    目录什么是阻塞队列阻塞队列的特点BlockingQueue不是新的东西学会使用队列SynchronousQueue 同步队列什么是阻塞队列 阻塞队列本质上还是一种队列,遵循先进先出,...
    99+
    2023-01-03
    Java阻塞队列 Java BlockingQueue
  • Java阻塞队列的实现及应用
    目录1.手写生产者消费者模型2.手写定时器总结1.手写生产者消费者模型 所谓生产者消费者模型,可以用我们生活中的例子来类比:我去一个小摊儿买吃的,老板把已经做好的小吃都放在摆盘上,供...
    99+
    2024-04-02
  • Java 阻塞队列和线程池原理分析
    目录【1】阻塞队列一、什么是阻塞队列?二、阻塞队列有什么用?三、阻塞队列的简单实用【2】Java 线程池一、我们为什么需要Java 线程池?使用它的好处是什么?二、Java中主要提供...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作