java并发编程之Lock锁

java并发编程之Lock锁原理

事先声明:本文为原创文章,禁止直接转载和抄袭,若要转载需经过本人同意

概述

java锁机制是一个老生常谈的问题了,那么在接触到锁的时候有没有想过什么情况下需要使用锁?锁的原理又是什么?接下来,笔者就这两个问题展开对锁的解述。

锁的类型

常用的锁分为以下几类:
1、数据库锁
2、java内置锁
3、分布式锁
本篇文章笔者将对java内置锁–Lock锁做深入分析,其他两个锁笔者会陆续做出分享。

使用场景

首先,需要加锁的资源一定是临界资源,所谓临界资源就是在多线程的情况下,各个线程会进行抢占的资源。下面以一段代码来说明:

import org.junit.Test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;  public class LockTest {      private int count = 500;      @Test     public void unLockTest() throws InterruptedException {         ExecutorService threadPool = Executors.newFixedThreadPool(10);         for (int i = 0; i < 500; i++){             threadPool.submit(() -> {                     count--;             });         }         Thread.sleep(5000);         System.out.println(count);     } }  结果:2 

代码很简单,创建了个核心线程数为10的线程池(这里需要注意的是在真实开发中禁止使用这种方式创建线程池),然后创建了500个任务,把这五百个任务里丢到线程池去处理,每个任务会对count进行减一。理想情况下,count应该为0,但实际count最后为2。其原因就是因为在多线程环境下,各个线程对count发生了资源的争夺,导致了数据的不安全性。其中,count就是临界资源,多线程就是我们常说的并发环境。

下面对代码进行修改:

    private int count = 500;     @Test     public void lockTest() throws InterruptedException {         ExecutorService threadPool = Executors.newFixedThreadPool(10);         Lock lock = new ReentrantLock();         for (int i = 0; i < 500; i++){             threadPool.submit(() -> {                 lock.lock();                 count--;                 lock.unlock();             });         }         Thread.sleep(5000);         System.out.println(count);     }      结果:0 

代码整体没有太大变化,只是在count–前后做了加锁和减锁操作,最后无论代码运行多少次,结果都是0(需要注意的是这里忽略了可见性),没有出现未加锁时的少减情况。这就是锁的使用场景,无论是数据库锁、java内置锁还是分布式锁,他们的使用场景都大同小异。使用锁的目的就是为了控制临界资源的安全性。

Lock锁原理详解

java并发编程之Lock锁
如图,Lock只是一个接口,它有很多实现类,本文着重讲ReentrantLock。

ReetrantLock
在使用Lock前,先要new一个Lock出来,先来分析new ReentrantLock()做了什么,代码如下:

    //初始化Lock     Lock lock = new ReentrantLock();         //部分ReentrantLock()源码     public class ReentrantLock implements Lock, java.io.Serializable {    		private static final long serialVersionUID = 7373984872572414699L;    		private final Sync sync;     	public ReentrantLock() {         sync = new NonfairSync();     	}     } 
                                           源码1-1 

如上源码所示,在初始化Lock时,会给ReentrantLock的成员变量//部分Sync源码 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); }

                                          源码1-2 

Sync是ReentrantLock的抽象内部类并且它实现了AQS抽象类,这里注意这个lock方法,该方法是一个抽象方法。Sync有两个实现类,分别是NonfairSync以及FairSync(其实这两个实现类就是Lock锁的公平锁与非公平锁机制)。看到这里就可以结合上面的源码串起来了,在使用无参的ReentrantLock创建Lock实例时,默认使用的是NonfairSync非公平锁机制。

lock方法
Lock锁加锁的方法是lock方法,下面来分析lock源码:

    //部分ReentrantLock()源码     public class ReentrantLock implements Lock, java.io.Serializable {    		private static final long serialVersionUID = 7373984872572414699L;    		private final Sync sync;     	public ReentrantLock() {         sync = new NonfairSync();     	}     	public void lock() {         	sync.lock();     	}     } 
                                         源码2-1 

如上源码所示,Lock类的lock方法实际调用的是其子类ReentrantLock的lock方法,而ReentrantLock中的lock方法则又调用了Sync的lock方法,而在“源码1-1”的时候我就已经知道sync的默认值为NonfairSync实例,所以这里的lock方法最终会调用到NonfairSync中的lock方法。

NonfairSync源码分析:

