CAS深度解析(JAVA)

1、CAS基本介绍

CAS(Compare and Swap),即比较并替换,实现并发算法时常用到的一种技术,Doug lea大神在java同步器中大量使用了CAS技术,鬼斧神工的实现了多线程执行的安全性。

CAS的思想很简单:三个参数,一个当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。

2、应用

2.1 模拟并发自增

public class AtomicIntegerTest {     private static int counter = 0;      public static void main(String[] args) throws InterruptedException {         CountDownLatch countDownLatch = new CountDownLatch(10);         for (int i = 0; i < 10; i++) {             new Thread(() -> {                 for (int j = 0; j < 1000; j++) {                     counter++;                 }                 countDownLatch.countDown();             }).start();         }         countDownLatch.await();         System.out.println("counter=" + counter);     } } 

输出结果一定是counter<=10000
CAS深度解析(JAVA)

2.2 模拟CAS自增

public class AtomicIntegerTest {     static AtomicInteger atomic = new AtomicInteger(0);      public static void main(String[] args) throws InterruptedException {         CountDownLatch countDownLatch = new CountDownLatch(10);          for (int i = 0; i < 10; i++) {             new Thread(() -> {                 for (int j = 0; j < 1000; j++) {                     atomic.getAndIncrement();                 }                 countDownLatch.countDown();             }).start();         }         countDownLatch.await();         System.out.println(atomic.get());     } } 

输出结果一定等于10000
CAS深度解析(JAVA)

3、解析

3.1 我们先看看AtomicInteger这个类

以下是atomic.getAndIncrement()的方法调用层次结构:

AtomicInteger public final int getAndIncrement() {         return unsafe.getAndAddInt(this, valueOffset, 1);     }      Unsafe public final int getAndAddInt(Object var1, long var2, int var4){         int var5;         do {             var5 = this.getIntVolatile(var1, var2);         } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));         return var5;     }      Unsafe public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); 

3.2 探索CAS如何保证原子性

Unsafe类中的compareAndSwapInt,是一个本地方法,该方法的实现位于unsafe.cpp中

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))   UnsafeWrapper("Unsafe_CompareAndSwapInt");   oop p = JNIHandles::resolve(obj);   jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);   return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END 

1、先想办法拿到变量value在内存中的地址。
2、通过Atomic::cmpxchg实现比较替换,其中参数x是即将更新的值,参数e是原内存的值。

