2022-09-25并发编程系列00
请注意,本文编写于 550 天前,最后修改于 550 天前,其中某些信息可能已经过时。

b.jpg

Semaphore

Semaphore使用

semaphore [ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限

package cn.knightzz.semaphore.test;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Semaphore;

import static java.lang.Thread.sleep;

/**
 * @author 王天赐
 * @title: SemaphoreTest
 * @projectName hm-juc-codes
 * @description:
 * @website <a href="http://knightzz.cn/">http://knightzz.cn/</a>
 * @github <a href="https://github.com/knightzz1998">https://github.com/knightzz1998</a>
 * @create: 2022-09-23 14:55
 */
@SuppressWarnings("all")
@Slf4j(topic = "c.SemaphoreTest")
public class SemaphoreTest {

    public static void main(String[] args) {

        // 限制最大线程数 3
        Semaphore semaphore = new Semaphore(3);

        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {

                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

                try {
                    log.debug("run ... ");
                    sleep(1000);
                    log.debug("end ... ");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    semaphore.release();
                }

            }, "t" + i).start();
        }
    }
}

执行结果如下 :

15:07:34.419 [t1] DEBUG c.SemaphoreTest - run ... 
15:07:34.419 [t2] DEBUG c.SemaphoreTest - run ... 
15:07:34.419 [t3] DEBUG c.SemaphoreTest - run ... 
15:07:35.425 [t1] DEBUG c.SemaphoreTest - end ... 
15:07:35.425 [t3] DEBUG c.SemaphoreTest - end ... 
15:07:35.425 [t2] DEBUG c.SemaphoreTest - end ... 
15:07:35.425 [t5] DEBUG c.SemaphoreTest - run ... 
15:07:35.425 [t6] DEBUG c.SemaphoreTest - run ... 
15:07:35.425 [t4] DEBUG c.SemaphoreTest - run ... 
15:07:36.438 [t5] DEBUG c.SemaphoreTest - end ... 
15:07:36.438 [t6] DEBUG c.SemaphoreTest - end ... 
15:07:36.438 [t4] DEBUG c.SemaphoreTest - end ... 
15:07:36.438 [t7] DEBUG c.SemaphoreTest - run ... 
15:07:36.438 [t8] DEBUG c.SemaphoreTest - run ... 
15:07:36.438 [t9] DEBUG c.SemaphoreTest - run ... 
15:07:37.453 [t7] DEBUG c.SemaphoreTest - end ... 
15:07:37.453 [t8] DEBUG c.SemaphoreTest - end ... 
15:07:37.453 [t9] DEBUG c.SemaphoreTest - end ... 
15:07:37.453 [t10] DEBUG c.SemaphoreTest - run ... 
15:07:38.465 [t10] DEBUG c.SemaphoreTest - end ... 

Process finished with exit code 0

可以看到上面的执行结果 :

  • 每次只能有3个线程执行, 只有等到这三个获取凭证的线程执行结束后, 其他线程才能获取凭证

Semaphore应用

改进数据库连接池

Semaphore 可以限制在一定时间内的线程数, 可以做简单的限流 , 它只能考虑单机版的, 因为并没有做分布式的考虑

使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,

当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的

改进思路

原本的思路是 :

  • 当线程池中没有连接对象的时候 , 就让当前线程等待

改进思路是 :

  • 使用 Semaphore 限制连接的数量

	 public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("连接" + (i + 1));
        }
    }


// 5. 借连接
    public Connection borrow() {// t1, t2, t3
        // 获取许可
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if (states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    log.debug("borrow {}", connections[i]);
                    return connections[i];
                }
            }
        }
        // 不会执行到这里
        return null;
    }

    // 6. 归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                log.debug("free {}", conn);
                semaphore.release();
                break;
            }
        }
    }

可以看到上面的代码所示 :

  • 构造函数初始化时, 让线程池大小和semaphore凭证数量一致
  • 获取连接时, 先获取凭证, 因为凭证的数量和线程池一致, 这样的话, 当线程池连接对象不足时, 当前线程就会被semaphore阻塞
  • 而释放连接时, 因为semaphore凭证被释放了, 所以, 之前等待的就会获取凭证然后进一步去得到空连接

Semaphore原理

加锁解锁原理

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后

停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源

image-20220923180545430

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

image-20220923181330451

这时 Thread-4 释放了 permits,状态如下

image-20220923181556428

接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

image-20220923182100161

源码分析

static final class NonfairSync extends Sync {
 	private static final long serialVersionUID = -2694183684443567898L;
    
    // Semaphore 方法, 方便阅读, 放在此处
 	public void acquire() throws InterruptedException {
 		sync.acquireSharedInterruptibly(1);
 	}
    
    // AQS 继承过来的方法, 方便阅读, 放在此处
	public final void acquireSharedInterruptibly(int arg)
 		throws InterruptedException {
		 if (Thread.interrupted())
 			throw new InterruptedException();
 		if (tryAcquireShared(arg) < 0)
			doAcquireSharedInterruptibly(arg);
 	}
    
    // 尝试获得共享锁
 	protected int tryAcquireShared(int acquires) {
		 return nonfairTryAcquireShared(acquires);
	}
    // Sync 继承过来的方法, 方便阅读, 放在此处
    final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                
                // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
                if (remaining < 0 ||
                    // 如果 cas 重试成功, 返回正数, 表示获取成功
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    
    // AQS 继承过来的方法, 方便阅读, 放在此处
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 再次尝试获取许可
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 成功后本线程出队(AQS), 所在 Node设置为 head
 						// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
						// 如果 head.waitStatus == 0 ==> Node.PROPAGATE 
 						// r 表示可用资源数, 为 0 则不会继续传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // Semaphore 方法, 方便阅读, 放在此处
	 public void release() {
		 sync.releaseShared(1);
	 }
    
    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    	
    // Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
}

本文作者:王天赐

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!