static final class NonfairSync extends Sync {         private static final long serialVersionUID = 7316153563782823691L;          /**          * Performs lock.  Try immediate barge, backing up to normal          * acquire on failure.          */         final void lock() {             if (compareAndSetState(0, 1))                 setExclusiveOwnerThread(Thread.currentThread());             else                 acquire(1);         }          protected final boolean tryAcquire(int acquires) {             return nonfairTryAcquire(acquires);         }     } 
                                           源码3-1 

先来看lock方法,首先会调用compareAndSetState(0, 1)方法,该方法是一个CAS操作,其作用就是修改state值为1,修改成功则返回true,修改失败则返回false。若是修改成功,则会执行setExclusiveOwnerThread(Thread.currentThread()),该方法的作用是修改exclusiveOwnerThread为当前线程id,下面先说明status和exclusiveOwnerThread这两个变量是什么,看源码:

//部分AQS源码 public abstract class AbstractQueuedSynchronizer     extends AbstractOwnableSynchronizer     implements java.io.Serializable {      private static final long serialVersionUID = 7373984972572414691L;          protected AbstractQueuedSynchronizer() { }     //CLH队列头结点     private transient volatile Node head;     //CLH队列尾结点     private transient volatile Node tail;     //锁标识state变量     private volatile int state;     } 
                                           源码3-2 
//AQS父类源码 public abstract class AbstractOwnableSynchronizer     implements java.io.Serializable {      private static final long serialVersionUID = 3737899427754241961L;      protected AbstractOwnableSynchronizer() { }          //独占线程id变量     private transient Thread exclusiveOwnerThread;      protected final void setExclusiveOwnerThread(Thread thread) {         exclusiveOwnerThread = thread;     }      protected final Thread getExclusiveOwnerThread() {         return exclusiveOwnerThread;     } } 
                                         源码3-3 

在看完源码3-2和源码3-3后已经可以得知变量state和exclusiveOwnerThread的来源。那么在讲明这两个变量的作用,在Lock锁中,一个线程想要获取锁,那么需要满足一下两个条件之一:
条件一:该线程可以成功的将state值从0改为1
条件二:若state值不为1时,则检测持有锁的线程是否为当前线程,也就是exclusiveOwnerThread的值是否为当前线程id,若是则将state值加1,这就是重入锁。

看到这里对源码3-1的if逻辑也有了认知,那么接下来讲源码3-1的else逻辑,也就是acquire(1)方法,源码如下:

acquire方法

   //AQS部分源码    public final void acquire(int arg) {         if (!tryAcquire(arg) &&             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))             selfInterrupt();     } 
                                          源码4-1 

该方法中主要调用了3个方法,分别是if判断中的tryAcquire以及acquireQueued方法和if体中的selfInterrupt方法。首先分析tryAcquire方法,源码如下:

tryAcquire源码

		//NonFairSync部分源码     	static final class NonfairSync extends Sync {         	private static final long serialVersionUID = 7316153563782823691L;         	//tryAcquire方法         	protected final boolean tryAcquire(int acquires) {             	return nonfairTryAcquire(acquires);         	}         } 		//Sync部分源码         abstract static class Sync extends AbstractQueuedSynchronizer {         //nonfairTryAcquire方法         final boolean nonfairTryAcquire(int acquires) {         	//获取当前线程id             final Thread current = Thread.currentThread();             //获取锁标识             int c = getState();             //若锁标识为0,则当前线程尝试去获取锁             if (c == 0) {                 if (compareAndSetState(0, acquires)) {                     setExclusiveOwnerThread(current);                     //当前线程获取锁成功,返回true                     return true;                 }             }             //如果持有锁线程id和当前线程id相等,则进行重入锁             else if (current == getExclusiveOwnerThread()) {             	//锁标识加1                 int nextc = c + acquires;                 if (nextc < 0) // overflow                     throw new Error("Maximum lock count exceeded");                 //设置锁标识state的值为nextc                 setState(nextc);                 //当前线程获取锁成功,返回true                 return true;             }             //当前线程获取锁失败。返回false             return false;         } 
											源码4-2 

如上源码所示,tryAcquire最终调用Sync中的nonfairTryAcquire方法,该方法主要逻辑是当前线程去获取锁,如果能够获取到则返回true,若不能获取到则返回false。具体细节看注释。接下来继续看acquireQueued方法源码,首先在源码4-1中可以看到该方法的参数是addWaiter(Node.EXCLUSIVE), arg),这里先看addWaiter(Node.EXCLUSIVE), arg)方法源码,如下:

addWaiter源码

//AQS部分源码 public abstract class AbstractQueuedSynchronizer     extends AbstractOwnableSynchronizer     implements java.io.Serializable {      private static final long serialVersionUID = 7373984972572414691L;          protected AbstractQueuedSynchronizer() { }     //CLH队列头结点     private transient volatile Node head;     //CLH队列尾结点     private transient volatile Node tail;     //锁标识state变量     private volatile int state;     //addWaiter方法  	private Node addWaiter(Node mode) {  		//将当前线程包装成一个Node对象         Node node = new Node(Thread.currentThread(), mode);         //获取CLH队列的尾结点         Node pred = tail;         //如果尾结点不为空,则说明整个CLH队列不为空,那么就将队尾结点设置为当前结点         if (pred != null) {             node.prev = pred;             //cas操作将队尾结点设置为当前node             if (compareAndSetTail(pred, node)) {                 pred.next = node;                 return node;             }         }         //若调用到这里,则说明上述cas操作失败或CLH队列为空         enq(node);         return node;     }          //enq方法  	private Node enq(final Node node) {  		//cas自旋操作         for (;;) {         	//获取尾结点             Node t = tail;             //若尾结点为空,说明CLH队列为空,则初始化CLH队列             if (t == null) { // Must initialize                 if (compareAndSetHead(new Node()))                     tail = head;             } else {             	//若尾结点不为空,则cas操作将队尾结点设置为当前node(注意这里使用了自旋,所以这个替换最终是会成功的)                 node.prev = t;                 if (compareAndSetTail(t, node)) {                     t.next = node;                     return t;                 }             }         }     } }  //Node源码 	static final class Node {              static final Node SHARED = new Node();          static final Node EXCLUSIVE = null; 		//生命状态1,表明该结点需要移除         static final int CANCELLED =  1; 		//生命状态-1,等待触发状态         static final int SIGNAL    = -1;     	//生命状态-2,表明该节点在条件队列         static final int CONDITION = -2;         //生命状态-3,表明是共享的,需要向后传播         static final int PROPAGATE = -3;         //生命状态         volatile int waitStatus; 		//前驱结点         volatile Node prev; 		//后继结点         volatile Node next; 		//当前结点对应线程         volatile Thread thread; 		         Node nextWaiter;  }  
                                          源码4-3 

以上就是addWaiter方法源码,其作用就是将没获取到锁的线程包装成一个node结点,然后将CLH队列的尾结点设置为该node结点,这里需注意Node类中的的变量,尤其是waitStatus,具体细节看注释。接下来继续看acquireQueued方法源码,如下:

acquireQueued源码

//AQS部分源码 public abstract class AbstractQueuedSynchronizer     extends AbstractOwnableSynchronizer     implements java.io.Serializable {     //acquireQueued方法 	final boolean acquireQueued(final Node node, int arg) { 		//该变量在lock方法中的调用链中无实际作用         boolean failed = true;         try {         	//设置线程中断信号为false             boolean interrupted = false;             //cas自旋             for (;;) {             	//获取当前node的前一个结点                 final Node p = node.predecessor();                 //若前一个结点是头结点,则去获取锁,若获取锁成功,则将当前node设置为头结点                 //(在setHead中会将当前node的前驱结点以及对应的线程置为null),原来的head指向空(指向空,在gc时就是回收原来的head)                 if (p == head && tryAcquire(arg)) {                     setHead(node);                     p.next = null; // help GC                     failed = false;                     //返回中断信号                     return interrupted;                 }                 //该if判断中主要是设置了当前node的前驱节点的生命状态(waitStatus)以及阻塞当前线程操作                 if (shouldParkAfterFailedAcquire(p, node) &&                     parkAndCheckInterrupt())                     //返回中断信号                     interrupted = true;             }         } finally {         	//将当前node从CLH队列中剔除掉,在lock方法的调用链中无实际作用             if (failed)                 cancelAcquire(node);         }     } } 
                                         源码4-4 

以上就是acquireQueued方法源码,该方法的作用就是线程尝试获取锁,若获取不到则进行线程阻塞操作,其中在if判断中调用了两个方法:shouldParkAfterFailedAcquire和parkAndCheckInterrupt(该方法阻塞线程)。这里需要注意的是提到了结点的生命状态(waitStatus),接下来继续看shouldParkAfterFailedAcquire方法源码,如下:

