Java并发工具类LongAdder原理实例解析

作者:myseries 时间:2023-11-25 15:50:20 

LongAdder实现原理图

Java并发工具类LongAdder原理实例解析

Java并发工具类LongAdder原理实例解析

高并发下N多线程同时去操作一个变量会造成大量线程CAS失败,然后处于自旋状态,导致严重浪费CPU资源,降低了并发性。既然AtomicLong性能问题是由于过多线程同时去竞争同一个变量的更新而降低的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源。

LongAdder则是内部维护一个Cells数组,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相的减少了争夺共享资源的并发量,另外多个线程在争夺同一个原子变量时候,如果失败并不是自旋CAS重试,而是尝试获取其他原子变量的锁,最后当获取当前值时候是把所有变量的值累加后再加上base的值返回的。

LongAdder维护了要给延迟初始化的原子性更新数组和一个基值变量base数组的大小保持是2的N次方大小,数组表的下标使用每个线程的hashcode值的掩码表示,数组里面的变量实体是Cell类型。

Cell 类型是Atomic的一个改进,用来减少缓存的争用,对于大多数原子操作字节填充是浪费的,因为原子操作都是无规律的分散在内存中进行的,多个原子性操作彼此之间是没有接触的,但是原子性数组元素彼此相邻存放将能经常共享缓存行,也就是伪共享。所以这在性能上是一个提升。

另外由于Cells占用内存是相对比较大的,所以一开始并不创建,而是在需要时候再创建,也就是惰性加载,当一开始没有空间时候,所有的更新都是操作base变量。

java.util.concurrency.atomic.LongAdder是Java8新增的一个类,提供了原子累计值的方法。根据文档的描述其性能要优于AtomicLong

这里测试时基于JDK1.8进行的,AtomicLong 从Java8开始针对x86平台进行了优化,使用XADD替换了CAS操作,我们知道JUC下面提供的原子类都是基于Unsafe类实现的,并由Unsafe来提供CAS的能力。CAS (compare-and-swap)本质上是由现代CPU在硬件级实现的原子指令,允许进行无阻塞,多线程的数据操作同时兼顾了安全性以及效率。大部分情况下,CAS都能够提供不错的性能,但是在高竞争的情况下开销可能会成倍增长,具体的研究可以参考这篇文章, 我们直接看下代码:


public class AtomicLong {
public final long incrementAndGet() {
   return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
 }
}

public final class Unsafe {
public final long getAndAddLong(Object var1, long var2, long var4) {
   long var6;
   do {
     var6 = this.getLongVolatile(var1, var2);
   } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
   return var6;
 }
}

getAndAddLong方法会以volatile的语义去读需要自增的域的最新值,然后通过CAS去尝试更新,正常情况下会直接成功后返回,但是在高并发下可能会同时有很多线程同时尝试这个过程,也就是说线程A读到的最新值可能实际已经过期了,因此需要在while循环中不断的重试,造成很多不必要的开销,而xadd的相对来说会更高效一点,伪码如下,最重要的是下面这段代码是原子的,也就是说其他线程不能打断它的执行或者看到中间值,这条指令是在硬件级直接支持的:


function FetchAndAdd(address location, int inc) {
 int value := *location
 *location := value + inc
 return value
}

而LongAdder的性能比上面那种还要好很多,于是就研究了一下。首先它有一个基础的值base,在发生竞争的情况下,会有一个Cell数组用于将不同线程的操作离散到不同的节点上去(会根据需要扩容,最大为CPU核数),sum()会将所有Cell数组中的value和base累加作为返回值。核心的思想就是将AtomicLong一个value的更新压力分散到多个value中去,从而降低更新热点。

Java并发工具类LongAdder原理实例解析


public class LongAdder extends Striped64 implements Serializable {
//...
}

LongAdder继承自Striped64,Striped64内部维护了一个懒加载的数组以及一个额外的base实例域,数组的大小是2的N次方,使用每个线程Thread内部的哈希值访问。


