醋醋百科网

Good Luck To You!

信号量Semaphore:多线程编程中的红绿灯,你真的会用吗?

Semaphore这个看似简单的同步工具,却是多线程编程中最容易被低估的利器。它就像交通信号灯,控制着线程的通行节奏,但远比我们想象的更加灵活多变。今天我们就来彻底搞懂这个并发编程中的"红绿灯"!

Semaphore到底是什么?

Semaphore维护了一组许可证(permits),线程要访问共享资源前必须先获取许可。如果没有可用许可,线程就会被阻塞,直到其他线程释放许可。这种机制完美解决了资源访问控制线程间协调的问题。

关键点在于:Semaphore不像锁那样严格,它允许多个线程同时访问资源,只是数量受控。这种特性让它特别适合连接池、对象池、限流等场景。

Semaphore的实战应用场景

数据库连接池是最经典的Semaphore应用:

public class ConnectionPool {
    private final LinkedList<Connection> pool = new LinkedList<>();
    private final Semaphore available;
    private final int maxSize;

    public ConnectionPool(String url, String user, String pwd, int maxSize) throws SQLException {
        this.maxSize = maxSize;
        this.available = new Semaphore(maxSize, true); // 公平模式
        for (int i = 0; i < maxSize; i++) {
            pool.addLast(DriverManager.getConnection(url, user, pwd));
        }
    }
    public Connection getConnection(long timeout, TimeUnit unit)
            throws InterruptedException, SQLException, TimeoutException {
        if (!available.tryAcquire(timeout, unit)) {
            throw new TimeoutException("获取连接超时");
        }
        synchronized (pool) {
            return pool.removeFirst();
        }
    }
    public void releaseConnection(Connection conn) {
        if (conn != null) {
            synchronized (pool) {
                pool.addLast(conn);
            }
            available.release();
        }
    }

    public void closeAll() throws SQLException {
        synchronized (pool) {
            for (Connection conn : pool) {
                conn.close();
            }
            pool.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        // 测试用例
        ConnectionPool pool = new ConnectionPool(
                "jdbc:mysql://localhost:3306/employees",
                "root",
                "123456",
                5
        );

        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try {
                    Connection conn = pool.getConnection(2, TimeUnit.SECONDS);
                    System.out.println(Thread.currentThread().getName() + " 获取连接成功");
                    Thread.sleep(1000);
                    pool.releaseConnection(conn);
                } catch (Exception e) {
                    System.err.println(Thread.currentThread().getName() + " 错误: " + e.getMessage());
                }
            });
        }
        executor.shutdown();
    }
}

限流场景同样适用Semaphore。比如网关API接口限流

public class GatewayQpsLimiter {
    private final Semaphore semaphore;
    private final ScheduledExecutorService scheduler;
    private final int maxQps;

