返回顶部
首页 > 资讯 > 精选 >并发计数类LongAdder怎么用
  • 739
分享到

并发计数类LongAdder怎么用

2023-06-04 19:06:41 739人浏览 安东尼
摘要

本篇内容介绍了“并发计数类LongAdder怎么用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!AtomicLong是通过CAS(即Comp

本篇内容介绍了“并发计数类LongAdder怎么用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

AtomicLong是通过CAS(即Compare And Swap)原理来完成原子递增递减操作,在并发情况下不会出现线程安全结果。AtomicLong中的value是使用volatile修饰,并发下各个线程对value最新值均可见。我们以incrementAndGet()方法来深入。

  public final long incrementAndGet() {

        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;

    }

这里是调用了unsafe的方法

    public final long getAndAddLong(Object var1, long var2, long var4) {

        long var6;

        do {

            var6 = this.getLongVolatile(var1, var2);

        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;

    }

方法中this.compareAndSwapLong()有4个参数,var1是需要修改的类对象,var2是需要修改的字段的内存地址,var6是修改前字段的值,var6+var4是修改后字段的值。compareAndSwapLong只有该字段实际值和var6值相当的时候,才可以成功设置其为var6+var4。

再继续往深一层去看

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

这里Unsafe.compareAndSwapLong是native方法,底层通过JNI(Java Native Interface)来完成调用,实际就是借助C来调用CPU指令来完成。

实现中使用了do-while循环,如果CAS失败,则会继续重试,直到成功为止。并发特别高的时候,虽然这里可能会有很多次循环,但是还是可以保证线程安全的。不过如果自旋CAS操作长时间不成功,竞争较大,会带CPU带来极大的开销,占用更多的执行资源,可能会影响其他主业务的计算等。

LongAdder怎么优化AtomicLong

Doug Lea在jdk1.5的时候就针对HashMap进行了线程安全和并发性能的优化,推出了分段实现的ConcurrentHashMap。一般Java面试,基本上离不开ConcurrentHashMap这个网红问题。另外在ForkJoinPool中,Doug Lea在其工作窃取算法上对WorkQueue使用了细粒度锁来较少并发的竞争,更多细节可参考我的原创文章ForkJoin使用和原理剖析。如果已经对ConcurrentHashMap有了较为深刻的理解,那么现在来看LongAdder的实现就会相对简单了。

来看LongAdder的increase()方法实现,

    public void add(long x) {

        Cell[] as; long b, v; int m; Cell a;

        //第一个if进行了两个判断,(1)如果cells不为空,则直接进入第二个if语句中。(2)同样会先使用cas指令来尝试add,如果成功则直接返回。如果失败则说明存在竞争,需要重新add

        if ((as = cells) != null || !casBase(b = base, b + x)) {

            boolean uncontended = true;

            if (as == null || (m = as.length - 1) < 0 ||

                (a = as[getProbe() & m]) == null ||

                !(uncontended = a.cas(v = a.value, v + x)))

                longAccumulate(x, null, uncontended);

        }

    }

这里用到了Cell类对象,Cell对象是LongAdder高并发实现的关键。在casBase冲突严重的时候,就会去创建Cell对象并添加到cells中,下面会详细分析。

    @sun.misc.Contended static final class Cell {

        volatile long value;

        Cell(long x) { value = x; }

        //提供CAS方法修改当前Cell对象上的value

        final boolean cas(long cmp, long val) {

            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);

        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;

        private static final long valueOffset;

        static {

            try {

                UNSAFE = sun.misc.Unsafe.getUnsafe();

                Class<?> ak = Cell.class;

                valueOffset = UNSAFE.objectFieldOffset

                    (ak.getDeclaredField("value"));

            } catch (Exception e) {

                throw new Error(e);

            }

        }

    }

而这一句a = as[getProbe() & m]其实就是通过getProbe()拿到当前Thread的threadLocalRandomProbe的probe Hash值。这个值其实是一个随机值,这个随机值由当前线程ThreadLocalRandom.current()产生。不用Rondom的原因是因为这里已经是高并发了,多线程情况下Rondom会极大可能得到同一个随机值。因此这里使用threadLocalRandomProbe在高并发时会更加随机,减少冲突。更多ThreadLocalRandom信息想要深入了解可关注这篇文章并发包中ThreadLocalRandom类原理浅尝。拿到as数组中当前线程的Cell对象,然后再进行CAS的更新操作,我们在源码上进行分析。longAccumulate()是在父类Striped64.java中。

    final void longAccumulate(long x, LongBinaryOperator fn,

                              boolean wasUncontended) {

        int h;

        if ((h = getProbe()) == 0) {

          //如果当前线程的随机数为0,则初始化随机数

            ThreadLocalRandom.current(); // force initialization

            h = getProbe();

            wasUncontended = true;

        }

        boolean collide = false;                // True if last slot nonempty

        for (;;) {

            Cell[] as; Cell a; int n; long v;

            //如果当前cells数组不为空

            if ((as = cells) != null && (n = as.length) > 0) {

            //如果线程随机数对应的cells对应数组下标的Cell元素不为空,

                if ((a = as[(n - 1) & h]) == null) {

                //当使用到LongAdder的Cell数组相关的操作时,需要先获取全局的cellsBusy的锁,才可以进行相关操作。如果当前有其他线程的使用,则放弃这一步,继续for循环重试。

                    if (cellsBusy == 0) {       // Try to attach new Cell

                    //Cell的初始值是x,创建完毕则说明已经加上

                        Cell r = new Cell(x);   // Optimistically create

                        //casCellsBusy获取锁,cellsBusy通过CAS方式获取锁,当成功设置cellsBusy为1时,则获取到锁。

                        if (cellsBusy == 0 && casCellsBusy()) {

                            boolean created = false;

                            try {               // Recheck under lock

                                Cell[] rs; int m, j;

                                if ((rs = cells) != null &&

                                    (m = rs.length) > 0 &&

                                    rs[j = (m - 1) & h] == null) {

                                    rs[j] = r;

                                    created = true;

                                }

                            } finally {

                              //finally里面释放锁

                                cellsBusy = 0;

                            }

                            if (created)

                                break;

                            continue;           // Slot is now non-empty

                        }

                    }

                    collide = false;

                }

                else if (!wasUncontended)       // CAS already known to fail

                    wasUncontended = true;      // Continue after rehash

                //如果a不为空,则对a进行cas增x操作,成功则返回    

                else if (a.cas(v = a.value, ((fn == null) ? v + x :

                                             fn.applyAsLong(v, x))))

                    break;

                //cells的长度n已经大于CPU数量,则继续扩容没有意义,因此直接标记为不冲突

                else if (n >= NCPU || cells != as)

                    collide = false;            // At max size or stale

                else if (!collide)

                    collide = true;

                //到这一步则说明a不为空但是a上进行CAS操作也有多个线程在竞争,因此需要扩容cells数组,其长度为原长度的2倍

                else if (cellsBusy == 0 && casCellsBusy()) {

                    try {

                        if (cells == as) {      // Expand table unless stale

                            Cell[] rs = new Cell[n << 1];

                            for (int i = 0; i < n; ++i)

                                rs[i] = as[i];

                            cells = rs;

                        }

                    } finally {

                        cellsBusy = 0;

                    }

                    collide = false;

                    continue;                   // Retry with expanded table

                }

                //继续使用新的随机数,避免在同一个Cell上竞争

                h = advanceProbe(h);

            }

            //如果cells为空,则需要先创建Cell数组。初始长度为2.(个人理解这个if放在前面会比较好一点,哈哈)

            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {

                boolean init = false;

                try {                           // Initialize table

                    if (cells == as) {

                        Cell[] rs = new Cell[2];

                        rs[h & 1] = new Cell(x);

                        cells = rs;

                        init = true;

                    }

                } finally {

                    cellsBusy = 0;

                }

                if (init)

                    break;

            }

            //如果在a上竞争失败,且扩容竞争也失败了,则在casBase上尝试增加数量

            else if (casBase(v = base, ((fn == null) ? v + x :

                                        fn.applyAsLong(v, x))))

                break;                          // Fall back on using base

        }

    }

最后是求LongAdder的总数,这一步就非常简单了,把base的值和所有cells上的value值加起来就是总数了。

    public long sum() {

        Cell[] as = cells; Cell a;

        long sum = base;

        if (as != null) {

            for (int i = 0; i < as.length; ++i) {

                if ((a = as[i]) != null)

                    sum += a.value;

            }

        }

        return sum;

    }

思考

源码中Cell数组的会控制在不超过NCPU的两倍,原因是LongAdder其实在底层是依赖CPU的CAS指令来操作,如果多出太多,即使在代码层面没有竞争,在底层CPU的竞争会更多,所以这里会有一个数量的限制。所以在LongAdder的设计中,由于使用到CAS指令操作,瓶颈在于CPU上。

YY一下,那么有没有方式可以突破这个瓶颈呢?我个人觉得是有的,但是有前提条件,应用场景极其有限。基于ThreadLocal的设计,假设统计只在一个固定的线程池中进行,假设线程池中的线程不会销毁(异常补充线程的就暂时不管了),则可以认为线程数量是固定且不变的,那么统计则可以依赖于只在当前线程中进行,那么即使是高并发,就转化为ThreadLocal这种单线程操作了,完全可以摆脱CAS的CPU指令操作的限制,那么性能将极大提升。

“并发计数类LongAdder怎么用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

--结束END--

本文标题: 并发计数类LongAdder怎么用

本文链接: https://lsjlt.com/news/239150.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 并发计数类LongAdder怎么用
    本篇内容介绍了“并发计数类LongAdder怎么用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!AtomicLong是通过CAS(即Comp...
    99+
    2023-06-04
  • java如何使用LongAdder计数器
    小编给大家分享一下java如何使用LongAdder计数器,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!使用LongAdder计...
    99+
    2024-04-02
  • 并发编程LongAdder的原理是什么
    本篇内容介绍了“并发编程LongAdder的原理是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!目录一、前言二、LongAdder类的使...
    99+
    2023-06-20
  • Java并发编程ThreadLocalRandom类怎么使用
    本篇内容介绍了“Java并发编程ThreadLocalRandom类怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!为什么需要Thre...
    99+
    2023-07-02
  • C#并发编程之Task类怎么使用
    这篇文章主要介绍了C#并发编程之Task类怎么使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇C#并发编程之Task类怎么使用文章都会有所收获,下面我们一起来看看吧。Task.RunTask是建立在线程池之上...
    99+
    2023-07-05
  • Redis实现高并发计数器
    业务需求中经常有需要用到计数器的场景:譬如一个手机号一天限制发送5条短信、一个接口一分钟限制多少请求、一个接口一天限制调用多少次等等。使用Redis的Incr自增命令可以轻松实现以上需求。以一个接口一天限制...
    99+
    2024-04-02
  • golang怎么并发数据
    Go语言是一种以并发编程为设计重点的语言,它具有轻量级线程(即goroutine)和可扩展的通信机制,使得其在处理并发数据时变得非常高效。本文将介绍如何使用goroutine和通道在Go中实现并发数据处理。goroutine是一种轻量级线程...
    99+
    2023-05-14
  • Visual Studio 中怎么利用Parallel类实现并行计算
    Visual Studio 中怎么利用Parallel类实现并行计算,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Parallel类Parallel类就是TPL...
    99+
    2023-06-17
  • Delphi中怎么进行并发编程和并行计算
    在Delphi中进行并发编程和并行计算可以使用多线程的方式来实现。下面是一个简单的示例代码: unit MainUnit; int...
    99+
    2024-03-13
    Delphi
  • Golang函数的并发安全设计
    golang 函数的并发安全性至关重要。根据访问的共享资源类型,并发安全函数可分为不可变函数和可变函数。可变函数需要使用适当的同步机制,如互斥锁、读写锁和原子值,来保证并发安全性。实战案...
    99+
    2024-04-16
    golang函数 并发安全 golang 并发访问 同步机制
  • 如何使用golang函数并发控制进行并行计算
    在 go 中可以使用 goroutine 和函数并发控制实现并行计算,具体步骤如下:创建 goroutine,使用 go 关键字创建轻量级并发执行单元。使用 sync.waitgroup...
    99+
    2024-04-24
    并发控制 并行计算 golang
  • golang怎么控制并发数
    在Go中,可以使用goroutine和channel来控制并发数。 首先,可以使用make函数创建一个带有指定并发数的channel...
    99+
    2023-10-21
    golang
  • 怎么在Java中利用Redis实现一个高并发计数器功能
    这篇文章主要介绍了怎么在Java中利用Redis实现一个高并发计数器功能,编程网小编觉得不错,现在分享给大家,也给大家做个参考,一起跟随编程网小编来看看吧!Java是什么Java是一门面向对象编程语言,可以编写桌面应用程序、Web应用程序、...
    99+
    2023-06-06
  • golang函数类型的并发处理
    go函数类型支持并发处理,可创建并行执行代码块的应用程序。可以通过定义函数类型并使用goroutine创建goroutine来实现并发:定义函数类型:使用func关键字定义函数签名,指定...
    99+
    2024-04-28
    golang 并发处理 字符串数组
  • 怎么在Java中利用LockSupport类实现并发编程
    今天就跟大家聊聊有关怎么在Java中利用LockSupport类实现并发编程,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一、LockSupport类的属性private ...
    99+
    2023-06-15
  • php怎么合并数组并转换成数值类型
    方法:1、用“array_merge(数组1,数组2...)”把一个或多个数组合并为一个数组;2、用“implode(合并数组)”将合并数组转为字符串;3、用“settype(字符串,类型标识)”将字符串转为指定数值类型(整数或者浮点数)。...
    99+
    2022-06-14
    php数组 php
  • golang怎么控制并发数量
    在Go语言中,可以使用goroutine和channel来控制并发数量。 首先,可以使用goroutine来并发执行任务。例如,有一...
    99+
    2023-10-26
    golang
  • PHP并发编程:如何使用不同的数据类型实现分布式计算?
    随着计算机技术的不断发展,分布式计算已经成为了一个重要的研究领域。在分布式计算中,不同的计算机可以同时处理多个任务,从而提高计算效率。而PHP作为一种流行的Web编程语言,也可以使用不同的数据类型来实现分布式计算。本文将介绍如何使用PHP...
    99+
    2023-11-11
    并发 数据类型 分布式
  • 怎么用python统计文件类型及数量
    要统计文件类型及数量,你可以使用Python的os模块来遍历文件夹中的文件,然后使用文件的后缀来判断文件类型。以下是一个示例代码: ...
    99+
    2023-10-21
    python
  • 怎么使用Golang并发读取文件数据并写入数据库
    本篇内容介绍了“怎么使用Golang并发读取文件数据并写入数据库”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!项目结构data文件夹中包含数...
    99+
    2023-07-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作