Semaphore这个看似简单的同步工具,却是多线程编程中最容易被低估的利器。它就像交通信号灯,控制着线程的通行节奏,但远比我们想象的更加灵活多变。今天我们就来彻底搞懂这个并发编程中的"红绿灯"!
Semaphore到底是什么?
Semaphore维护了一组许可证(permits),线程要访问共享资源前必须先获取许可。如果没有可用许可,线程就会被阻塞,直到其他线程释放许可。这种机制完美解决了资源访问控制和线程间协调的问题。
关键点在于:Semaphore不像锁那样严格,它允许多个线程同时访问资源,只是数量受控。这种特性让它特别适合连接池、对象池、限流等场景。
Semaphore的实战应用场景
数据库连接池是最经典的Semaphore应用:
public class ConnectionPool {
private final LinkedList<Connection> pool = new LinkedList<>();
private final Semaphore available;
private final int maxSize;
public ConnectionPool(String url, String user, String pwd, int maxSize) throws SQLException {
this.maxSize = maxSize;
this.available = new Semaphore(maxSize, true); // 公平模式
for (int i = 0; i < maxSize; i++) {
pool.addLast(DriverManager.getConnection(url, user, pwd));
}
}
public Connection getConnection(long timeout, TimeUnit unit)
throws InterruptedException, SQLException, TimeoutException {
if (!available.tryAcquire(timeout, unit)) {
throw new TimeoutException("获取连接超时");
}
synchronized (pool) {
return pool.removeFirst();
}
}
public void releaseConnection(Connection conn) {
if (conn != null) {
synchronized (pool) {
pool.addLast(conn);
}
available.release();
}
}
public void closeAll() throws SQLException {
synchronized (pool) {
for (Connection conn : pool) {
conn.close();
}
pool.clear();
}
}
public static void main(String[] args) throws Exception {
// 测试用例
ConnectionPool pool = new ConnectionPool(
"jdbc:mysql://localhost:3306/employees",
"root",
"123456",
5
);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Connection conn = pool.getConnection(2, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " 获取连接成功");
Thread.sleep(1000);
pool.releaseConnection(conn);
} catch (Exception e) {
System.err.println(Thread.currentThread().getName() + " 错误: " + e.getMessage());
}
});
}
executor.shutdown();
}
}
限流场景同样适用Semaphore。比如网关API接口限流
public class GatewayQpsLimiter {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
private final int maxQps;
public GatewayQpsLimiter(int maxQps) {
this.maxQps = maxQps;
this.semaphore = new Semaphore(maxQps, true); // 公平模式
this.scheduler = Executors.newSingleThreadScheduledExecutor();
// 每秒重置许可数
scheduler.scheduleAtFixedRate(() -> {
int available = semaphore.availablePermits();
if (available < maxQps) {
semaphore.release(maxQps - available);
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return semaphore.tryAcquire(timeout, unit);
}
public void shutdown() {
scheduler.shutdown();
}
public static void main(String[] args) {
GatewayQpsLimiter limiter = new GatewayQpsLimiter(100); // 限制100QPS
// 模拟请求处理
for (int i = 0; i < 150; i++) {
new Thread(() -> {
try {
if (limiter.tryAcquire(50, TimeUnit.MILLISECONDS)) {
System.out.println("Processing request at " + System.currentTimeMillis());
} else {
System.out.println("Request rejected due to rate limit");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
limiter.shutdown();
}
}
其实Semaphore整体就是对构建Semaphore时,指定的资源数的获取和释放操作
获取资源方式:
- acquire():获取一个资源,没有资源就挂起等待,如果中断,直接抛异常
- acquire(int):获取指定个数资源,资源不够,或者没有资源就挂起等待,如果中断,直接抛异常
- tryAcquire():获取一个资源,没有资源返回false,有资源返回true
- tryAcquire(int):获取指定个数资源,没有资源返回false,有资源返回true
- tryAcquire(time,unit):获取一个资源,如果没有资源,等待time.unit,如果还没有,就返回false
- tryAcquire(int,time,unit):获取指定个数资源,如果没有资源,等待time.unit,如果还没有,就返回false
- acquireUninterruptibly():获取一个资源,没有资源就挂起等待,中断线程不结束,继续等
- acquireUninterruptibly(int):获取指定个数资源,没有资源就挂起等待,中断线程不结束,继续等
归还资源方式:
- release():归还一个资源
- release(int):归还指定个数资源
深入Semaphore源码实现
Semaphore的核心是基于AQS(
AbstractQueuedSynchronizer)实现的。Semaphore提供公平和非公平实现。Semaphore内部维护了一个同步状态state,表示当前可用许可数量。acquire()和release()操作实际上就是修改这个state,且看Semaphore类图及方法调用关系
先看信号量获取资源非公平实现
//信号量非公平实现
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取state数值,即剩余的资源数
int available = getState();
// 剩余的资源数 - 需要的资源数
int remaining = available - acquires;
// 剩余资源数不足,直接返回,线程去走父类排队逻辑
if (remaining < 0 ||
//说明资源足够,基于CAS的方式,将state从原值,改为remaining
compareAndSetState(available, remaining))
return remaining;
}
}
再看信号量获取资源公平实现
// 信号量公平实现
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果当前有排队的节点,直接返回,线程去走父类排队逻辑
if (hasQueuedPredecessors())
return -1;
// 获取state数值,即剩余的资源数
int available = getState();
// 剩余的资源数 - 需要的资源数
int remaining = available - acquires;
// 剩余资源数不足,直接返回,线程去走父类排队逻辑
if (remaining < 0 ||
//说明资源足够,基于CAS的方式,将state从原值,改为remaining
compareAndSetState(available, remaining))
return remaining;
}
}
对比信号量公平与非公平实现,不同之处在于如果是公平实现,需要先查看AQS中排队的情况,非公平模式下,新来的线程可能比等待队列中的线程先获取许可,虽然不公平但吞吐量更高。构造Semaphore时可以指定:
// 默认非公平
public Semaphore(int permits) {sync = new NonfairSync(permits);}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
信号量释放资源实现,不区分公平和非公平
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 拿到当前的state
int current = getState();
// 将state + 归还的资源个数,新的state要被设置为next
int next = current + releases;
// 避免出现归还资源后,导致next小于之前的资源数,需要做健壮性判断
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS操作,保证原子性,只会有一个线程成功的就之前的state修改为next
if (compareAndSetState(current, next))
return true;
}
}
semaphore使用范式
1,基本构造与初始化
- 创建时需指定许可数量:Semaphore semaphore = new Semaphore(5)表示最多允许5个线程并发访问资源
- 可选公平模式参数:new Semaphore(5,true)
2,核心操作范式
// 获取许可(阻塞式)
semaphore.acquire();
try {
// 访问共享资源
} finally {
// 释放许可
semaphore.release();
}
3,高级使用模式
- 批量获取许可:acquire(3)一次性获取3个许可
- 非阻塞尝试:tryAcquire()立即返回获取结果
- 超时控制:tryAcquire(1, TimeUnit.SECONDS)