2022-09-25并发编程系列00
请注意,本文编写于 747 天前,最后修改于 747 天前,其中某些信息可能已经过时。

ReentrantLock源码解析

ReentrantLock

参考文档

ReentrantLock介绍

ReentrantLock,意思是“可重入锁”,关于可重入锁的概念在下一节讲述。ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。

基本语法上,ReentrantLock与synchronized很相似,它们都具备一样的线程重入特性,只是代码写法上有点区别而已。一个表现为API层面的互斥锁(Lock),一个表现为原生语法层面的互斥锁(synchronized)。

ReentrantLock相对synchronized而言还是增加了一些高级功能,主要有以下三项:

  • 等待可中断:当持有锁的线程长期不释放锁时,正在等待的线程可以选择放弃等待,改为处理其他事情,它对处理执行时间非常长的同步块很有帮助。而在等待由synchronized产生的互斥锁时,会一直阻塞,是不能被中断的。
  • 可实现公平锁:多个线程赶在等待同一个锁时,必须按照申请锁的时间顺序排队等待,而非公平锁则不保证这点,在锁释放时,任何一个等待锁的线程都有机会获得锁。synchronized中的锁是非公平锁,ReentrantLock默认情况下也是非公平锁,但可以通过构造方法ReentrantLock(true)来要求使用公平锁。
  • 锁可以绑定多个条件:ReentrantLock对象可以同时绑定多个Condition对象(条件变量或条件队列),而在synchronized中,锁对象的 wait()和notify()或notifyAll()方法可以实现一个隐含条件,但如果要和多于一个的条件关联的时候,就不得不额外地添加一个锁,而ReentrantLock则无需这么做,只需要多次调用newCondition()方法即可。而且我们还可以通过绑定Condition对象来判断当前线程通知的是哪些线程(即与Condition对象绑定在一起的其它线程)。

ReentrantLock加锁成功流程

非公平锁

reentrantlock 主要有两个构造器 :

 	public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

分别由非公平锁NofairSync(默认) 和 公平锁 FairSync 来实现的

非公平锁 :

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

非公平锁的代码如上面所示, NonfairSync 是 ReentrantLock的内部静态类, 并且添加了 final 关键字, 防止其他类去继承 NofairSync

**如果一个类要被声明为static的,只有一种情况,就是静态内部类。**如果在外部类声明为static,程序会编译都不会过 :

1.静态内部类跟静态方法一样,只能访问静态的成员变量和方法,不能访问非静态的方法和属性,但是普通内部类可以访问任意外部类的成员变量和方法

2.静态内部类可以声明普通成员变量和方法,而普通内部类不能声明static成员变量和方法。

3.静态内部类可以单独初始化

image-20220916094334082

NofairSync的继承关系如上图所示, 可以看到 NofairSync 继承了 Sync , Sync 继承了AQS, 所以可以使用AQS的提供的加锁/解锁方法

加锁流程

初始状态是没有竞争的时候

image-20220917113437493

如图所示 :

  • exclusizeOwnerThread = currentThread , state =0 => state = 1
 	protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

exclusiveOwnerThreadReentrantLock的一个字段, 对象引用指向获取锁的线程


当出现竞争的时候

image-20220917113557632

Thread-1 :

  1. 新的线程过来尝试修改 state 的值C
  2. AS尝试将state由0改为1, 失败 !
  3. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  4. 获取锁失败 , 接下来进入 addWaiter 逻辑,构造 Node 队列

addWaiter 逻辑

image-20220917114319124

addwaiter的逻辑是创建一个Node类对象, 然后插入到队列中, 队列使用双向链表实现

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
		...
}

当前线程进入 acquireQueued 逻辑 :

源码如下 :

	// ReentrantLock -> lock()
	final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

	// AbstractQueuedSynchronizer ->  acquire
	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
	// AbstractQueuedSynchronizer ->  acquireQueued
	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
image-20220917115732233
  1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
  2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
  3. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)阻塞
image-20220917115537382

解锁流程

多次竞争失败 :

image-20220917115858932

开始进入解锁流程 :

找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1

如果加锁成功(没有竞争),会设置 :

  1. exclusiveOwnerThread 为 Thread-1,state = 1
  2. head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread, 相当于把原本的Thread-1所在的Node设置为虚拟头结点
image-20220917115943506