abstract class Striped64 extends Number {
/** Number of CPUS, to place bound on table size */
 static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
  * Table of cells. When non-null, size is a power of 2.
  */
 transient volatile Cell[] cells;

@sun.misc.Contended static final class Cell {
   volatile long value;
   Cell(long x) { value = x; }
   final boolean cas(long cmp, long val) {
     return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
   }

// Unsafe mechanics
   private static final sun.misc.Unsafe UNSAFE;
   private static final long valueOffset;
   static {
     try {
       UNSAFE = sun.misc.Unsafe.getUnsafe();
       Class<?> ak = Cell.class;
       valueOffset = UNSAFE.objectFieldOffset
         (ak.getDeclaredField("value"));
     } catch (Exception e) {
       throw new Error(e);
     }
   }
 }

}

数组的元素是Cell类,可以看到Cell类用Contended注解修饰,这里主要是解决false sharing(伪共享的问题),不过个人认为伪共享翻译的不是很好,或者应该是错误的共享,比如两个volatile变量被分配到了同一个缓存行,但是这两个的更新在高并发下会竞争,比如线程A去更新变量a,线程B去更新变量b,但是这两个变量被分配到了同一个缓存行,因此会造成每个线程都去争抢缓存行的所有权,例如A获取了所有权然后执行更新这时由于volatile的语义会造成其刷新到主存,但是由于变量b也被缓存到同一个缓存行,因此就会造成cache miss,这样就会造成极大的性能损失,因此有一些类库的作者,例如JUC下面的、Disruptor等都利用了插入dummy 变量的方式,使得缓存行被其独占,比如下面这种代码:


static final class Cell {
   volatile long p0, p1, p2, p3, p4, p5, p6;
   volatile long value;
   volatile long q0, q1, q2, q3, q4, q5, q6;
   Cell(long x) { value = x; }

final boolean cas(long cmp, long val) {
     return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
   }

// Unsafe mechanics
   private static final sun.misc.Unsafe UNSAFE;
   private static final long valueOffset;
   static {
     try {
       UNSAFE = getUnsafe();
       Class<?> ak = Cell.class;
       valueOffset = UNSAFE.objectFieldOffset
         (ak.getDeclaredField("value"));
     } catch (Exception e) {
       throw new Error(e);
     }
   }
}

但是这种方式毕竟不通用,例如32、64位操作系统的缓存行大小不一样,因此JAVA8中就增加了一个注@sun.misc.Contended解用于解决这个问题,由JVM去插入这些变量,具体可以参考openjdk.java.net/jeps/142 ,但是通常来说对象是不规则的分配到内存中的,但是数组由于是连续的内存,因此可能会共享缓存行,因此这里加一个Contended注解以防cells数组发生伪共享的情况。


/**
* 底竞争下直接更新base,类似AtomicLong
* 高并发下,会将每个线程的操作hash到不同的
* cells数组中,从而将AtomicLong中更新
* 一个value的行为优化之后,分散到多个value中
* 从而降低更新热点,而需要得到当前值的时候,直接
* 将所有cell中的value与base相加即可,但是跟
* AtomicLong(compare and change -> xadd)的CAS不同,
* incrementAndGet操作及其变种
* 可以返回更新后的值,而LongAdder返回的是void
*/
public class LongAdder {
 public void add(long x) {
   Cell[] as; long b, v; int m; Cell a;
   /**
    * 如果是第一次执行,则直接case操作base
    */
   if ((as = cells) != null || !casBase(b = base, b + x)) {
     boolean uncontended = true;
     /**
      * as数组为空(null或者size为0)
      * 或者当前线程取模as数组大小为空
      * 或者cas更新Cell失败
      */
     if (as == null || (m = as.length - 1) < 0 ||
       (a = as[getProbe() & m]) == null ||
       !(uncontended = a.cas(v = a.value, v + x)))
       longAccumulate(x, null, uncontended);
   }
 }

public long sum() {
   //通过累加base与cells数组中的value从而获得sum
   Cell[] as = cells; Cell a;
   long sum = base;
   if (as != null) {
     for (int i = 0; i < as.length; ++i) {
       if ((a = as[i]) != null)
         sum += a.value;
     }
   }
   return sum;
 }
}