shouldParkAfterFailedAcquireuy源码

//AQS部分源码 public abstract class AbstractQueuedSynchronizer     extends AbstractOwnableSynchronizer     implements java.io.Serializable {     //shouldParkAfterFailedAcquire方法 	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 		//获取当前结点前驱结点的生命状态         int ws = pred.waitStatus;         //如果前驱节点的生命状态为SIGNAL,则返回true         if (ws == Node.SIGNAL)             /*              * This node has already set status asking a release              * to signal it, so it can safely park.              */             return true;         //如果前驱节点的生命状态大于0,则为取消状态,进行结点移除          if (ws > 0) {             /*              * Predecessor was cancelled. Skip over predecessors and              * indicate retry.              */             do {                 node.prev = pred = pred.prev;             } while (pred.waitStatus > 0);             pred.next = node;         } else {             /*              * waitStatus must be 0 or PROPAGATE.  Indicate that we              * need a signal, but don't park yet.  Caller will need to              * retry to make sure it cannot acquire before parking.              */             //将前驱节点的waitStaus设置为SIGNAL             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);         }         return false;     } } 
                                            源码4-5 

以上就是shouldParkAfterFailedAcquire方法源码,该方法就是将当前节点的前驱结点waiteStatus设置为-1,那么为什么设置为-1呢?在CLH队列中,只有当前结点的waitStatus为-1,那么当前结点对应线程在释放锁时才会唤醒下一个结点。接下来继续看parkAndCheckInterrupt源码:

parkAndCheckInterrupt源码

//AQS部分源码 public abstract class AbstractQueuedSynchronizer     extends AbstractOwnableSynchronizer     implements java.io.Serializable {  	private final boolean parkAndCheckInterrupt() {  		//阻塞线程         LockSupport.park(this);         //返回线程的中断状态         return Thread.interrupted();     } } 

以上就是parkAndCheckInterrupt方法源码,该方法主要方法是LockSupport.park(this),此方法会调用Unsafe类中的park方法继而调用到操作系统的函数,最终阻塞当前线程。

以上就是Lock锁的lock方法原理,接下来继续讲unLock方法。

unLock源码

	//部分ReentrantLock源码 	public class ReentrantLock implements Lock, java.io.Serializable {    		private static final long serialVersionUID = 7373984872572414699L; 		//unLock方法 		public void unlock() {         	sync.release(1);     	}  	public abstract class AbstractQueuedSynchronizer     	extends AbstractOwnableSynchronizer     	implements java.io.Serializable {     	private static final long serialVersionUID = 7373984972572414691L;     	//release方法   		public final boolean release(int arg) {         	if (tryRelease(arg)) {             	Node h = head;             	if (h != null && h.waitStatus != 0)             		//唤醒CLH队列第一个线程                 	unparkSuccessor(h);             	return true;         	}         	return false;     }     //unparkSuccessor方法 	private void unparkSuccessor(Node node) {         /*          * If status is negative (i.e., possibly needing signal) try          * to clear in anticipation of signalling.  It is OK if this          * fails or if status is changed by waiting thread.          */         int ws = node.waitStatus;         if (ws < 0)             compareAndSetWaitStatus(node, ws, 0);          /*          * Thread to unpark is held in successor, which is normally          * just the next node.  But if cancelled or apparently null,          * traverse backwards from tail to find the actual          * non-cancelled successor.          */         Node s = node.next;         if (s == null || s.waitStatus > 0) {             s = null;             for (Node t = tail; t != null && t != node; t = t.prev)                 if (t.waitStatus <= 0)                     s = t;         }         if (s != null)         	//唤醒指定线程id的线程             LockSupport.unpark(s.thread);     } 

以上就是unLock方法的调用链以及源码,其核心方法LockSupport.unpark(s.thread)会继续调用Unsafe类的unpark方法,该方法会调用到操作系统的函数继而去唤醒指定线程。

总结

到这里,Lock锁的地原理和机制已经全部梳理完了。不过还有一些细节,比如中断信号,cas原子操作没有详细讲,但这些不影整体的逻辑梳理。若是对文章中的一些点有疑惑,可在评论区留言或私信我。

版权声明:玥玥 发表于 2021-04-24 3:25:03。
转载请注明:java并发编程之Lock锁 | 女黑客导航