    public GatewayQpsLimiter(int maxQps) {
        this.maxQps = maxQps;
        this.semaphore = new Semaphore(maxQps, true); // 公平模式
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        // 每秒重置许可数
        scheduler.scheduleAtFixedRate(() -> {
            int available = semaphore.availablePermits();
            if (available < maxQps) {
                semaphore.release(maxQps - available);
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        return semaphore.tryAcquire(timeout, unit);
    }

    public void shutdown() {
        scheduler.shutdown();
    }

    public static void main(String[] args) {
        GatewayQpsLimiter limiter = new GatewayQpsLimiter(100); // 限制100QPS
        // 模拟请求处理
        for (int i = 0; i < 150; i++) {
            new Thread(() -> {
                try {
                    if (limiter.tryAcquire(50, TimeUnit.MILLISECONDS)) {
                        System.out.println("Processing request at " + System.currentTimeMillis());
                    } else {
                        System.out.println("Request rejected due to rate limit");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        limiter.shutdown();
    }
}

其实Semaphore整体就是对构建Semaphore时,指定的资源数的获取和释放操作

获取资源方式

  • acquire():获取一个资源,没有资源就挂起等待,如果中断,直接抛异常
  • acquire(int):获取指定个数资源,资源不够,或者没有资源就挂起等待,如果中断,直接抛异常
  • tryAcquire():获取一个资源,没有资源返回false,有资源返回true
  • tryAcquire(int):获取指定个数资源,没有资源返回false,有资源返回true
  • tryAcquire(time,unit):获取一个资源,如果没有资源,等待time.unit,如果还没有,就返回false
  • tryAcquire(int,time,unit):获取指定个数资源,如果没有资源,等待time.unit,如果还没有,就返回false
  • acquireUninterruptibly():获取一个资源,没有资源就挂起等待,中断线程不结束,继续等
  • acquireUninterruptibly(int):获取指定个数资源,没有资源就挂起等待,中断线程不结束,继续等

归还资源方式

  • release():归还一个资源
  • release(int):归还指定个数资源

深入Semaphore源码实现

Semaphore的核心是基于AQS(
AbstractQueuedSynchronizer)实现的。
Semaphore提供公平和非公平实现。Semaphore内部维护了一个同步状态state,表示当前可用许可数量。acquire()和release()操作实际上就是修改这个state,且看Semaphore类图及方法调用关系

先看信号量获取资源非公平实现

//信号量非公平实现

protected int tryAcquireShared(int acquires) {

	return nonfairTryAcquireShared(acquires);

}

final int nonfairTryAcquireShared(int acquires) {

	for (;;) {

		// 获取state数值,即剩余的资源数

		int available = getState();

		// 剩余的资源数 - 需要的资源数

		int remaining = available - acquires;

		// 剩余资源数不足,直接返回,线程去走父类排队逻辑

		if (remaining < 0 ||

		//说明资源足够,基于CAS的方式,将state从原值,改为remaining

		compareAndSetState(available, remaining))

		return remaining;

	}

}

再看信号量获取资源公平实现

// 信号量公平实现

protected int tryAcquireShared(int acquires) {

	for (;;) {

		// 如果当前有排队的节点,直接返回,线程去走父类排队逻辑

		if (hasQueuedPredecessors())

		return -1;

		// 获取state数值,即剩余的资源数

		int available = getState();

		// 剩余的资源数 - 需要的资源数

		int remaining = available - acquires;

		// 剩余资源数不足,直接返回,线程去走父类排队逻辑

		if (remaining < 0 ||

		//说明资源足够,基于CAS的方式,将state从原值,改为remaining

		compareAndSetState(available, remaining))

		return remaining;

	}

}

对比信号量公平与非公平实现,不同之处在于如果是公平实现,需要先查看AQS中排队的情况,非公平模式下,新来的线程可能比等待队列中的线程先获取许可,虽然不公平但吞吐量更高。构造Semaphore时可以指定:

// 默认非公平

public Semaphore(int permits) {sync = new NonfairSync(permits);}

public Semaphore(int permits, boolean fair) {

	sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

信号量释放资源实现,不区分公平和非公平

protected final boolean tryReleaseShared(int releases) {

	for (;;) {

		// 拿到当前的state

		int current = getState();

		// 将state + 归还的资源个数,新的state要被设置为next

		int next = current + releases;

		// 避免出现归还资源后,导致next小于之前的资源数,需要做健壮性判断

		if (next < current) // overflow

		throw new Error("Maximum permit count exceeded");

		// CAS操作,保证原子性,只会有一个线程成功的就之前的state修改为next

		if (compareAndSetState(current, next))

		return true;

	}

}


semaphore使用范式

1,基本构造与初始化

  • 创建时需指定许可数量:Semaphore semaphore = new Semaphore(5)表示最多允许5个线程并发访问资源
  • 可选公平模式参数:new Semaphore(5,true)

2,核心操作范式

// 获取许可(阻塞式)
semaphore.acquire();
try {
	// 访问共享资源
} finally {
	// 释放许可
	semaphore.release();
}

3,高级使用模式

  • 批量获取许可:acquire(3)一次性获取3个许可
  • 非阻塞尝试:tryAcquire()立即返回获取结果
  • 超时控制:tryAcquire(1, TimeUnit.SECONDS)
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言