/**
* openjdk.java.net/jeps/142
*/
@sun.misc.Contended static final class Cell {
 volatile long value;
 Cell(long x) { value = x; }
 final boolean cas(long cmp, long val) {
   return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
 }

// Unsafe mechanics
 private static final sun.misc.Unsafe UNSAFE;
 private static final long valueOffset;
 static {
   try {
     UNSAFE = sun.misc.Unsafe.getUnsafe();
     Class<?> ak = Cell.class;
     valueOffset = UNSAFE.objectFieldOffset
       (ak.getDeclaredField("value"));
   } catch (Exception e) {
     throw new Error(e);
   }
 }
}

abstract class Striped64 extends Number {

final void longAccumulate(long x, LongBinaryOperator fn,
              boolean wasUncontended) {
   int h;
   if ((h = getProbe()) == 0) {
     /**
      * 若getProbe为0,说明需要初始化
      */
     ThreadLocalRandom.current(); // force initialization
     h = getProbe();
     wasUncontended = true;
   }
   boolean collide = false;        // True if last slot nonempty
   /**
    * 失败重试
    */
   for (;;) {
     Cell[] as; Cell a; int n; long v;
     if ((as = cells) != null && (n = as.length) > 0) {
       /**
        * 若as数组已经初始化,(n-1) & h 即为取模操作,相对 % 效率要更高
        */
       if ((a = as[(n - 1) & h]) == null) {
         if (cellsBusy == 0) {    // Try to attach new Cell
           Cell r = new Cell(x);  // Optimistically create
           if (cellsBusy == 0 && casCellsBusy()) {//这里casCellsBusy的作用其实就是一个spin lock
             //可能会有多个线程执行了`Cell r = new Cell(x);`,
             //因此这里进行cas操作,避免线程安全的问题,同时前面在判断一次
             //避免正在初始化的时其他线程再进行额外的cas操作
             boolean created = false;
             try {        // Recheck under lock
               Cell[] rs; int m, j;
               //重新检查一下是否已经创建成功了
               if ((rs = cells) != null &&
                 (m = rs.length) > 0 &&
                 rs[j = (m - 1) & h] == null) {
                 rs[j] = r;
                 created = true;
               }
             } finally {
               cellsBusy = 0;
             }
             if (created)
               break;
             continue;      // Slot 现在是非空了,continue到下次循环重试
           }
         }
         collide = false;
       }
       else if (!wasUncontended)    // CAS already known to fail
         wasUncontended = true;   // Continue after rehash
       else if (a.cas(v = a.value, ((fn == null) ? v + x :
                      fn.applyAsLong(v, x))))
         break;//若cas更新成功则跳出循环,否则继续重试
       else if (n >= NCPU || cells != as) // 最大只能扩容到CPU数目, 或者是已经扩容成功,这里只有的本地引用as已经过期了
         collide = false;      // At max size or stale
       else if (!collide)
         collide = true;
       else if (cellsBusy == 0 && casCellsBusy()) {
         try {
           if (cells == as) {   // 扩容
             Cell[] rs = new Cell[n << 1];
             for (int i = 0; i < n; ++i)
               rs[i] = as[i];
             cells = rs;
           }
         } finally {
           cellsBusy = 0;
         }
         collide = false;
         continue;          // Retry with expanded table
       }
       //重新计算hash(异或)从而尝试找到下一个空的slot
       h = advanceProbe(h);
     }
     else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
       boolean init = false;
       try {              // Initialize table
         if (cells == as) {
           /**
            * 默认size为2
            */
           Cell[] rs = new Cell[2];
           rs[h & 1] = new Cell(x);
           cells = rs;
           init = true;
         }
       } finally {
         cellsBusy = 0;
       }
       if (init)
         break;
     }
     else if (casBase(v = base, ((fn == null) ? v + x : // 若已经有另一个线程在初始化,那么尝试直接更新base
                   fn.applyAsLong(v, x))))
       break;             // Fall back on using base
   }
 }

final boolean casCellsBusy() {
   return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
 }

static final int getProbe() {
   /**
    * 通过Unsafe获取Thread中threadLocalRandomProbe的值
    */
   return UNSAFE.getInt(Thread.currentThread(), PROBE);
 }

