在中银消金三方服务平台,数据源配置中可以配置数据源调用的超时时间,代码中使用这个用户配置的超时时间作为 connectionRequestTimeout
、connectionTimeout
和 socketTimeout
参数。在数据源调用明细中,明显可以看出数据源的调用时长有远大于配置的超时时间,客户提出不符合预期,要求数据源的调用时间在超过配置的超时时间后能够终止。
httpclient 超时参数
上面提到,代码中使用配置的超时时间作为 httpclient 的 connectionRequestTimeout
、connectionTimeout
和 socketTimeout
参数,下面简单介绍一下这三个参数的含义。
connectionRequestTimeout
:指从连接池获取连接的超时时间(当请求并发数量大于连接池中的连接数量时,则获取不到连接的请求会被放入 pending 队列等待,如果超过设定的时间,则抛出超时异常)。connectionTimeout
:指客户端和服务器建立连接的超时时间。(当客户端和服务器在建立链接时,如果在指定时间内无法成功建立链接,则抛出ConnectionTimeoutException
)。socketTimeout
:指客户端从服务器读取数据的超时时间,即客户端和服务器 socket 通信的超时时间,其实这个时间是客户端两次读取数据的最长时间,如果客户端在网络抖动的情况下,每次返回部分数据,两次数据包的时间在设定时间之内,也是不会超时的。
问题背景
为了保证计时的准确性,我们采用异步提交线程池,用 Future.get(timeout)
的方式保证任务可以在超过设定时间后,计时的准确性,大致代码如下:
public class Main {
private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60,
java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) throws IOException, InterruptedException {
for (int i = 0; i < 10; i++) {
// 请求一个阻塞接口,不会返回数据,必定超时
HttpGet httpGet = new HttpGet("*****");
CloseableHttpResponse response = null;
Future<CloseableHttpResponse> future = null;
try {
future = executor.submit(() -> {
try {
return HttpClientUtil.execute(httpGet);
} catch (Exception e) {
logger.error("", e);
return null;
}
});
response = future.get(5, TimeUnit.SECONDS);
System.out.println("response = " + response);
} catch (Exception e) {
if (e instanceof TimeoutException && future != null) {
logger.info(Thread.currentThread().getName() + " start cancel future");
logger.error("", e);
}
} finally {
httpGet.abort();
httpGet.releaseConnection();
if (null != response) {
EntityUtils.consume(response.getEntity());
}
}
}
}
}
在功能上线的两周后,现场反馈说有大量超时,导致大量调用返回超时异常,出现异常
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:313)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:279)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at cn.tongdun.freyr.http.SimpleGetRequestExecutor.lambda$execute$0(SimpleGetRequestExecutor.java:56)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
出现大量异常后,服务就无法使用了,即任何接口的调用都是超时状态。此时通过 debug 发现,即使没有调用,连接也依旧处于 lease
状态。
问题排查
首先,出现 Timeout waiting for connection from pool
是由于 httpclient 在从连接池获取连接时,在 connectionRequectTimeout
时间内没有获取到连接,而抛出的异常信息,从连接池获取连接的流程如下。
httpclient 从连接池获取连接
首先,根据请求的路由和 token 构建 ConnectionRequest
对象,此对象保存了获取从连接池获取连接的 get
方法,代码如下:
@Override
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
// ......
Object userToken = context.getUserToken();
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
if (execAware != null) {
if (execAware.isAborted()) {
connRequest.cancel();
throw new RequestAbortedException("Request aborted");
} else {
execAware.setCancellable(connRequest);
}
}
// .....
}
我们可以看到,此时使用的超时时间就是我们传入配置的 connectionRequestTimeout
,下面我们看下 ConnectionRequest
对象的构建。
@Override
public ConnectionRequest requestConnection(
final HttpRoute route,
final Object state) {
Args.notNull(route, "HTTP route");
if (this.log.isDebugEnabled()) {
this.log.debug("Connection request: " + format(route, state) + formatStats(route));
}
final Future<CPoolEntry> future = this.pool.lease(route, state, null);
return new ConnectionRequest() {
@Override
public boolean cancel() {
return future.cancel(true);
}
@Override
public HttpClientConnection get(
final long timeout,
final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
final HttpClientConnection conn = leaseConnection(future, timeout, tunit);
if (conn.isOpen()) {
final HttpHost host;
if (route.getProxyHost() != null) {
host = route.getProxyHost();
} else {
host = route.getTargetHost();
}
final SocketConfig socketConfig = resolveSocketConfig(host);
conn.setSocketTimeout(socketConfig.getSoTimeout());
}
return conn;
}
};
}
protected HttpClientConnection leaseConnection(
final Future<CPoolEntry> future,
final long timeout,
final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
final CPoolEntry entry;
try {
entry = future.get(timeout, tunit);
if (entry == null || future.isCancelled()) {
throw new InterruptedException();
}
Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
if (this.log.isDebugEnabled()) {
this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
}
return CPoolProxy.newProxy(entry);
} catch (final TimeoutException ex) {
throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
}
}
我们可以看到,代码中捕获了 TimeoutException
,并重新构建 ConnectionPoolTimeoutException
,也就是说,future.get
会在超时的时候抛出 TimeoutException
,然后被外层的 catch
捕获,下面我们看 final Future<CPoolEntry> future = this.pool.lease(route, state, null);
中的 Future
是如何实现的:
/**
* {@inheritDoc}
* <p>
* Please note that this class does not maintain its own pool of execution
* {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
* or {@link Future#get(long, TimeUnit)} method on the {@link Future}
* returned by this method in order for the lease operation to complete.
*/
@Override
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
Args.notNull(route, "Route");
Asserts.check(!this.isShutDown, "Connection pool shut down");
return new Future<E>() {
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
if (cancelled.compareAndSet(false, true)) {
done.set(true);
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
if (callback != null) {
callback.cancelled();
}
return true;
} else {
return false;
}
}
@Override
public boolean isCancelled() {
return cancelled.get();
}
@Override
public boolean isDone() {
return done.get();
}
@Override
public E get() throws InterruptedException, ExecutionException {
try {
return get(0L, TimeUnit.MILLISECONDS);
} catch (final TimeoutException ex) {
throw new ExecutionException(ex);
}
}
@Override
public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException {
final E entry = entryRef.get();
if (entry != null) {
return entry;
}
synchronized (this) {
try {
for (;;) {
final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
if (validateAfterInactivity > 0) {
if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
if (!validate(leasedEntry)) {
leasedEntry.close();
release(leasedEntry, false);
continue;
}
}
}
entryRef.set(leasedEntry);
done.set(true);
onLease(leasedEntry);
if (callback != null) {
callback.completed(leasedEntry);
}
return leasedEntry;
}
} catch (final IOException ex) {
done.set(true);
if (callback != null) {
callback.failed(ex);
}
throw new ExecutionException(ex);
}
}
}
};
}
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit tunit,
final Future<E> future) throws IOException, InterruptedException, TimeoutException {
Date deadline = null;
if (timeout > 0) {
deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
}
this.lock.lock();
try {
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry;
for (;;) {
Asserts.check(!this.isShutDown, "Connection pool shut down");
for (;;) {
entry = pool.getFree(state);
if (entry == null) {
break;
}
if (entry.isExpired(System.currentTimeMillis())) {
entry.close();
}
if (entry.isClosed()) {
this.available.remove(entry);
pool.free(entry, false);
} else {
break;
}
}
if (entry != null) {
this.available.remove(entry);
this.leased.add(entry);
onReuse(entry);
return entry;
}
// New connection is needed
final int maxPerRoute = getMax(route);
// Shrink the pool prior to allocating a new connection
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
if (pool.getAllocatedCount() < maxPerRoute) {
final int totalUsed = this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity > 0) {
final int totalAvailable = this.available.size();
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
final C conn = this.connFactory.create(route);
entry = pool.add(conn);
this.leased.add(entry);
return entry;
}
}
boolean success = false;
try {
if (future.isCancelled()) {
throw new InterruptedException("Operation interrupted");
}
pool.queue(future);
this.pending.add(future);
if (deadline != null) {
success = this.condition.awaitUntil(deadline);
} else {
this.condition.await();
success = true;
}
if (future.isCancelled()) {
throw new InterruptedException("Operation interrupted");
}
} finally {
// In case of 'success', we were woken up by the
// connection pool and should now have a connection
// waiting for us, or else we're shutting down.
// Just continue in the loop, both cases are checked.
pool.unqueue(future);
this.pending.remove(future);
}
// check for spurious wakeup vs. timeout
if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
break;
}
}
throw new TimeoutException("Timeout waiting for connection");
} finally {
this.lock.unlock();
}
}
通过代码,我们可以看到 lease 方法返回了 Future
接口的匿名内部类实现,其中 get(final long timeout, final TimeUnit tunit)
方法会在同步代码块下循环从连接池获取连接,即 getPoolEntryBlocking
方法。
在 getPoolEntryBlocking
方法中,会在加锁情况下,循环获取连接,当获取连接为空时(即连接池中没有 available
的连接),会执行 success = this.condition.awaitUntil(deadline)
,即阻塞到超时的死亡时间线,如果在阻塞过程中,有其他连接释放(释放的代码后面我们会看到),则会把 success
置为 true
,如果没有在死亡线达到之前获取到连接,则 success
为 false
,在最后,(!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())
会跳出循环,抛出 throw new TimeoutException("Timeout waiting for connection")
,被外层捕获。
这就是 httpclient 从连接池获取连接的过程,以及在超时情况下抛出的异常信息。
httpclient 归还连接
我们看到在使用 httpclient 的时候,在 finally
代码块中,我们调用了 abort
和 releaseConnection
方法,用来释放 httpclient 连接,下面我们分析下如何释放连接归还连接池。
abort 释放连接
首先看 abort
方法:
@Override
public void abort() {
if (this.aborted.compareAndSet(false, true)) {
final Cancellable cancellable = this.cancellableRef.getAndSet(null);
if (cancellable != null) {
cancellable.cancel();
}
}
}
代码中,将 abort
变量从 false
置为 true
,之后获取 Cancellable
,并将其置空,调用 cancel
方法,我们看下在何处会放入 Cancellable
:
@Override
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
// ......
Object userToken = context.getUserToken();
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
if (execAware != null) {
if (execAware.isAborted()) {
connRequest.cancel();
throw new RequestAbortedException("Request aborted");
} else {
// ① 将 ConnectionRequest 放入 Cancellable
execAware.setCancellable(connRequest);
}
}
final RequestConfig config = context.getRequestConfig();
final HttpClientConnection managedConn;
try {
final int timeout = config.getConnectionRequestTimeout();
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
} catch(final InterruptedException interrupted) {
Thread.currentThread().interrupt();
throw new RequestAbortedException("Request aborted", interrupted);
} catch(final ExecutionException ex) {
Throwable cause = ex.getCause();
if (cause == null) {
cause = ex;
}
throw new RequestAbortedException("Request execution failed", cause);
}
context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
if (config.isStaleConnectionCheckEnabled()) {
// validate connection
if (managedConn.isOpen()) {
this.log.debug("Stale connection check");
if (managedConn.isStale()) {
this.log.debug("Stale connection detected");
managedConn.close();
}
}
}
final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
try {
if (execAware != null) {
// ② 将 ConnectionHolder 放入 Cancellable
execAware.setCancellable(connHolder);
}
// .....
}
第一处,将 ConnectionRequest
放入。也就是说,如果此时是在执行从连接池获取连接之前调用了 Cancellable.cancel
,则会在构建好请求后,直接释放请求,抛出 throw new RequestAbortedException("Request aborted");
异常;如果此时在连接获取过程中,在 getPoolEntryBlocking
中调用 Cancellable.cancel
,在循环中会调用 future.isCancelled()
判断是否取消任务,抛出 throw new InterruptedException("Operation interrupted")
。
第二处,即获取到连接后,将 HttpClientConnection
的持有者 ConnectionHolder
放入。此时,我们看 ConnectionHolder
的 cancel
方法的实现:
@Override
public boolean cancel() {
final boolean alreadyReleased = this.released.get();
log.debug("Cancelling request execution");
abortConnection();
return !alreadyReleased;
}
@Override
public void abortConnection() {
if (this.released.compareAndSet(false, true)) {
synchronized (this.managedConn) {
try {
this.managedConn.shutdown();
log.debug("Connection discarded");
} catch (final IOException ex) {
if (this.log.isDebugEnabled()) {
this.log.debug(ex.getMessage(), ex);
}
} finally {
this.manager.releaseConnection(
this.managedConn, null, 0, TimeUnit.MILLISECONDS);
}
}
}
}
ConnectionHolder
的 cancel
方法调用了 abortConnection
方法,在此方法中,首先将 release
置为 true
,之后在同步代码块情况下,先调用 this.managedConn.shutdown()
以下是该方法的源码:
@Override
public void shutdown() throws IOException {
final Socket socket = this.socketHolder.getAndSet(null);
if (socket != null) {
// force abortive close (RST)
try {
socket.setSoLinger(true, 0);
} catch (final IOException ex) {
} finally {
socket.close();
}
}
}
主要功能是为了将 Connection
的 Socket 对象置空,之后将 socket
关闭。
之后,在 finally
中,调用 manager.releaseConnection
方法,源码如下:
@Override
public void releaseConnection(
final HttpClientConnection managedConn,
final Object state,
final long keepalive, final TimeUnit tunit) {
Args.notNull(managedConn, "Managed connection");
synchronized (managedConn) {
final CPoolEntry entry = CPoolProxy.detach(managedConn);
if (entry == null) {
return;
}
final ManagedHttpClientConnection conn = entry.getConnection();
try {
if (conn.isOpen()) {
final TimeUnit effectiveUnit = tunit != null ? tunit : TimeUnit.MILLISECONDS;
entry.setState(state);
entry.updateExpiry(keepalive, effectiveUnit);
if (this.log.isDebugEnabled()) {
final String s;
if (keepalive > 0) {
s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
} else {
s = "indefinitely";
}
this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
}
conn.setSocketTimeout(0);
}
} finally {
this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
if (this.log.isDebugEnabled()) {
this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
}
}
}
}
在 finally
中的 pool.release
方法中:
@Override
public void release(final E entry, final boolean reusable) {
this.lock.lock();
try {
if (this.leased.remove(entry)) {
final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
pool.free(entry, reusable);
if (reusable && !this.isShutDown) {
this.available.addFirst(entry);
} else {
entry.close();
}
onRelease(entry);
Future<E> future = pool.nextPending();
if (future != null) {
this.pending.remove(future);
} else {
future = this.pending.poll();
}
if (future != null) {
this.condition.signalAll();
}
}
} finally {
this.lock.unlock();
}
}
我们可以看到,连接对象从 lease
队列移除,并调用 pool.free
方法,将连接重新放回 available
队列的第一个:
public void free(final E entry, final boolean reusable) {
Args.notNull(entry, "Pool entry");
final boolean found = this.leased.remove(entry);
Asserts.check(found, "Entry %s has not been leased from this pool", entry);
if (reusable) {
this.available.addFirst(entry);
}
}
注意,此处分别是 ConnectionPool
和 RouteSpecificPool
,ConnectionPool
包含了 RouteSpecificPool
。
在将连接放回 available
队列后,pool.nexPending
获取待获取连接的挂起队列,移除一个获取连接,之后 condition.signalAll()
通知所有的等待的 future
获取连接。
releaseConnection 释放连接
我们来看 releaseConnection
的代码:
/**
* A convenience method to simplify migration from HttpClient 3.1 API. This method is
* equivalent to {@link #reset()}.
*
* @since 4.2
*/
public void releaseConnection() {
reset();
}
/**
* Resets internal state of the request making it reusable.
*
* @since 4.2
*/
public void reset() {
final Cancellable cancellable = this.cancellableRef.getAndSet(null);
if (cancellable != null) {
cancellable.cancel();
}
this.aborted.set(false);
}
我们可以看到,此处也是使用和 abort
一样的方式调用 Cancellable.cancel
方法,但是,在方法最后,将 aborted
设置为了 false
。
简单翻一下方法注释,Resets internal state of the request making it reusable.
,即重置请求的内部状态,使其可以重新使用。 我们发现,releaseConnection
的作用是使请求可以重用,所以将 aborted
重新置为了 false
。
问题处理
通过上面分析,我们发现,调用 abort
方法时,将请求的 aborted
标志设为了 true
,而调用 releaseConnection
后,请求的 aborted
标志被重置为了 false
。而在代码中,会通过 aborted
标志判断当前请求是否可用:
@Override
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
// 使用 aborted 判断是否需要取消 ConnectionRequest
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
if (execAware != null) {
if (execAware.isAborted()) {
connRequest.cancel();
throw new RequestAbortedException("Request aborted");
} else {
execAware.setCancellable(connRequest);
}
}
// ......
final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
try {
if (execAware != null) {
execAware.setCancellable(connHolder);
}
HttpResponse response;
for (int execCount = 1;; execCount++) {
if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
throw new NonRepeatableRequestException("Cannot retry request " +
"with a non-repeatable request entity.");
}
// 使用 aborted 判断请求是否丢弃
if (execAware != null && execAware.isAborted()) {
throw new RequestAbortedException("Request aborted");
}
// ......
final int timeout = config.getSocketTimeout();
if (timeout >= 0) {
managedConn.setSocketTimeout(timeout);
}
// 使用 aborted 判断请求是否丢弃
if (execAware != null && execAware.isAborted()) {
throw new RequestAbortedException("Request aborted");
}
if (this.log.isDebugEnabled()) {
this.log.debug("Executing request " + request.getRequestLine());
}
if (this.log.isDebugEnabled()) {
this.log.debug("Executing request " + request.getRequestLine());
}
if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
if (this.log.isDebugEnabled()) {
this.log.debug("Target auth state: " + targetAuthState.getState());
}
this.authenticator.generateAuthResponse(request, targetAuthState, context);
}
if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
if (this.log.isDebugEnabled()) {
this.log.debug("Proxy auth state: " + proxyAuthState.getState());
}
this.authenticator.generateAuthResponse(request, proxyAuthState, context);
}
response = requestExecutor.execute(request, managedConn, context);
// The connection is in or can be brought to a re-usable state.
if (reuseStrategy.keepAlive(response, context)) {
// Set the idle duration of this connection
final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
if (this.log.isDebugEnabled()) {
final String s;
if (duration > 0) {
s = "for " + duration + " " + TimeUnit.MILLISECONDS;
} else {
s = "indefinitely";
}
this.log.debug("Connection can be kept alive " + s);
}
connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
connHolder.markReusable();
} else {
connHolder.markNonReusable();
}
if (needAuthentication(
targetAuthState, proxyAuthState, route, response, context)) {
// Make sure the response body is fully consumed, if present
final HttpEntity entity = response.getEntity();
if (connHolder.isReusable()) {
EntityUtils.consume(entity);
} else {
managedConn.close();
if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
&& proxyAuthState.isConnectionBased()) {
this.log.debug("Resetting proxy auth state");
proxyAuthState.reset();
}
if (targetAuthState.getState() == AuthProtocolState.SUCCESS
&& targetAuthState.isConnectionBased()) {
this.log.debug("Resetting target auth state");
targetAuthState.reset();
}
}
// discard previous auth headers
final HttpRequest original = request.getOriginal();
if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
request.removeHeaders(AUTH.WWW_AUTH_RESP);
}
if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
request.removeHeaders(AUTH.PROXY_AUTH_RESP);
}
} else {
break;
}
}
if (userToken == null) {
userToken = userTokenHandler.getUserToken(context);
context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
}
if (userToken != null) {
connHolder.setState(userToken);
}
// check for entity, release connection if possible
final HttpEntity entity = response.getEntity();
if (entity == null || !entity.isStreaming()) {
// connection not needed and (assumed to be) in re-usable state
connHolder.releaseConnection();
return new HttpResponseProxy(response, null);
} else {
return new HttpResponseProxy(response, connHolder);
}
} catch (final ConnectionShutdownException ex) {
final InterruptedIOException ioex = new InterruptedIOException(
"Connection has been shut down");
ioex.initCause(ex);
throw ioex;
} catch (final HttpException ex) {
connHolder.abortConnection();
throw ex;
} catch (final IOException ex) {
connHolder.abortConnection();
if (proxyAuthState.isConnectionBased()) {
proxyAuthState.reset();
}
if (targetAuthState.isConnectionBased()) {
targetAuthState.reset();
}
throw ex;
} catch (final RuntimeException ex) {
connHolder.abortConnection();
if (proxyAuthState.isConnectionBased()) {
proxyAuthState.reset();
}
if (targetAuthState.isConnectionBased()) {
targetAuthState.reset();
}
throw ex;
} catch (final Error error) {
connManager.shutdown();
throw error;
}
}
在同步状态下,由于调用 abort
和 releaseConnection
时,此时客户端请求已经结束,所以修改状态不会造成问题(由于项目中每次都是重新构建请求,所以也没有重用请求)。但是当请求在异步执行时,在执行请求的同时如果丢弃连接(执行 finally
的 abort
和 releaseConnection
),此时可能在连接获取的阻塞阶段,cancel
可能取消的是 future
,而如果此时 future
已经获取并返回连接,由于后面调用 releaseConnection
将请求的 aborted
置为 false
,判断中断失效,不会抛出异常,那么已获取的连接就不会被释放。
修复方法
删除 releaseConnection
如果在使用中不会复用请求,那么我们可以不再调用 releaseConnection
,因为 abort
已经调用了 Cancellable
的 cancel
方法,因此,相当于 releaseConnection
只会执行 this.aborted.set(false)
,而这会导致执行请求的线程在判断时不抛出异常,也就不会被捕获然后释放连接。
将 abort 和 releaseConnection 放入异步方法的 finally 执行
当我们想要复用连接时,我们可以将外面的释放连接方法放入异步方法的 finally
中执行,如下:
try {
future = executor.submit(() -> {
try {
return HttpClientUtil.execute(httpGet);
} catch (Exception e) {
logger.error("", e);
return null;
} finally {
httpGet.abort();
httpGet.releaseConnection();
}
});
response = future.get(5, TimeUnit.SECONDS);
System.out.println("response = " + response);
} catch (Exception e) {
if (e instanceof TimeoutException && future != null) {
logger.info(Thread.currentThread().getName() + " start cancel future");
logger.error("", e);
}
}
} finally {
if (null != response) {
EntityUtils.consume(response.getEntity());
}
}
注意,在 Future.get
方法超时后,不会终止任务,而是丢弃任务执行的结果,因此,当调用结束时,方法依旧会执行 finally
释放连接,但是要通过 Futrue.isDone
判断 Future
是否执行结束,才能重新复用请求对象。