多线程设计模式:保护性暂停模式详解以及其在DUBBO应用源码分析
JAVA前线
互联网技术人思考与分享,欢迎长按关注
1 文章概述
在多线程编程实践中,我们肯定会面临线程间数据交互的问题。在处理这类问题时需要使用一些设计模式,从而保证程序的正确性和健壮性。
保护性暂停设计模式就是解决多线程间数据交互问题的一种模式。本文先从基础案例介绍保护性暂停基本概念和实践,再由浅入深,最终分析DUBBO源码中保护性暂停设计模式使用场景。
2 什么是保护性暂停
我们设想这样一种场景:线程A生产数据,线程B读取数据这个数据。
但是有一种情况:线程B准备读取数据时,此时线程A还没有生产出数据。
在这种情况下线程B不能一直空转,也不能立即退出,线程B要等到生产数据完成并拿到数据之后才退出。
那么在数据没有生产出这段时间,线程B需要执行一种等待机制,这样可以达到对系统保护目的,这就是保护性暂停。
保护性暂停有多种实现方式,本文我们用synchronized/wait/notify的方式实现。
@Getter
@Setter
public class MyData implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
public MyData(String message) {
this.message = message;
}
}
class Resource1 {
private MyData data;
private Object lock = new Object();
public MyData getData() {
synchronized (lock) {
while (data == null) {
try {
// 没有数据则释放锁并暂停等待被唤醒
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
// 生产数据后唤醒消费线程
this.data = data;
lock.notifyAll();
}
}
}
/**
* 保护性暂停实例一
*
* @author 微信公众号「JAVA前线」
*/
public class ProtectDesignTest1 {
public static void main(String[] args) {
Resource1 resource = new Resource1();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData();
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
在上述实例中线程1生产数据,线程2消费数据。Resource1类中通过wait/notify实现了保护性暂停设计模式。
3 加一个超时时间
上述实例中如果线程2没有获取到数据,那么线程2直到拿到数据才会退出。现在我们给获取数据指定一个超时时间,如果在这个时间内没有获取到数据则抛出超时异常。虽然只是加一个参数,但是其中有很多细节需要注意。
3.1 一段有问题的代码
我们分析下面这段代码
class Resource2 {
private MyData data;
private Object lock = new Object();
public MyData getData(int timeOut) {
synchronized (lock) {
while (data == null) {
try {
// 代码1
lock.wait(timeOut);
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (data == null) {
throw new RuntimeException("超时未获取到结果");
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
this.data = data;
lock.notifyAll();
}
}
}
/**
* 保护性暂停实例二
*
* @author 微信公众号「JAVA前线」
*/
public class ProtectDesignTest2 {
public static void main(String[] args) {
Resource2 resource = new Resource2();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData(1000);
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
这段代码看似没有问题,使用的也是wait带有超时时间的参数,那么问题可能出在哪里呢?
问题是线程虚假唤醒带来的。如果还没有到超时时间代码1就被虚假唤醒,此时data还没有值就会直接跳出循环,这样没有达到我们预期的超时时间才跳出循环的预期。
关于虚假唤醒这个概念,我们看看JDK官方文档相关介绍。
A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops, like this one:
synchronized (obj) {
while ()
obj.wait(timeout);
}
官方文档告诉我们一个线程可能会在没有被notify时虚假唤醒,所以判断是否继续wait时必须用while循环。我们在写代码时一定也要注意线程虚假唤醒问题。
3.2 正确实例
上面我们明白了虚假唤醒问题,现在我们对代码进行修改,说明参看代码注释。
class Resource3 {
private MyData data;
private Object lock = new Object();
public MyData getData(int timeOut) {
synchronized (lock) {
// 运行时长
long timePassed = 0;
// 开始时间
long begin = System.currentTimeMillis();
// 如果结果为空
while (data == null) {
try {
// 如果运行时长大于超时时间退出循环
if (timePassed > timeOut) {
break;
}
// 如果运行时长小于超时时间表示虚假唤醒 -> 只需再等待时间差值
long waitTime = timeOut - timePassed;
// 等待时间差值
lock.wait(waitTime);
// 结果不为空直接返回
if (data != null) {
break;
}
// 被唤醒后计算运行时长
timePassed = System.currentTimeMillis() - begin;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (data == null) {
throw new RuntimeException("超时未获取到结果");
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
this.data = data;
lock.notifyAll();
}
}
}
/**
* 保护性暂停实例三
*
* @author 微信公众号「JAVA前线」
*/
public class ProtectDesignTest3 {
public static void main(String[] args) {
Resource3 resource = new Resource3();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData(1000);
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
4 加一个编号
现在再来设想一个场景:现在有三个生产数据的线程1、2、3,三个获取数据的线程4、5、6,我们希望每个获取数据线程都只拿到其中一个生产线程的数据,不能多拿也不能少拿。
这里引入一个Futures模型,这个模型为每个资源进行编号并存储在容器中,例如线程1生产的数据被拿走则从容器中删除,一直到容器为空结束。
@Getter
@Setter
public class MyNewData implements Serializable {
private static final long serialVersionUID = 1L;
private static final AtomicLong ID = new AtomicLong(0);
private Long id;
private String message;
public MyNewData(String message) {
this.id = newId();
this.message = message;
}
/**
* 自增到最大值会回到最小值(负值可以作为识别ID)
*/
private static long newId() {
return ID.getAndIncrement();
}
public Long getId() {
return this.id;
}
}
class MyResource {
private MyNewData data;
private Object lock = new Object();
public MyNewData getData(int timeOut) {
synchronized (lock) {
long timePassed = 0;
long begin = System.currentTimeMillis();
while (data == null) {
try {
if (timePassed > timeOut) {
break;
}
long waitTime = timeOut - timePassed;
lock.wait(waitTime);
if (data != null) {
break;
}
timePassed = System.currentTimeMillis() - begin;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (data == null) {
throw new RuntimeException("超时未获取到结果");
}
return data;
}
}
public void sendData(MyNewData data) {
synchronized (lock) {
this.data = data;
lock.notifyAll();
}
}
}
class MyFutures {
private static final Map FUTURES = new ConcurrentHashMap<>();
public static MyResource newResource(MyNewData data) {
final MyResource future = new MyResource();
FUTURES.put(data.getId(), future);
return future;
}
public static MyResource getResource(Long id) {
return FUTURES.remove(id);
}
public static Set getIds() {
return FUTURES.keySet();
}
}
/**
* 保护性暂停实例四
*
* @author 微信公众号「JAVA前线」
*/
public class ProtectDesignTest4 {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 3; i++) {
final int index = i;
new Thread(() -> {
try {
MyNewData data = new MyNewData("hello_" + index);
MyResource resource = MyFutures.newResource(data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(1);
resource.sendData(data);
System.out.println("生产数据data=" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(1);
for (Long i : MyFutures.getIds()) {
final long index = i;
new Thread(() -> {
MyResource resource = MyFutures.getResource(index);
int timeOut = 3000;
System.out.println("接收数据data=" + resource.getData(timeOut));
}).start();
}
}
}
5 DUBBO应用实例
我们顺着这一个链路跟踪代码:消费者发送请求 > 提供者接收请求并执行,并且将运行结果发送给消费者 >消费者接收结果。
(1) 消费者发送请求
消费者发送的数据包含请求ID,并且将关系维护进FUTURES容器
final class HeaderExchangeChannel implements ExchangeChannel {
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// 代码1
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
}
class DefaultFuture implements ResponseFuture {
// FUTURES容器
private static final Map FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
// 请求ID
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
}
(2) 提供者接收请求并执行,并且将运行结果发送给消费者
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// response与请求ID对应
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// message = RpcInvocation包含方法名、参数名、参数值等
Object msg = req.getData();
try {
// DubboProtocol.reply执行实际业务方法
CompletableFuture
(3) 消费者接收结果
以下DUBBO源码很好体现了保护性暂停这个设计模式,说明参看注释
class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static void received(Channel channel, Response response) {
try {
// 取出对应的请求对象
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// 放弃锁并使当前线程阻塞,直到发出信号中断它或者达到超时时间
done.await(timeout, TimeUnit.MILLISECONDS);
// 阻塞结束后再判断是否完成
if (isDone()) {
break;
}
// 阻塞结束后判断是否超时
if(System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// response对象仍然为空则抛出超时异常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
private void doReceived(Response res) {
lock.lock();
try {
// 接收到服务器响应赋值response
response = res;
if (done != null) {
// 唤醒get方法中处于等待的代码块
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
6 文章总结
本文我们从基础案例介绍保护性暂停基本概念和实践,最终分析DUBBO源码中保护性暂停设计模式使用场景。我们在设计并发框架时要注意虚假唤醒问题,以及请求和响应关系对应问题,希望本文对大家有所帮助。
JAVA前线
互联网技术人思考与分享,欢迎长按关注