HttpClient connections that cannot be released

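In the 中银消金 (Bank of China Consumer Finance) third-party service platform, each data source can be configured with a call timeout, and the code uses that user-configured value for the connectionRequestTimeout, connectionTimeout, and socketTimeout parameters. The data source call details, however, clearly showed calls that ran far longer than the configured timeout. The customer reported that this did not match expectations and asked that a data source call be terminated once it exceeds the configured timeout.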

HttpClient timeout parameters

As mentioned above, the code uses the configured timeout for HttpClient's connectionRequestTimeout, connectionTimeout, and socketTimeout parameters. Here is a brief description of what each one means.

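  • connectionRequestTimeout: the timeout for obtaining a connection from the connection pool. When there are more concurrent requests than connections in the pool, the requests that cannot get a connection are placed in the pending queue to wait; if the wait exceeds the configured time, a timeout exception is thrown.
  • connectionTimeout: the timeout for establishing a connection between the client and the server. If the connection cannot be established within the given time, a ConnectTimeoutException is thrown.
  • socketTimeout: the timeout for the client reading data from the server, i.e. the timeout on the socket communication between client and server. Strictly speaking, it is the maximum allowed interval between two consecutive reads: if the network is unstable and the server returns a little data at a time, the request never times out as long as the gap between two packets stays within the configured time.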
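
The socketTimeout behaviour is easy to check without HttpClient. The sketch below is only an illustration using plain java.net sockets, where SO_TIMEOUT plays the role of socketTimeout; the class name SocketTimeoutDemo, the helper readSlowResponse, and the chosen delays are made up for this example. A local server sends one byte at a time with a pause before each byte, and the client reads with a timeout that is shorter than the whole transfer but longer than each pause.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Plain-socket sketch: the read timeout bounds each read, not the whole response. */
final class SocketTimeoutDemo {

    /**
     * Starts a local server that sends `chunks` single bytes, pausing `gapMillis`
     * before each one, and reads them with the given SO_TIMEOUT.
     * Returns the number of bytes read, or -1 if a read timed out.
     */
    static int readSlowResponse(int chunks, long gapMillis, int soTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0)) {
            Thread writer = new Thread(() -> {
                try (Socket peer = server.accept();
                     OutputStream out = peer.getOutputStream()) {
                    for (int i = 0; i < chunks; i++) {
                        Thread.sleep(gapMillis);
                        out.write('x');
                        out.flush();
                    }
                } catch (IOException | InterruptedException ignored) {
                    // the client went away or the demo was interrupted
                }
            });
            writer.setDaemon(true);
            writer.start();

            try (Socket client = new Socket("127.0.0.1", server.getLocalPort())) {
                client.setSoTimeout(soTimeoutMillis);
                InputStream in = client.getInputStream();
                int count = 0;
                while (in.read() != -1) {
                    count++;
                }
                return count;
            } catch (SocketTimeoutException e) {
                return -1;
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The whole transfer takes about 1 s, longer than the 600 ms timeout,
        // but every individual gap is only 200 ms.
        System.out.println(readSlowResponse(5, 200, 600));
        // A single 800 ms gap is longer than the 200 ms timeout.
        System.out.println(readSlowResponse(1, 800, 200));
    }
}
```

This is why a call can legitimately run far longer than the configured socketTimeout: the limit applies per read, so some other mechanism is needed to bound the total call time.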

Background

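To enforce the time limit accurately, we submit the call to a thread pool and wait for the result with Future.get(timeout), so that the caller stops waiting as soon as the configured time has elapsed. The code looks roughly like this: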

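import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// HttpClientUtil is a project-internal helper (not shown here).
public class Main {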

    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60,
            java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) throws IOException, InterruptedException {

        for (int i = 0; i < 10; i++) {
            // Request a blocking endpoint that never returns data, so the call always times out
            HttpGet httpGet = new HttpGet("*****");
            CloseableHttpResponse response = null;
            Future<CloseableHttpResponse> future = null;
            try {
                future = executor.submit(() -> {
                    try {
                        return HttpClientUtil.execute(httpGet);
                    } catch (Exception e) {
                        logger.error("", e);
                        return null;
                    }
                });
                response = future.get(5, TimeUnit.SECONDS);
                System.out.println("response = " + response);
            } catch (Exception e) {
                if (e instanceof TimeoutException && future != null) {
                    logger.info(Thread.currentThread().getName() + " start cancel future");
                    logger.error("", e);
                }
            } finally {
                httpGet.abort();
                httpGet.releaseConnection();
                if (null != response) {
                    EntityUtils.consume(response.getEntity());
                }
            }
        }
    }
}

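Two weeks after the feature went live, the site reported a large number of timeouts: many calls returned a timeout exception, with the following stack trace: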

org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
        at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:313)
        at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:279)
        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
        at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
        at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
        at cn.tongdun.freyr.http.SimpleGetRequestExecutor.lambda$execute$0(SimpleGetRequestExecutor.java:56)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

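Once these exceptions appeared in bulk, the service became unusable: every call to any endpoint timed out. Debugging at that point showed that the connections were still in the leased state even though no calls were in progress.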

Troubleshooting

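First of all, Timeout waiting for connection from pool is thrown when HttpClient fails to obtain a connection from the connection pool within connectionRequestTimeout. The process of obtaining a connection from the pool is as follows.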

How HttpClient obtains a connection from the pool

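First, a ConnectionRequest object is built from the request's route and user token. This object holds the get method that obtains a connection from the pool. The code is as follows: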

@Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        // ......
        Object userToken = context.getUserToken();

        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        if (execAware != null) {
            if (execAware.isAborted()) {
                connRequest.cancel();
                throw new RequestAbortedException("Request aborted");
            } else {
                execAware.setCancellable(connRequest);
            }
        }

        // .....
}        

The timeout passed to ConnectionRequest.get in the elided part is the connectionRequestTimeout we configured. Next, let's look at how the ConnectionRequest object is built.

    @Override
    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        Args.notNull(route, "HTTP route");
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection request: " + format(route, state) + formatStats(route));
        }
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {

            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                final HttpClientConnection conn = leaseConnection(future, timeout, tunit);
                if (conn.isOpen()) {
                    final HttpHost host;
                    if (route.getProxyHost() != null) {
                        host = route.getProxyHost();
                    } else {
                        host = route.getTargetHost();
                    }
                    final SocketConfig socketConfig = resolveSocketConfig(host);
                    conn.setSocketTimeout(socketConfig.getSoTimeout());
                }
                return conn;
            }

        };

    }

    protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
        final CPoolEntry entry;
        try {
            entry = future.get(timeout, tunit);
            if (entry == null || future.isCancelled()) {
                throw new InterruptedException();
            }
            Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
            }
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
        }
    }

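As we can see, the code catches TimeoutException and rethrows it as a ConnectionPoolTimeoutException. In other words, future.get throws TimeoutException on timeout, which is then caught by the enclosing catch. Now let's look at how the Future in final Future<CPoolEntry> future = this.pool.lease(route, state, null); is implemented: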

    /**
     * {@inheritDoc}
     * <p>
     * Please note that this class does not maintain its own pool of execution
     * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
     * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
     * returned by this method in order for the lease operation to complete.
     */
    @Override
    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");

        return new Future<E>() {

            private final AtomicBoolean cancelled = new AtomicBoolean(false);
            private final AtomicBoolean done = new AtomicBoolean(false);
            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);

            @Override
            public boolean cancel(final boolean mayInterruptIfRunning) {
                if (cancelled.compareAndSet(false, true)) {
                    done.set(true);
                    lock.lock();
                    try {
                        condition.signalAll();
                    } finally {
                        lock.unlock();
                    }
                    if (callback != null) {
                        callback.cancelled();
                    }
                    return true;
                } else {
                    return false;
                }
            }

            @Override
            public boolean isCancelled() {
                return cancelled.get();
            }

            @Override
            public boolean isDone() {
                return done.get();
            }

            @Override
            public E get() throws InterruptedException, ExecutionException {
                try {
                    return get(0L, TimeUnit.MILLISECONDS);
                } catch (final TimeoutException ex) {
                    throw new ExecutionException(ex);
                }
            }

            @Override
            public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException {
                final E entry = entryRef.get();
                if (entry != null) {
                    return entry;
                }
                synchronized (this) {
                    try {
                        for (;;) {
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
                            if (validateAfterInactivity > 0)  {
                                if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                    if (!validate(leasedEntry)) {
                                        leasedEntry.close();
                                        release(leasedEntry, false);
                                        continue;
                                    }
                                }
                            }
                            entryRef.set(leasedEntry);
                            done.set(true);
                            onLease(leasedEntry);
                            if (callback != null) {
                                callback.completed(leasedEntry);
                            }
                            return leasedEntry;
                        }
                    } catch (final IOException ex) {
                        done.set(true);
                        if (callback != null) {
                            callback.failed(ex);
                        }
                        throw new ExecutionException(ex);
                    }
                }
            }

        };
    }

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

                boolean success = false;
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

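The code shows that lease returns an anonymous inner class implementing the Future interface. Its get(final long timeout, final TimeUnit tunit) method loops inside a synchronized block to obtain a connection from the pool by calling getPoolEntryBlocking.

getPoolEntryBlocking loops while holding the lock. When no connection can be obtained (there is no available connection in the pool and no capacity to create a new one), it executes success = this.condition.awaitUntil(deadline), which blocks until the deadline at the latest. If another connection is released while the thread is blocked (the release code is shown later), the thread is woken up and success is true; if the deadline passes without a connection being obtained, success is false. Finally, the check !success && (deadline != null && deadline.getTime() <= System.currentTimeMillis()) breaks out of the loop and throw new TimeoutException("Timeout waiting for connection") is executed, which the caller catches.

That is how HttpClient obtains a connection from the pool, and where the exception seen on timeout comes from.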
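
The wait-with-deadline pattern can be reduced to a few lines. The following is a heavily simplified sketch and not the HttpClient implementation: TinyPool, its string "connections", and the leaseOrNull helper are hypothetical names introduced for illustration, and there is no per-route pool, pending queue, or cancellation. It keeps only the lock, the condition, the awaitUntil(deadline) loop, and the signalAll on release.

```java
import java.util.ArrayDeque;
import java.util.Date;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, heavily simplified pool; not the HttpClient implementation. */
final class TinyPool {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Deque<String> available = new ArrayDeque<>();

    TinyPool(int size) {
        for (int i = 0; i < size; i++) {
            available.add("conn-" + i);
        }
    }

    /** Blocks until an entry is free or the deadline passes. */
    String lease(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        Date deadline = new Date(System.currentTimeMillis() + unit.toMillis(timeout));
        lock.lock();
        try {
            for (;;) {
                String entry = available.pollFirst();
                if (entry != null) {
                    return entry;
                }
                // true: woken up before the deadline; false: the deadline elapsed
                boolean success = condition.awaitUntil(deadline);
                if (!success && deadline.getTime() <= System.currentTimeMillis()) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            lock.unlock();
        }
    }

    /** Puts the entry back and wakes up every thread waiting in lease(). */
    void release(String entry) {
        lock.lock();
        try {
            available.addFirst(entry);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Demo convenience: returns null instead of throwing when the lease times out. */
    String leaseOrNull(long timeoutMillis) {
        try {
            return lease(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool(1);
        String first = pool.leaseOrNull(100);
        System.out.println("first lease: " + first);
        // Nothing has been released, so this lease waits 100 ms and gives up.
        System.out.println("second lease: " + pool.leaseOrNull(100));
        pool.release(first);
        System.out.println("after release: " + pool.leaseOrNull(100));
    }
}
```

The sketch makes the failure mode visible: if an entry is never handed back through release, every later lease can only end in a timeout.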

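How HttpClient returns a connection

When using HttpClient, we called the abort and releaseConnection methods in the finally block to release the connection. Let's analyze how each of them releases the connection and returns it to the pool.

Releasing the connection with abort

First, the abort method: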

    @Override
    public void abort() {
        if (this.aborted.compareAndSet(false, true)) {
            final Cancellable cancellable = this.cancellableRef.getAndSet(null);
            if (cancellable != null) {
                cancellable.cancel();
            }
        }
    }

The code flips the aborted flag from false to true, then takes the Cancellable, clears the reference, and calls its cancel method. Let's see where a Cancellable gets set:

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
       //  ......
        Object userToken = context.getUserToken();

        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        if (execAware != null) {
            if (execAware.isAborted()) {
                connRequest.cancel();
                throw new RequestAbortedException("Request aborted");
            } else {
                // ① Register the ConnectionRequest as the Cancellable
                execAware.setCancellable(connRequest);
            }
        }

        final RequestConfig config = context.getRequestConfig();

        final HttpClientConnection managedConn;
        try {
            final int timeout = config.getConnectionRequestTimeout();
            managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch(final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RequestAbortedException("Request aborted", interrupted);
        } catch(final ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause == null) {
                cause = ex;
            }
            throw new RequestAbortedException("Request execution failed", cause);
        }

        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);

        if (config.isStaleConnectionCheckEnabled()) {
            // validate connection
            if (managedConn.isOpen()) {
                this.log.debug("Stale connection check");
                if (managedConn.isStale()) {
                    this.log.debug("Stale connection detected");
                    managedConn.close();
                }
            }
        }

        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            if (execAware != null) {
                // ② Register the ConnectionHolder as the Cancellable
                execAware.setCancellable(connHolder);
            }
    // .....
    }

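At the first place, the ConnectionRequest is registered. If the request has already been aborted before a connection is obtained from the pool, the freshly built connection request is cancelled right away and throw new RequestAbortedException("Request aborted"); is executed. If Cancellable.cancel is called while the connection is being obtained, i.e. inside getPoolEntryBlocking, the loop checks future.isCancelled() to see whether the task was cancelled and executes throw new InterruptedException("Operation interrupted").

At the second place, after the connection has been obtained, the ConnectionHolder that holds the HttpClientConnection is registered. Here is the implementation of ConnectionHolder's cancel method: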

    @Override
    public boolean cancel() {
        final boolean alreadyReleased = this.released.get();
        log.debug("Cancelling request execution");
        abortConnection();
        return !alreadyReleased;
    }

    @Override
    public void abortConnection() {
        if (this.released.compareAndSet(false, true)) {
            synchronized (this.managedConn) {
                try {
                    this.managedConn.shutdown();
                    log.debug("Connection discarded");
                } catch (final IOException ex) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug(ex.getMessage(), ex);
                    }
                } finally {
                    this.manager.releaseConnection(
                            this.managedConn, null, 0, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

ConnectionHolder's cancel method calls abortConnection, which first sets the released flag to true and then, inside a synchronized block, calls this.managedConn.shutdown(). Here is the source of that method:

    @Override
    public void shutdown() throws IOException {
        final Socket socket = this.socketHolder.getAndSet(null);
        if (socket != null) {
            // force abortive close (RST)
            try {
                socket.setSoLinger(true, 0);
            } catch (final IOException ex) {
            } finally {
                socket.close();
            }
        }
    }

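Its main job is to clear the connection's Socket reference and then close the socket; setSoLinger(true, 0) makes the close abortive (RST), as the comment in the code notes.

After that, the finally block calls manager.releaseConnection, whose source is as follows: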

    @Override
    public void releaseConnection(
            final HttpClientConnection managedConn,
            final Object state,
            final long keepalive, final TimeUnit tunit) {
        Args.notNull(managedConn, "Managed connection");
        synchronized (managedConn) {
            final CPoolEntry entry = CPoolProxy.detach(managedConn);
            if (entry == null) {
                return;
            }
            final ManagedHttpClientConnection conn = entry.getConnection();
            try {
                if (conn.isOpen()) {
                    final TimeUnit effectiveUnit = tunit != null ? tunit : TimeUnit.MILLISECONDS;
                    entry.setState(state);
                    entry.updateExpiry(keepalive, effectiveUnit);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (keepalive > 0) {
                            s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
                    }
                    conn.setSocketTimeout(0);
                }
            } finally {
                this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
                }
            }
        }
    }

The finally block here calls pool.release:

    @Override
    public void release(final E entry, final boolean reusable) {
        this.lock.lock();
        try {
            if (this.leased.remove(entry)) {
                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                pool.free(entry, reusable);
                if (reusable && !this.isShutDown) {
                    this.available.addFirst(entry);
                } else {
                    entry.close();
                }
                onRelease(entry);
                Future<E> future = pool.nextPending();
                if (future != null) {
                    this.pending.remove(future);
                } else {
                    future = this.pending.poll();
                }
                if (future != null) {
                    this.condition.signalAll();
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

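The connection entry is removed from the leased set and pool.free is called. If the entry is reusable, it is put back at the head of the available queue; otherwise it is closed: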

    public void free(final E entry, final boolean reusable) {
        Args.notNull(entry, "Pool entry");
        final boolean found = this.leased.remove(entry);
        Asserts.check(found, "Entry %s has not been leased from this pool", entry);
        if (reusable) {
            this.available.addFirst(entry);
        }
    }

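Note that two pools are involved here, ConnectionPool and RouteSpecificPool; a ConnectionPool contains RouteSpecificPool instances.

Once the entry has been handed back, pool.nextPending takes the next request waiting for a connection off the pending queue, and condition.signalAll() then wakes up all waiting futures so they can try to obtain a connection.

Releasing the connection with releaseConnection

Now the code of releaseConnection: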

    /**
     * A convenience method to simplify migration from HttpClient 3.1 API. This method is
     * equivalent to {@link #reset()}.
     *
     * @since 4.2
     */
    public void releaseConnection() {
        reset();
    }

    /**
     * Resets internal state of the request making it reusable.
     *
     * @since 4.2
     */
    public void reset() {
        final Cancellable cancellable = this.cancellableRef.getAndSet(null);
        if (cancellable != null) {
            cancellable.cancel();
        }
        this.aborted.set(false);
    }

This method calls Cancellable.cancel in the same way as abort does, but at the end it sets aborted to false.

The Javadoc reads: Resets internal state of the request making it reusable. So the purpose of releaseConnection is to make the request reusable, which is why it resets aborted to false.
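
The effect of calling the two methods back to back can be shown with a small stand-in. FakeRequest below is a hypothetical class written for this article that copies only the flag and Cancellable handling from the abort and reset sources quoted above; a Runnable stands in for Cancellable, and the guard inside setCancellable is an assumption about the real class.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in that mirrors only the flag handling of abort() and reset(). */
final class FakeRequest {
    private final AtomicBoolean aborted = new AtomicBoolean(false);
    private final AtomicReference<Runnable> cancellableRef = new AtomicReference<>(null);

    /** Assumption: a cancellable is only accepted while the request is not aborted. */
    void setCancellable(Runnable cancellable) {
        if (!aborted.get()) {
            cancellableRef.set(cancellable);
        }
    }

    boolean isAborted() {
        return aborted.get();
    }

    /** Same steps as the abort() source shown earlier. */
    void abort() {
        if (aborted.compareAndSet(false, true)) {
            Runnable cancellable = cancellableRef.getAndSet(null);
            if (cancellable != null) {
                cancellable.run();
            }
        }
    }

    /** Same steps as reset(), which releaseConnection() delegates to. */
    void releaseConnection() {
        Runnable cancellable = cancellableRef.getAndSet(null);
        if (cancellable != null) {
            cancellable.run();
        }
        aborted.set(false);
    }

    public static void main(String[] args) {
        FakeRequest request = new FakeRequest();
        AtomicInteger cancelCalls = new AtomicInteger();
        request.setCancellable(cancelCalls::incrementAndGet);

        request.abort();
        System.out.println("after abort: aborted=" + request.isAborted()
                + ", cancel calls=" + cancelCalls.get());

        // abort() already cleared the reference, so only the flag reset remains.
        request.releaseConnection();
        System.out.println("after releaseConnection: aborted=" + request.isAborted()
                + ", cancel calls=" + cancelCalls.get());
    }
}
```

After abort followed by releaseConnection, the cancellable has been invoked once and the request looks as if it had never been aborted.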

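Resolving the problem

The analysis above shows that abort sets the request's aborted flag to true, whereas releaseConnection resets it to false. The execution code uses the aborted flag to decide whether the current request is still usable: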

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        // The aborted flag decides whether the ConnectionRequest must be cancelled
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        if (execAware != null) {
            if (execAware.isAborted()) {
                connRequest.cancel();
                throw new RequestAbortedException("Request aborted");
            } else {
                execAware.setCancellable(connRequest);
            }
        }

        // ......
      
        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            if (execAware != null) {
                execAware.setCancellable(connHolder);
            }

            HttpResponse response;
            for (int execCount = 1;; execCount++) {

                if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
                    throw new NonRepeatableRequestException("Cannot retry request " +
                            "with a non-repeatable request entity.");
                }

                // The aborted flag decides whether the request is dropped
                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                // ......

                final int timeout = config.getSocketTimeout();
                if (timeout >= 0) {
                    managedConn.setSocketTimeout(timeout);
                }

                // The aborted flag decides whether the request is dropped
                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                if (this.log.isDebugEnabled()) {
                    this.log.debug("Executing request " + request.getRequestLine());
                }

                if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Target auth state: " + targetAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, targetAuthState, context);
                }
                if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Proxy auth state: " + proxyAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, proxyAuthState, context);
                }

                response = requestExecutor.execute(request, managedConn, context);

                // The connection is in or can be brought to a re-usable state.
                if (reuseStrategy.keepAlive(response, context)) {
                    // Set the idle duration of this connection
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    connHolder.markReusable();
                } else {
                    connHolder.markNonReusable();
                }

                if (needAuthentication(
                        targetAuthState, proxyAuthState, route, response, context)) {
                    // Make sure the response body is fully consumed, if present
                    final HttpEntity entity = response.getEntity();
                    if (connHolder.isReusable()) {
                        EntityUtils.consume(entity);
                    } else {
                        managedConn.close();
                        if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
                                && proxyAuthState.isConnectionBased()) {
                            this.log.debug("Resetting proxy auth state");
                            proxyAuthState.reset();
                        }
                        if (targetAuthState.getState() == AuthProtocolState.SUCCESS
                                && targetAuthState.isConnectionBased()) {
                            this.log.debug("Resetting target auth state");
                            targetAuthState.reset();
                        }
                    }
                    // discard previous auth headers
                    final HttpRequest original = request.getOriginal();
                    if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
                        request.removeHeaders(AUTH.WWW_AUTH_RESP);
                    }
                    if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
                        request.removeHeaders(AUTH.PROXY_AUTH_RESP);
                    }
                } else {
                    break;
                }
            }

            if (userToken == null) {
                userToken = userTokenHandler.getUserToken(context);
                context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
            }
            if (userToken != null) {
                connHolder.setState(userToken);
            }

            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // connection not needed and (assumed to be) in re-usable state
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            } else {
                return new HttpResponseProxy(response, connHolder);
            }
        } catch (final ConnectionShutdownException ex) {
            final InterruptedIOException ioex = new InterruptedIOException(
                    "Connection has been shut down");
            ioex.initCause(ex);
            throw ioex;
        } catch (final HttpException ex) {
            connHolder.abortConnection();
            throw ex;
        } catch (final IOException ex) {
            connHolder.abortConnection();
            if (proxyAuthState.isConnectionBased()) {
                proxyAuthState.reset();
            }
            if (targetAuthState.isConnectionBased()) {
                targetAuthState.reset();
            }
            throw ex;
        } catch (final RuntimeException ex) {
            connHolder.abortConnection();
            if (proxyAuthState.isConnectionBased()) {
                proxyAuthState.reset();
            }
            if (targetAuthState.isConnectionBased()) {
                targetAuthState.reset();
            }
            throw ex;
        } catch (final Error error) {
            connManager.shutdown();
            throw error;
        }
    }

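In the synchronous case, the request has already finished by the time abort and releaseConnection are called, so changing the flag does no harm (and since our project builds a new request for every call, requests are never reused anyway). When the request is executed asynchronously, however, the caller may drop the connection (the abort and releaseConnection calls in its finally block) while the request is still executing. At that moment the worker may be blocked waiting for a connection, in which case cancel cancels the lease future. But if the future has already obtained and returned a connection, the subsequent releaseConnection call resets the request's aborted flag to false, the abort checks no longer fire, no exception is thrown, and the connection that was obtained is never released.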
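
The interleaving can be replayed deterministically in a simulation. Everything in the sketch below is hypothetical: AbortRaceDemo and SimRequest are not HttpClient classes, a counter stands in for the pool's leased set, and two latches force the caller's cleanup to run in the window after the worker has leased a connection but before it registers the connection holder. The guard in setCancellable is an assumption about the real request class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical simulation of the interleaving; none of these types are HttpClient classes. */
final class AbortRaceDemo {

    /** Mirrors only the aborted flag and the cancellable slot of the request. */
    static final class SimRequest {
        private final AtomicBoolean aborted = new AtomicBoolean(false);
        private final AtomicReference<Runnable> cancellableRef = new AtomicReference<>(null);

        void setCancellable(Runnable cancellable) {
            if (!aborted.get()) {
                cancellableRef.set(cancellable);
            }
        }

        boolean isAborted() {
            return aborted.get();
        }

        void abort() {
            if (aborted.compareAndSet(false, true)) {
                Runnable cancellable = cancellableRef.getAndSet(null);
                if (cancellable != null) {
                    cancellable.run();
                }
            }
        }

        void releaseConnection() {
            Runnable cancellable = cancellableRef.getAndSet(null);
            if (cancellable != null) {
                cancellable.run();
            }
            aborted.set(false);
        }
    }

    /** Returns how many connections are still leased once both threads have finished. */
    static int leasedAfterTimeout(boolean callReleaseConnection) {
        AtomicInteger leased = new AtomicInteger();
        SimRequest request = new SimRequest();
        CountDownLatch connectionLeased = new CountDownLatch(1);
        CountDownLatch callerCleanedUp = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            leased.incrementAndGet();              // the lease returned a connection
            connectionLeased.countDown();
            awaitQuietly(callerCleanedUp);         // the caller's finally block runs in this window
            Runnable holderCancel = leased::decrementAndGet;
            request.setCancellable(holderCancel);  // register the connection holder
            if (request.isAborted()) {
                holderCancel.run();                // abort check fires: the connection is released
            }
            // Otherwise the request carries on and keeps its connection.
        });
        worker.start();

        awaitQuietly(connectionLeased);            // Future.get(timeout) has just expired
        request.abort();
        if (callReleaseConnection) {
            request.releaseConnection();
        }
        callerCleanedUp.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return leased.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("abort + releaseConnection, still leased: " + leasedAfterTimeout(true));
        System.out.println("abort only, still leased: " + leasedAfterTimeout(false));
    }
}
```

In the real client the surviving request would presumably hold its connection until it fails on its own or returns a response; since the caller has already given up on the Future, a response that arrives late is never consumed or closed, which would keep the connection leased.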

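Fixes

Remove releaseConnection

If requests are never reused, we can simply stop calling releaseConnection. abort has already called the Cancellable's cancel method, so the only remaining effect of releaseConnection is this.aborted.set(false), and that is exactly what keeps the thread executing the request from throwing at the abort check, so the exception that would have been caught to release the connection never occurs.

Move abort and releaseConnection into the finally block of the async task

If we do want to reuse the request, we can move the release calls from the caller into the finally block of the asynchronous task, like this: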

            try {
                future = executor.submit(() -> {
                    try {
                        return HttpClientUtil.execute(httpGet);
                    } catch (Exception e) {
                        logger.error("", e);
                        return null;
                    } finally {
                        httpGet.abort();
                        httpGet.releaseConnection();
                    }
                });
                response = future.get(5, TimeUnit.SECONDS);
                System.out.println("response = " + response);
            } catch (Exception e) {
                if (e instanceof TimeoutException && future != null) {
                    logger.info(Thread.currentThread().getName() + " start cancel future");
                    logger.error("", e);
                }
            } finally {
                if (null != response) {
                    EntityUtils.consume(response.getEntity());
                }
            }

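Note that when Future.get times out, the task is not terminated; only its result is discarded. The task therefore still runs its finally block and releases the connection when the call eventually ends. Before reusing the request object, however, check Future.isDone to make sure the task has actually finished.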
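
This behaviour of Future.get can be confirmed with the JDK alone. In the sketch below, FutureTimeoutDemo and its run method are made-up names, Thread.sleep stands in for the slow HTTP call, and the cleanedUp flag marks the spot where abort and releaseConnection would be called in the real task.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** JDK-only sketch: a timed-out Future.get leaves the task running. */
final class FutureTimeoutDemo {

    /** Returns {timedOut, doneRightAfterTimeout, cleanupRanOnceDone}. */
    static boolean[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        try {
            Future<String> future = executor.submit(() -> {
                try {
                    Thread.sleep(500);       // stands in for the slow HTTP call
                    return "response";
                } finally {
                    cleanedUp.set(true);     // where abort()/releaseConnection() would go
                }
            });

            boolean timedOut = false;
            try {
                future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true;             // the caller gives up, the task keeps running
            }
            boolean doneRightAfterTimeout = future.isDone();

            future.get();                    // demo only: wait for the task to finish
            return new boolean[] {timedOut, doneRightAfterTimeout,
                    future.isDone() && cleanedUp.get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("timed out: " + result[0]);
        System.out.println("done right after the timeout: " + result[1]);
        System.out.println("cleanup ran once done: " + result[2]);
    }
}
```

A caller that wants to reuse the request object would poll or wait on isDone in the same way before touching it again.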