Dubbo存在内存泄漏

Netty历险记

共 4631字,需浏览 10分钟

 ·

2022-01-04 22:01

[前提]

本文阐述的内容基于Dubbo 2.7.3版本


[正文]


2b16dc230b03f49a6a16e15c8586f7e3.webp



如上图, 在上一篇 com.alibaba.fastjson存在内存泄漏 文章中, 我们解释了线程的threadLocals中存在内存泄漏的情况, 仔细观察上图, 还有一个地方, 在threadLocalMap属性的内部也存在662.19KB的内存, 这个地方也不正常.


查看threadLocalMap的内部属性


466e6a4c35b1049934fddb8e0ece4479.webp


在org.apache.dubbo.rpc.FutureContext内部的result属性`持有`651.93KB内存, 这个result的内容实际是Dubbo接口的返回值.  而这个FutureContext对象也是在调用外部Dubbo接口的时候创建的.


我们简单分析下一个业务线程调用Dubbo接口的过程.


当业务线程需要调用外部Dubbo接口的时候, 会创建一个DefaultFuture, 每个DefaultFuture对象都会有唯一的一个Id与之对应, 并把这个关系放到Map中


714c4ef10412efac9b32942356a31321.webp


private DefaultFuture(Channel channel, Request request, int timeout) {    this.channel = channel;    this.request = request;    this.id = request.getId();    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);    // 存储 id <-> DefaultFuture关系    FUTURES.put(id, this);    CHANNELS.put(id, channel);}

由于接口调用都会有超时, 那么如何实现这个超时机制呢?


将一个超时任务放入到时间轮上.


// org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuturepublic static DefaultFuture newFuture(Channel channel, Request request, int timeout) {    final DefaultFuture future = new DefaultFuture(channel, request, timeout);    // 超时检查    timeoutCheck(future);    return future;}
// org.apache.dubbo.remoting.exchange.support.DefaultFuture#timeoutCheckprivate static void timeoutCheck(DefaultFuture future) { TimeoutCheckTask task = new TimeoutCheckTask(future.getId()); future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);}


在我之前的 Netty中的时间轮(v3.10.7) 文章中介绍了时间轮, 通过时间轮的方式, 检测任务是否超时到期了.


接下来就是将DefaultFuture等信息组装成一个FutureContext放入到线程的ThreadLocalMap中.

de66ea9a9c20d72ab669ef2c370a4628.webp


// org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvokeprotected Result doInvoke(final Invocation invocation) throws Throwable {    try {        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);        if (isOneway) {            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);            currentClient.send(inv, isSent);            return AsyncRpcResult.newDefaultAsyncResult(invocation);        } else {            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);            CompletableFuture responseFuture = currentClient.request(inv, timeout);            asyncRpcResult.subscribeTo(responseFuture);            //             FutureContext.getContext().setCompatibleFuture(responseFuture);            return asyncRpcResult;        }    }}
// org.apache.dubbo.rpc.FutureContextpublic class FutureContext { private static InternalThreadLocal futureTL = new InternalThreadLocal() { @Override protected FutureContext initialValue() { return new FutureContext(); } };
public static FutureContext getContext() { return futureTL.get(); }}


综上, id和DefaultFuture存到Map中, 设置好定时任务, DefaultFuture放到线程ThreadLocalMap中之后, 线程就可以被阻塞了.


f33f4ccf6b72c83f25001cd36af9a325.webp


线程调用get方法一直被阻塞.


当Dubbo的提供方返回数据之后, Dubbo调用方的线程就可以处理响应了. 


804a20751dae14548b8dc6246be586b1.webp



如上图, Dubbo调用方的Dubbo线程开始处理响应.


// org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean)public static void received(Channel channel, Response response, boolean timeout) {    try {      // 从Map中移除id <-> DefaultFuture的关系        DefaultFuture future = FUTURES.remove(response.getId());        if (future != null) {            Timeout t = future.timeoutCheckTask;            if (!timeout) {                // 将时间轮上的超时任务取消掉                t.cancel();            }            //             future.doReceived(response);        } else {            logger.warn("The timeout response finally returned at "                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))                    + ", response " + response                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()                    + " -> " + channel.getRemoteAddress()));        }    } finally {        CHANNELS.remove(response.getId());    }}


首先从Map中移除id <-> DefaultFuture的关系, 将时间轮上的超时任务取消掉.


接下来就是把响应数据设置到DefaultFuture上, 并唤醒之前阻塞的线程.


// org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceivedprivate void doReceived(Response res) {    if (res.getStatus() == Response.OK) {        this.complete(res.getResult());    }}
// java.util.concurrent.CompletableFuture#completepublic boolean complete(T value) { // 将结果设置到DefaultFuture boolean triggered = completeValue(value); // 唤醒阻塞线程 postComplete(); return triggered;}


使用到了异步编程


被唤醒的阻塞线程就可以从DefaultFuture中拿到已设置好的数据,继续后续的业务处理.


0c3396e5a90d86433502f289bfece545.webp



如上图, 消费者的线程ThreadLocalMap中的FutureContext中的result值却一直留在线程的ThreadLocalMap中了,并不会被释放掉, 造成了内存泄漏.



浏览 54
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报