在Linux的x86处理器架构,Atomic::cmpxchg方法的实现如下:
(地址位于:OpenJDK8-master/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp)

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {  int mp = os::is_MP();  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"  : "=a" (exchange_value)  : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)  : "cc", "memory");  return exchange_value; } 

我们的重点来到了

LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" 

3.2.1 cmpxchgl的详细执行过程

嵌入汇编的基本格式:

asm ( assembler template     : output operands                  /* optional */     : input operands                   /* optional */     : list of clobbered registers      /* optional */     ); 

1、template就是cmpxchgl %1,(%3)表示汇编模板
2、output operands表示输出操作数,=a对应eax寄存器
3、input operand 表示输入参数,%1 就是exchange_value, %3是dest, %4就是mp, r表示任意寄存器,a还是eax寄存器
4、list of clobbered registers就是些额外参数,cc表示编译器cmpxchgl的执行将影响到标志寄存器, memory告诉编译器要重新从内存中读取变量的最新值,这点实现了volatile的感觉。

首先,输入参数是"r" (exchange_value), “a” (compare_value), “r” (dest), “r” (mp),表示compare_value存入eax寄存器,而exchange_value、dest、mp的值存入任意的通用寄存器。嵌入式汇编规定把输出和输入寄存器按统一顺序编号,顺序是从输出寄存器序列从左到右从上到下以“%0”开始,分别记为%0、%1···%9。也就是说,输出的eax是%0,输入的exchange_value、compare_value、dest、mp分别是%1、%2、%3、%4。
因此,cmpxchgl %1,(%3)实际上表示cmpxchgl exchange_value,(dest),此处(dest)表示dest地址所存的值。需要注意的是cmpxchgl有个隐含操作数eax,其实际过程是先比较eax的值(也就是compare_value)和dest地址所存的值是否相等,如果相等则把exchange_value的值写入dest指向的地址。如果不相等则把dest地址所存的值存入eax中。
输出是"=a" (exchange_value),表示把eax中存的值写入exchange_value变量中。

3.2.2、LOCK_IF_MP

根据当前系统是否为多核处理器决定是否为cmpxchg指令添加lock前缀。
如果是多核处理器,为cmpxchg指令添加lock前缀。
反之,就省略lock前缀。(单核处理器会不需要lock前缀提供的内存屏障效果)
intel的手册对lock前缀的说明如下:

1、确保对内存的读-改-写操作原子执行。在Pentium及Pentium之前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其他处理器暂时无法通过总线访问内存。很显然,这会带来昂贵的开销。从Pentium4,Intel Xeon及P6处理器开始,intel在原有总线锁的基础上做了一个很有意义的优化:如果要访问的内存区域(area of memory)在lock前缀指令执行期间已经在处理器内部的缓存中被锁定(即包含该内存区域的缓存行当前处于独占或以修改状态),并且该内存区域被完全包含在单个缓存行(cache line)中,那么处理器将直接执行该指令。由于在指令执行期间该缓存行会一直被锁定,其它处理器无法读/写该指令要访问的内存区域,因此能保证指令执行的原子性。这个操作过程叫做缓存锁定(cache locking),缓存锁定将大大降低lock前缀指令的执行开销,但是当多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时,仍然会锁住总线。
2、禁止该指令与之前和之后的读和写指令重排序。
3、把写缓冲区中的所有数据刷新到内存中。

4、CAS的缺点

CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。

ABA问题 

因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。

从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

循环时间长开销大 

自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

只能保证一个共享变量的原子操作 

当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。

5、最后奉上Unsafe实现CAS模版

// 类UnsafeInstance: public class UnsafeInstance {     public static Unsafe reflectGetUnsafe() {         try {             Field field = Unsafe.class.getDeclaredField("theUnsafe");             field.setAccessible(true);             return (Unsafe) field.get(null);         } catch (Exception e) {             e.printStackTrace();         }         return null;     }     public static void main(String[] args) {         int j=1;         reflectGetUnsafe().loadFence();         int i= 0;     } } // 类Juc_Cas: @Slf4j public class Juc_Cas {     /**      * 当前加锁状态,记录加锁的次数      */     public volatile int state = 0;     private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);     private static Juc_Cas cas = new Juc_Cas();      public static void main(String[] args) {         new Thread(new Worker(), "t-0").start();         new Thread(new Worker(), "t-1").start();         new Thread(new Worker(), "t-2").start();         new Thread(new Worker(), "t-3").start();         new Thread(new Worker(), "t-4").start();     }      static class Worker implements Runnable {         @Override         public void run() {             log.info("请求:{}到达预定点,准备开始抢state:)", Thread.currentThread().getName());             try {                 cyclicBarrier.await();                 if (cas.compareAndSwapState(0, 1)) {                     log.info("当前请求:{},抢到锁!", Thread.currentThread().getName());                 } else {                     log.info("当前请求:{},抢锁失败!", Thread.currentThread().getName());                 }             } catch (InterruptedException | BrokenBarrierException e) {                 e.printStackTrace();             }         }     }      /**      * 原子操作      *      * @param oldValue        oldvalue:线程工作内存当中的值      * @param newValue:要替换的新值      * @return      */     public final boolean compareAndSwapState(int oldValue, int newValue) {         return unsafe.compareAndSwapInt(this, stateOffset, oldValue, newValue);     }      private static final Unsafe unsafe = UnsafeInstance.reflectGetUnsafe();     private static final long stateOffset;      static {         try {             stateOffset = unsafe.objectFieldOffset(Juc_Cas.class.getDeclaredField("state"));         } catch (Exception e) {             throw new Error();         }     } } 

版权声明:玥玥 发表于 2021-06-05 0:01:58。
转载请注明:CAS深度解析(JAVA) | 女黑客导航