// Unsafe mechanics
   private static final sun.misc.Unsafe UNSAFE;
   private static final long BASE;
   private static final long CELLSBUSY;
   private static final long PROBE;
   static {
     try {
       UNSAFE = sun.misc.Unsafe.getUnsafe();
       Class<?> sk = Striped64.class;
       BASE = UNSAFE.objectFieldOffset
         (sk.getDeclaredField("base"));
       CELLSBUSY = UNSAFE.objectFieldOffset
         (sk.getDeclaredField("cellsBusy"));
       Class<?> tk = Thread.class;
       //返回Field在内存中相对于对象内存地址的偏移量
       PROBE = UNSAFE.objectFieldOffset
         (tk.getDeclaredField("threadLocalRandomProbe"));
     } catch (Exception e) {
       throw new Error(e);
     }
   }
}

由于Cell相对来说比较占内存,因此这里采用懒加载的方式,在无竞争的情况下直接更新base域,在第一次发生竞争的时候(CAS失败)就会创建一个大小为2的cells数组,每次扩容都是加倍,只到达到CPU核数。同时我们知道扩容数组等行为需要只能有一个线程同时执行,因此需要一个锁,这里通过CAS更新cellsBusy来实现一个简单的spin lock。

数组访问索引是通过Thread里的threadLocalRandomProbe域取模实现的,这个域是ThreadLocalRandom更新的,cells的数组大小被限制为CPU的核数,因为即使有超过核数个线程去更新,但是每个线程也只会和一个CPU绑定,更新的时候顶多会有cpu核数个线程,因此我们只需要通过hash将不同线程的更新行为离散到不同的slot即可。

我们知道线程、线程池会被关闭或销毁,这个时候可能这个线程之前占用的slot就会变成没人用的,但我们也不能清除掉,因为一般web应用都是长时间运行的,线程通常也会动态创建、销毁,很可能一段时间后又会被其他线程占用,而对于短时间运行的,例如单元测试,清除掉有啥意义呢?

总结

总的来说,LongAdder从性能上来说要远远好于AtomicLong,一般情况下是可以直接替代AtomicLong使用的,Netty也通过一个接口封装了这两个类,在Java8下直接采用LongAdder,但是AtomicLong的一系列方法不仅仅可以自增,还可以获取更新后的值,如果是例如获取一个全局唯一的ID还是采用AtomicLong会方便一点。

来源:https://www.cnblogs.com/myseries/p/12699433.html

标签:Java,并发,工具,类,LongAdder
0
投稿

猜你喜欢

  • Spring Security过滤器链体系的实例详解

    2023-08-25 03:24:15
  • Android自定义Drawable实现圆形和圆角

    2022-05-07 14:59:59
  • IDEA的默认快捷键设置与Eclipse的常用快捷键的设置方法

    2023-04-09 18:32:40
  • java中的Arrays这个工具类你真的会用吗(一文秒懂)

    2021-11-28 06:25:11
  • 解决Spring国际化文案占位符失效问题的方法

    2022-10-20 23:42:23
  • MyBatis3用log4j在控制台输出SQL的方法示例

    2023-07-01 07:32:36
  • FeignClient如何通过配置变量调用配置文件url

    2023-05-07 08:19:25
  • Android开发之TabHost选项卡及相关疑难解决方法

    2022-01-10 06:30:57
  • 解决Android Studio安装后运行出错dose not...和Internal error...

    2023-04-24 11:54:42
  • IOS 实现摇一摇的操作

    2023-07-02 13:46:53
  • Android编程判断SD卡是否存在及使用容量查询实现方法

    2022-02-08 23:15:28
  • Spring源码完美导入IDEA的过程

    2023-05-13 14:11:50
  • SpringBoot小程序推送信息的项目实践

    2021-12-07 04:23:34
  • Java 爬虫服务器被屏蔽的解决方案

    2022-11-06 13:23:46
  • 基于mybatis逆向工程的使用步骤详解

    2022-10-28 09:27:26
  • JAVA面试题之Forward与Redirect的区别详解

    2023-11-25 02:27:15
  • Mybatis结果生成键值对的实例代码

    2023-11-28 15:50:58
  • Android实现recyclerview城市字母索引列表

    2023-09-28 04:22:06
  • 深入理解Java设计模式之备忘录模式

    2023-09-20 06:16:43
  • MVVM和MVVMLight框架介绍及在项目中的使用详解

    2021-07-29 06:59:10
  • asp之家 软件编程 m.aspxhome.com