ReentrantLock,意思是“可重入锁”,关于可重入锁的概念在下一节讲述。ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。
基本语法上,ReentrantLock与synchronized很相似,它们都具备一样的线程重入特性,只是代码写法上有点区别而已。一个表现为API层面的互斥锁(Lock),一个表现为原生语法层面的互斥锁(synchronized)。
ReentrantLock相对synchronized而言还是增加了一些高级功能,主要有以下三项:
ReentrantLock
对象可以同时绑定多个Condition
对象(条件变量或条件队列),而在synchronized中,锁对象的 wait()和notify()或notifyAll()方法可以实现一个隐含条件,但如果要和多于一个的条件关联的时候,就不得不额外地添加一个锁,而ReentrantLock则无需这么做,只需要多次调用newCondition()方法即可。而且我们还可以通过绑定Condition对象来判断当前线程通知的是哪些线程(即与Condition对象绑定在一起的其它线程)。reentrantlock 主要有两个构造器 :
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
分别由非公平锁NofairSync
(默认) 和 公平锁 FairSync
来实现的
非公平锁 :
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
非公平锁的代码如上面所示, NonfairSync 是 ReentrantLock的内部静态类, 并且添加了 final 关键字, 防止其他类去继承 NofairSync
**如果一个类要被声明为static的,只有一种情况,就是静态内部类。**如果在外部类声明为static,程序会编译都不会过 :
1.静态内部类跟静态方法一样,只能访问静态的成员变量和方法,不能访问非静态的方法和属性,但是普通内部类可以访问任意外部类的成员变量和方法
2.静态内部类可以声明普通成员变量和方法,而普通内部类不能声明static成员变量和方法。
3.静态内部类可以单独初始化
NofairSync的继承关系如上图所示, 可以看到 NofairSync 继承了 Sync , Sync 继承了AQS, 所以可以使用AQS的提供的加锁/解锁方法
如图所示 :
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
exclusiveOwnerThread
是 ReentrantLock
的一个字段, 对象引用指向获取锁的线程
Thread-1 :
addwaiter的逻辑是创建一个Node类对象, 然后插入到队列中, 队列使用双向链表实现
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); ... }
源码如下 :
// ReentrantLock -> lock() final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // AbstractQueuedSynchronizer -> acquire public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // AbstractQueuedSynchronizer -> acquireQueued final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
如果加锁成功(没有竞争),会设置 :
非公平锁的体现
// Sync 继承自 AQS static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; // 加锁实现 final void lock() { // 尝试获取锁 if (compareAndSetState(0, 1)) // 获取锁成功. 设置当前线程为锁拥有的线程 setExclusiveOwnerThread(Thread.currentThread()); else // 如果尝试失败,进入 acquire [1] acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
// [1]AQS 继承过来的方法 public final void acquire(int arg) { // [2]tryAcquire 再次尝试获取锁 if (!tryAcquire(arg) && // 如果 tryAcquire失败, 就执行 addWaiter[4] , 再执行 acquireQueued[5] acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
选中方法, ctrl + alt + b
即可查看 这个方法的所有实现的类 NonfairSync -> tryAcquire
// NonfairSync -> tryAcquire [2] -> [3] protected final boolean tryAcquire(int acquires) { // Sync -> nonfairTryAcquire return nonfairTryAcquire(acquires); }
AbstractQueuedSynchronizer ==> Sync -> nonfairTryAcquire
// [3] Sync 继承过来的方法, 方便阅读, 放在此处 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 如果还没有获得锁 if (c == 0) { // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 else if (current == getExclusiveOwnerThread()) { // state++ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 获取失败, 回到调用处 return false; }
// [4]AQS 继承过来的方法, 方便阅读, 放在此处 private Node addWaiter(Node mode) { // 将当前线程关联到一个 Node 对象上, 模式为独占模式 Node node = new Node(Thread.currentThread(), mode); // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 双向链表 pred.next = node; return node; } } // 尝试将 Node 加入 AQS, 进入 [6] enq(node); return node; }
// AQS 继承过来的方法, 方便阅读, 放在此处 private Node enq(final Node node) { for (;;) { Node t = tail; // 判断尾部是否为 null ,如果是 就创建一个虚拟节点, head -> node(null) if (t == null) { // Must initialize // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0) if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // cas 尝试将 Node 对象加入 AQS 队列尾部 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
// AQS 继承过来的方法, 方便阅读, 放在此处 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取锁 if (p == head && tryAcquire(arg)) { // 初始状态 head -> node(null) -> node(thred = thread-1) -> node(thred = thread-2) // head -> node(thred = thread-1) -> node(thred = thread-2) , node(null) 被 GC 回收 // head -> node(thred = null) -> node(thred = thread-2) , thread-1 已经获取锁了, 对应的node 被设置为 null // 获取成功, 设置自己(当前线程对应的 node)为 head setHead(node); p.next = null; // help GC failed = false; // 返回中断标记 false return interrupted; } // 判断是否应当 park, 进入 [7] if (shouldParkAfterFailedAcquire(p, node) && // park 等待, 此时 Node 的状态被置为 Node.SIGNAL [8] parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
[7]shouldParkAfterFailedAcquire
// AQS 继承过来的方法, 方便阅读, 放在此处 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取上一个节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 上一个节点都在阻塞, 那么自己也阻塞好了 return true; // > 0 表示取消状态 , 取消状态就是当前线程不再等待锁, 取消等待锁的状态了 if (ws > 0) { // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 这次还没有阻塞 // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL
来决定,而不是本节点的waitStatus 决定
Sync 继承自 AQS
public class ReentrantLock implements Lock, java.io.Serializable { // 解锁实现 public void unlock() { sync.release(1); } }
// AQS 继承过来的方法, 方便阅读, 放在此处 public final boolean release(int arg) { // 尝试释放锁, 进入 [1] if (tryRelease(arg)) { // 从队列头节点 开始 unpark Node h = head; // 队列不为 null 并且 waitStatus == Node.SIGNAL(-1) 才需要 unpark if (h != null && h.waitStatus != 0) // unpark AQS 中等待的线程, 进入[2] unparkSuccessor(h); return true; } return false; }
waitStatus == Node.SIGNAL(-1)
)的线程 , 最后返回 true// Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryRelease(int releases) { // state-- int c = getState() - releases; // 判断当前线程是否是获取锁的那个线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // c == 0 是为了 // 支持锁重入, 只有 state 减为 0, 才释放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { // 如果状态为 Node.SIGNAL 尝试重置状态为 0 // 不成功也可以 int ws = node.waitStatus; if (ws < 0) // ws = Node.SIGNAL(-1) // 重置状态state compareAndSetWaitStatus(node, ws, 0); // 找到需要 unpark 的节点(state==-1), 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的 Node s = node.next; // head-> node(null) -> node(thread-1) // waitStatus > 0 表示取消状态 if (s == null || s.waitStatus > 0) { s = null; // 不考虑已取消的节点(), 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点 for (Node t = tail; t != null && t != node; t = t.prev) // waitStatus <= 0 阻塞 if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
LockSupport.unpark(s.thread);
static final class NonfairSync extends Sync { // Sync 继承过来的方法, 方便阅读, 放在此处 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 没有其他人获取锁的情况 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
int nextc = c + acquires;
, 表示重入锁的次数 setState(nextc)
// Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryRelease(int releases) { // state-- int c = getState() - releases; // 判断当前线程是否是获取锁的那个线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // c == 0 是为了 // 支持锁重入, 只有 state 减为 0, 才释放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
// Sync 继承自 AQS static final class NonfairSync extends Sync { // 从AQS继承来的方法 private final boolean parkAndCheckInterrupt() { // 如果打断标记已经是 true, 则 park 会失效 LockSupport.park(this); // interrupted 会清除打断标记 return Thread.interrupted(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 1. 先获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; // 2. 返回打断状态 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } public final void acquire(int arg) { if (!tryAcquire(arg) && // 返回的 interrupted = true acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果打断的状态为 true selfInterrupt(); } static void selfInterrupt() { // 重新产生一次中断 Thread.currentThread().interrupt(); } }
static final class NonfairSync extends Sync { public final void acquireInterruptibly(int arg) throws InterruptedException { // 如果没有获得到锁, 进入 [1] if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } }
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && // 在 park 过程中如果被 interrupt 会进入此 parkAndCheckInterrupt()) // 这时候抛出异常, 而不会再次进入 for (;;) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
注意 , 与不可打断模式, 这里是设置 打断标记为true, 会继续进行for循环
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
可打断模式 : 会直接抛出异常
if (shouldParkAfterFailedAcquire(p, node) && // 在 park 过程中如果被 interrupt 会进入此 parkAndCheckInterrupt()) // 这时候抛出异常, 而不会再次进入 for (;;) throw new InterruptedException();
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } // 与非公平锁主要区别在于 tryAcquire 方法的实现 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // AQS 继承过来的方法, 方便阅读, 放在此处 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // h != t 说明 队列中有 等待的线程 return h != t && // h.next == null 说明没有 等待的线程 // h = head -> node(null) -> node(thread1) // 判断当前线程是否是第二个节点(第一个为head)对应的线程 ((s = h.next) == null || s.thread != Thread.currentThread()); } }
hasQueuedPredecessors() = false
, !hasQueuedPredecessors() = true
acquire(1) -> tryAcquire(acquires = 1)
// 与非公平锁主要区别在于 tryAcquire 方法的实现 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
hasQueuedPredecessors :
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // h != t 说明 队列中有 等待的线程 return h != t && // h.next == null 说明没有 等待的线程 // h = head -> node(null) -> node(thread1) // 判断当前线程是否是第二个节点(第一个为head)对应的线程 ((s = h.next) == null || s.thread != Thread.currentThread()); }
hasQueuedPredecessors() = false
, !hasQueuedPredecessors() = true
acquire(1) -> tryAcquire(acquires = 1)
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
让当前持有锁的线程 thread-1 wait , 进入Condition等待队列
让NonfairSync同步器释放锁 :
唤醒 AQS等待队列中优先级最高的节点 , 让其竞争锁
阻塞 thread-1 (unpark)
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 进入 fullyRelease 流程 // fullRelease int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 是当前线程处于等待状态 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
假设 Thread-1 要来唤醒 Thread-0
本文作者:王天赐
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!