semaphore [ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限
package cn.knightzz.semaphore.test; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Semaphore; import static java.lang.Thread.sleep; /** * @author 王天赐 * @title: SemaphoreTest * @projectName hm-juc-codes * @description: * @website <a href="http://knightzz.cn/">http://knightzz.cn/</a> * @github <a href="https://github.com/knightzz1998">https://github.com/knightzz1998</a> * @create: 2022-09-23 14:55 */ @SuppressWarnings("all") @Slf4j(topic = "c.SemaphoreTest") public class SemaphoreTest { public static void main(String[] args) { // 限制最大线程数 3 Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 10; i++) { new Thread(() -> { try { semaphore.acquire(); } catch (InterruptedException e) { throw new RuntimeException(e); } try { log.debug("run ... "); sleep(1000); log.debug("end ... "); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { semaphore.release(); } }, "t" + i).start(); } } }
执行结果如下 :
15:07:34.419 [t1] DEBUG c.SemaphoreTest - run ... 15:07:34.419 [t2] DEBUG c.SemaphoreTest - run ... 15:07:34.419 [t3] DEBUG c.SemaphoreTest - run ... 15:07:35.425 [t1] DEBUG c.SemaphoreTest - end ... 15:07:35.425 [t3] DEBUG c.SemaphoreTest - end ... 15:07:35.425 [t2] DEBUG c.SemaphoreTest - end ... 15:07:35.425 [t5] DEBUG c.SemaphoreTest - run ... 15:07:35.425 [t6] DEBUG c.SemaphoreTest - run ... 15:07:35.425 [t4] DEBUG c.SemaphoreTest - run ... 15:07:36.438 [t5] DEBUG c.SemaphoreTest - end ... 15:07:36.438 [t6] DEBUG c.SemaphoreTest - end ... 15:07:36.438 [t4] DEBUG c.SemaphoreTest - end ... 15:07:36.438 [t7] DEBUG c.SemaphoreTest - run ... 15:07:36.438 [t8] DEBUG c.SemaphoreTest - run ... 15:07:36.438 [t9] DEBUG c.SemaphoreTest - run ... 15:07:37.453 [t7] DEBUG c.SemaphoreTest - end ... 15:07:37.453 [t8] DEBUG c.SemaphoreTest - end ... 15:07:37.453 [t9] DEBUG c.SemaphoreTest - end ... 15:07:37.453 [t10] DEBUG c.SemaphoreTest - run ... 15:07:38.465 [t10] DEBUG c.SemaphoreTest - end ... Process finished with exit code 0
可以看到上面的执行结果 :
Semaphore 可以限制在一定时间内的线程数, 可以做简单的限流 , 它只能考虑单机版的, 因为并没有做分布式的考虑
使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,
当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的
原本的思路是 :
改进思路是 :
public Pool(int poolSize) { this.poolSize = poolSize; // 让许可数与资源数一致 this.semaphore = new Semaphore(poolSize); this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接" + (i + 1)); } } // 5. 借连接 public Connection borrow() {// t1, t2, t3 // 获取许可 try { semaphore.acquire(); // 没有许可的线程,在此等待 } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < poolSize; i++) { // 获取空闲连接 if (states.get(i) == 0) { if (states.compareAndSet(i, 0, 1)) { log.debug("borrow {}", connections[i]); return connections[i]; } } } // 不会执行到这里 return null; } // 6. 归还连接 public void free(Connection conn) { for (int i = 0; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0); log.debug("free {}", conn); semaphore.release(); break; } } }
可以看到上面的代码所示 :
semaphore
凭证数量一致semaphore
阻塞Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后
停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; // Semaphore 方法, 方便阅读, 放在此处 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AQS 继承过来的方法, 方便阅读, 放在此处 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 尝试获得共享锁 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // Sync 继承过来的方法, 方便阅读, 放在此处 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly if (remaining < 0 || // 如果 cas 重试成功, 返回正数, 表示获取成功 compareAndSetState(available, remaining)) return remaining; } } // AQS 继承过来的方法, 方便阅读, 放在此处 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 再次尝试获取许可 int r = tryAcquireShared(arg); if (r >= 0) { // 成功后本线程出队(AQS), 所在 Node设置为 head // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE // r 表示可用资源数, 为 0 则不会继续传播 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // Semaphore 方法, 方便阅读, 放在此处 public void release() { sync.releaseShared(1); } // AQS 继承过来的方法, 方便阅读, 放在此处 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } }
本文作者:王天赐
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!