Java多线程之Semaphore信号量
共 5411字,需浏览 11分钟
·
2021-11-30 14:22
这里就JUC包中的Semaphore类做相关介绍
概述
JUC包中的Semaphore信号量作为一个并发工具类。其基本思想很简单,对于一个信号量实例而言,其含有指定数量的许可。每当访问资源前,需先向其申请许可。并在处理完毕后释放许可,以供后续申请。其实,这个使用方式就很像现实世界的停车场,即停车场有空余车位,车才可以进车;否则要么等待要么离开(寻找下一个停车场)。当车从停车场的车位驶离时,则会将相应的车位就会空余出来。在整个过程停车场的车位资源是有限的固定的。常见的使用场景是对业务所使用的线程数进行控制,即所谓基于线程数的限流方式。其常用方法及功能如下所示
// 创建一个指定许可数的非公平信号量
public Semaphore(int permits);
// 创建一个指定许可数的公平/非公平信号量
public Semaphore(int permits, boolean fair);
// 释放一个许可
public void release();
// 释放指定数量的许可
public void release(int permits);
// 当前剩余可用的许可数量
public int availablePermits();
/*************************** 获取许可 ******************************/
// 阻塞等待,直到获取一个许可
public void acquire() throws InterruptedException;
// 阻塞等待,直到获取全部所需数量的许可
public void acquire(int permits) throws InterruptedException;
// 阻塞等待(忽略InterruptedException异常),直到获取一个许可
public void acquireUninterruptibly();
// 阻塞等待(忽略InterruptedException异常),直到获取全部所需数量的许可
public void acquireUninterruptibly(int permits);
// 非阻塞式获取一个许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire();
// 非阻塞式获取全部所需数量的许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(int permits);
// 支持超时机制的tryAcquire方法, 获取一个许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;
// 支持超时机制的tryAcquire方法, 获取全部所需数量的许可, ture: 获取成功; false: 获取失败
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException;
// 一次性获取所有剩余可用的许可, 返回成功获取的许可数
public int drainPermits();
/******************************************************************/
可以看到,对于信号量而言,其支持公平和非公平两种类型。默认为非公平的。值得一提的是,对于tryAcquire()方法而言,其是非阻塞的。并且一旦存在可用的许可,会立即分配给它。不论是否存在其他正在等待许可的线程。即使当前这个信号量实例是公平的,换言之tryAcquire()方法会破坏公平信号量实例的公平性。如果既期望使用非阻塞方式,又期望不破坏公平信号量的公平性,可以使用它的超时机制版本,同时将超时时间设为0。即 tryAcquire(0, TimeUnit.SECONDS) 。方法tryAcquire(int permits)同理,此处不再赘述
基本实践
这里通过一个简单的实例,来进行展示其基本的使用流程
public class SemaphoreTest {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
// 系统最大的并发处理量
private static Integer maxLimit = 5;
@Test
public void test1() {
System.out.println("---------------------- 系统上线 ----------------------");
Semaphore semaphore = new Semaphore(maxLimit, true);
ExecutorService threadPool = Executors.newFixedThreadPool(10);
IntStream.rangeClosed(1,8)
.mapToObj( num -> new UserReq("用户#"+num, semaphore) )
.forEach( threadPool::execute );
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String log = "["+time+"] "+ msg;
System.out.println(log);
}
@AllArgsConstructor
private static class UserReq implements Runnable{
private String name;
private Semaphore semaphore;
@Override
public void run() {
// 模拟用户不定时发起请求
try{ Thread.sleep(RandomUtils.nextLong(500, 2000)); } catch (Exception e) {}
String msg = name + ": 发起请求, 系统可用资源数: " + semaphore.availablePermits();
info(msg);
// 阻塞等待,直到获取许可
try {
semaphore.acquire();
}catch (InterruptedException e) {
System.out.println( "Happen Exception: " + e.getMessage());
}
info(name + ": 系统开始处理请求");
// 模拟业务耗时
try{ Thread.sleep(RandomUtils.nextInt(5, 20)*1000); } catch (Exception e) {}
// 用户请求处理完毕,释放许可
semaphore.release();
info(name + ": 系统处理完毕");
}
}
}
测试结果如下,符合预期
实现原理
构造器
Semaphore信号量类的实现过程同样依赖于AQS。具体地,其是对AQS中共享锁的使用。在构建Semaphore实例过程时,一方面,通过sync变量持有AQS的实现类Sync,同时按公平性与否进一步地可细分为NonfairSync、FairSync;另一方面,通过AQS的state字段来存储许可的数量
public class Semaphore implements java.io.Serializable {
private final Sync sync;
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
}
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
}
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
}
}
acquire方法
首先来看Semaphore的acquire()方法。其委托sync调用AQS的acquireSharedInterruptibly方法。而在AQS中通过调用tryAcquireShared方法判断是否需要阻塞调用线程。具体地,在Semaphore的NonfairSync、FairSync内部类分别实现了该tryAcquireShared方法的两个版本:非公平、公平。可以看到两种实现基本一致。tryAcquireShared如果返回负值,则说明当前许可数不够,当前线程需要进入AQS阻塞队列;反之则获取成功。只是在公平版本的实现中,会调用AQS的hasQueuedPredecessors方法来判断是否有其他线程已经在AQS队列中进行排队。如果有,则tryAcquireShared直接返回-1,即当前调用线程放弃获取,转而准备进入AQS队列以保障公平性
public class Semaphore implements java.io.Serializable {
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// 非公平信号量获取许可
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
// 公平信号量获取许可
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
// 对于公平性实现而言, 如果AQS队列存在排队的节点
// 则直接返回-1, 即进入AQS队列进行排队以保证公平性
return -1;
// 通过访问AQS的state字段, 获取当前可用的许可数量
int available = getState();
// 计算剩余可用的许可数量
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 线程被中断则直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 需要子类去实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
}
release方法
Semaphore的release()方法类似。其同样是委托sync调用AQS的releaseShared方法。然后AQS执行tryReleaseShared方法,如果该方法返回true,则会进一步调用AQS的doReleaseShared方法来唤醒AQS队列中其他线程。可以看到在Semaphore的Sync内部类中,tryReleaseShared总是会返回true。其实现过程也很简单,如下所示
public class Semaphore implements java.io.Serializable {
public void release() {
sync.releaseShared(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 通过访问AQS的state字段, 获取当前可用的许可数量
int current = getState();
// 将释放的许可数添加到当前可用许可数量上
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通过CAS的方式更新state字段
if (compareAndSetState(current, next))
return true;
}
}
}
}
...
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 需要子类去实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
}
参考文献
Java并发编程之美 翟陆续、薛宾田著