非公平锁的体现

  • 哪个线程能获取锁, 并不是根据队列顺序, 如果是公平锁的情况下, 新来的 Thread-4 线程需要往后面拍, 让Thread-1 先获取锁
  • 非公平锁的话, 队列中排第二的Node中的线程(第一是虚拟头节点)要和新来的线程竞争
image-20220917120327591

ReentrantLock加锁源码

NonfairSync


// Sync 继承自 AQS
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        // 加锁实现
        final void lock() {
            // 尝试获取锁
            if (compareAndSetState(0, 1))
                // 获取锁成功. 设置当前线程为锁拥有的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //  如果尝试失败,进入 acquire [1]
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

AQS

**[1]acquire **:

	
	// [1]AQS 继承过来的方法 
	public final void acquire(int arg) {
        // [2]tryAcquire 再次尝试获取锁
        if (!tryAcquire(arg) &&
            // 如果 tryAcquire失败, 就执行 addWaiter[4] , 再执行 acquireQueued[5]
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

	
  1. 先调用 tryAcquire() , 如果失败
  2. 执行 addWaiter , 然后执行 acquireQueued, 如果成功
  3. 调用 selfInterrupt(); 阻塞

[2]tryAcquire

选中方法, ctrl + alt + b 即可查看 这个方法的所有实现的类 NonfairSync -> tryAcquire

		// NonfairSync -> 	tryAcquire [2] -> [3]
		protected final boolean tryAcquire(int acquires) {
            //  Sync -> nonfairTryAcquire
            return nonfairTryAcquire(acquires);
        }

[3]nonfairTryAcquire :

AbstractQueuedSynchronizer ==> Sync -> nonfairTryAcquire

// [3] Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
    		// 如果还没有获得锁
            if (c == 0) {
                // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
    		// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                // state++ 
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
    		// 获取失败, 回到调用处
            return false;
        }
  1. 获取锁的状态字段 state
  2. 如果还没有获取锁, 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
  3. 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入, state++, 重新设置 state
  4. state 获取失败, 回到调用处

[4]addWaiter :

// [4]AQS 继承过来的方法, 方便阅读, 放在此处
private Node addWaiter(Node mode) {
        // 将当前线程关联到一个 Node 对象上, 模式为独占模式
    	Node node = new Node(Thread.currentThread(), mode);
        // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // 双向链表
                pred.next = node;
                return node;
            }
        }
    	// 尝试将 Node 加入 AQS, 进入 [6]
        enq(node);
        return node;
    }

[6]enq

// AQS 继承过来的方法, 方便阅读, 放在此处
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 判断尾部是否为 null ,如果是 就创建一个虚拟节点, head -> node(null)
            if (t == null) { // Must initialize
                // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // cas 尝试将 Node 对象加入 AQS 队列尾部
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

[5]acquireQueued

//  AQS 继承过来的方法, 方便阅读, 放在此处
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 初始状态 head -> node(null) -> node(thred = thread-1) -> node(thred = thread-2)
                    // head -> node(thred = thread-1) -> node(thred = thread-2) , node(null) 被 GC 回收
                    // head -> node(thred = null) -> node(thred = thread-2) , thread-1 已经获取锁了, 对应的node 被设置为 null
                    // 获取成功, 设置自己(当前线程对应的 node)为 head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 返回中断标记 false
                    return interrupted;
                }
                // 判断是否应当 park, 进入 [7]
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // park 等待, 此时 Node 的状态被置为 Node.SIGNAL [8]
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  1. 将头节点 head 指向 第二个节点 node(thread-1) , 虚拟头结点node(null)失去引用就会被垃圾回收
  2. 原本第二个节点 node(thread-1) 会被作为新的虚拟节点, 并且 node(thread = null) 的 thread 会被重置为 null , 返回中断标记
  3. 判断是否应该进入 park => [7]shouldParkAfterFailedAcquire

[7]shouldParkAfterFailedAcquire

//  AQS 继承过来的方法, 方便阅读, 放在此处   
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	// 获取上一个节点的状态    
    	int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
         	// 上一个节点都在阻塞, 那么自己也阻塞好了
            return true;
    	//  > 0 表示取消状态 , 取消状态就是当前线程不再等待锁, 取消等待锁的状态了
        if (ws > 0) {
			// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
          	// 这次还没有阻塞
            // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  1. 获取前一个节点的 waitStatus
  2. 如果前一个节点是 -1 (Node.SIGNAL = 1) , 说明 这个节点正在阻塞, 那么就直接返回 true , 这样就会执行 parkAndCheckInterrupt()
  3. 如果上一个节点的状态是取消状态, 那么就会删除前面所有的取消的节点, 取消的意思是, 对应的线程不再选择在队列里等待锁, 而是取消等待
  4. 如果前一节点还灭有阻塞, 就设置 waitStatus 为 -1 ( Node.SIGNAL)

[8]shouldParkAfterFailedAcquire

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
}

是否需要 unpark 是由当前节点的前驱节点waitStatus == Node.SIGNAL 来决定,而不是本节点的waitStatus 决定

ReentrantLock解锁源码

unlock

Sync 继承自 AQS

public class ReentrantLock implements Lock, java.io.Serializable {
    // 解锁实现
    public void unlock() {
        sync.release(1);
    }
    
}

release

// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
   		// 尝试释放锁, 进入 [1]
        if (tryRelease(arg)) {
            //  从队列头节点 开始 unpark
            Node h = head;
            //  队列不为 null 并且  waitStatus == Node.SIGNAL(-1) 才需要 unpark
            if (h != null && h.waitStatus != 0)
                // unpark AQS 中等待的线程, 进入[2]
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  1. 调用 tryRelease 尝试释放锁
  2. 如果释放成功的情况下, 尝试唤醒(unpark)队列中其他阻塞( waitStatus == Node.SIGNAL(-1))的线程 , 最后返回 true
  3. 释放锁失败,

[1]tryRelease

	
//  Sync 继承过来的方法, 方便阅读, 放在此处
	protected final boolean tryRelease(int releases) {
        	// state--
            int c = getState() - releases;
        	// 判断当前线程是否是获取锁的那个线程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
        	// c == 0 是为了
        	// 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }	

[2]unparkSuccessor :

 private void unparkSuccessor(Node node) {
		// 如果状态为 Node.SIGNAL 尝试重置状态为 0
     	// 不成功也可以
        int ws = node.waitStatus;
        if (ws < 0) // ws = Node.SIGNAL(-1)
            // 重置状态state
            compareAndSetWaitStatus(node, ws, 0);
		// 找到需要 unpark 的节点(state==-1), 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
        Node s = node.next; // head-> node(null) -> node(thread-1)
     	// waitStatus > 0 表示取消状态
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 不考虑已取消的节点(), 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                // waitStatus <= 0 阻塞
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  1. 获取当前节点的 waitSate , 如果当前 waitSate < 0, 当前节点对应的 Thread 已经阻塞, 所以要重置当前节点的state为0
  2. 如果 阻塞队列为空 (s == null) , 或者 s 对应的线程等待状态已经被取消了 s.waitStatus > 0
  3. 不考虑已取消的节点(s.waitStatus > 0), 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
  4. 找到需要unpark的节点以后, 执行 LockSupport.unpark(s.thread);

可重入原理

nonfairTryAcquire

static final class NonfairSync extends Sync {
 	
    // Sync 继承过来的方法, 方便阅读, 放在此处
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
        	// 没有其他人获取锁的情况
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
        	// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
}
  1. 没有其他人获取锁的情况 , 修改状态state, 获取锁
  2. 已经获取锁的情况下, 去判断 当前线程是否是获取锁的线程
  3. 然后对 state 进行累加 int nextc = c + acquires;, 表示重入锁的次数 setState(nextc)

tryRelease

	
	//  Sync 继承过来的方法, 方便阅读, 放在此处
	protected final boolean tryRelease(int releases) {
        	// state--
            int c = getState() - releases;
        	// 判断当前线程是否是获取锁的那个线程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
        	// c == 0 是为了
        	// 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }	
  1. 让 state - 1 , 然后返回 false , 相当于是把锁重入的计数减一
  2. 当 c == 0, 也就是计数减到0的时候, 说明已经释放成功了, 然后会把 free 设置为 true , 设置 setExclusiveOwnerThread(null); 为 null
  3. 返回释放锁成功的标志 true

流程图解

reentrant源码解析-可重入原理

可打断原理

不可打断模式

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

// Sync 继承自 AQS
static final class NonfairSync extends Sync {
    
    // 从AQS继承来的方法
    private final boolean parkAndCheckInterrupt() {
        //  如果打断标记已经是 true, 则 park 会失效
        LockSupport.park(this);
        // interrupted 会清除打断标记
        return Thread.interrupted();
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 1. 先获取锁 
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 2. 返回打断状态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
	
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            // 返回的 interrupted = true
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果打断的状态为 true
            selfInterrupt();
    }
    
    static void selfInterrupt() {
        // 重新产生一次中断
        Thread.currentThread().interrupt();
    }
}

可打断模式

static final class NonfairSync extends Sync {
    
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        // 如果没有获得到锁, 进入 [1]
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    
}

[1]可打断的获取锁流程:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 在 park 过程中如果被 interrupt 会进入此
                    parkAndCheckInterrupt())
                    //  这时候抛出异常, 而不会再次进入 for (;;)
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

注意 , 与不可打断模式, 这里是设置 打断标记为true, 会继续进行for循环

	if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

可打断模式 : 会直接抛出异常

if (shouldParkAfterFailedAcquire(p, node) &&
                    // 在 park 过程中如果被 interrupt 会进入此
                    parkAndCheckInterrupt())
                    //  这时候抛出异常, 而不会再次进入 for (;;)
                    throw new InterruptedException();

公平锁原理

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

     	// 与非公平锁主要区别在于 tryAcquire 方法的实现
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
    	
    	// AQS 继承过来的方法, 方便阅读, 放在此处
    	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    	}
    
    	
    	public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // h != t 说明 队列中有 等待的线程
        return h != t &&
            // h.next == null 说明没有 等待的线程
            // h = head -> node(null) -> node(thread1)
            // 判断当前线程是否是第二个节点(第一个为head)对应的线程
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
 }
  1. 判断队列中是否有线程等待 (如果有 = true)
  2. 判断当前线程是是否是排第一个等待的线程, 如果不是, 返回true, 如果是返回false
  3. 当前线程是排队第一等待的线程 hasQueuedPredecessors() = false , !hasQueuedPredecessors() = true
  4. 尝试修改状态state获取锁

公平锁

acquire(1) -> tryAcquire(acquires = 1)

// 与非公平锁主要区别在于 tryAcquire 方法的实现
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  1. 获取当前锁的状态, 如果没有人持有锁
  2. 先检查AQS队列中是否有前驱节点, 也就是说, 判断下当前节点是否是优先级最高的 (排在队列第一的线程优先级肯定是最高的)
  3. 如果自己是优先级最高的情况下, 就尝试获取锁

hasQueuedPredecessors :

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // h != t 说明 队列中有 等待的线程
        return h != t &&
            // h.next == null 说明没有 等待的线程
            // h = head -> node(null) -> node(thread1)
            // 判断当前线程是否是第二个节点(第一个为head)对应的线程
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
  1. 判断队列中是否有线程等待 (如果有 = true)
  2. 判断当前线程是是否是排第一个等待的线程, 如果不是, 返回true, 如果是返回false
  3. 当前线程是排队第一等待的线程 hasQueuedPredecessors() = false , !hasQueuedPredecessors() = true
  4. 尝试修改状态state获取锁

非公平锁

acquire(1) -> tryAcquire(acquires = 1)

	protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

	final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  1. 非公平锁不会去考虑AQS等待队列中是否有 线程正在等待的, 无论是新来的, 还是等待的(锁释放的时候就会唤醒这些等待的线程) 一块竞争.

条件变量实现原理

await流程

  1. 让当前持有锁的线程 thread-1 wait , 进入Condition等待队列

  2. 让NonfairSync同步器释放锁 :

    1. owner = null
    2. state = 0
  3. 唤醒 AQS等待队列中优先级最高的节点 , 让其竞争锁

  4. 阻塞 thread-1 (unpark)

await源码

	public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
        	// 进入 fullyRelease 流程
        	// fullRelease 
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // 是当前线程处于等待状态
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
addConditionWaiter流程
image-20220918183430436

fullRelease流程

image-20220918203026157

竞争锁流程

image-20220918204301406
  1. fullRelease 之后 唤醒(unpark) AQS 队列中的下一个节点(排名最靠前的)来竞争锁,
  2. 假设没有其他竞争线程,那么 Thread-1 竞争成功
  3. park 阻塞 Thread-0

signal流程

初始状态

假设 Thread-1 要来唤醒 Thread-0

image-20220918210546346

doSignal流程

image-20220918210721637

transferForSignal流程

image-20220918210932397
  1. 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,
  2. 将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1
  3. Thread-1 释放锁,进入 unlock 流程

本文作者:王天赐

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!