Skip to content

Commit 5a4f993

Browse files
authored
4.x: Graceful client connection close (helidon-io#8051)
* helidon-io#6545 Graceful client connection close
1 parent 5d10cd9 commit 5a4f993

19 files changed

Lines changed: 893 additions & 32 deletions

File tree

webclient/api/src/main/java/io/helidon/webclient/api/HttpClient.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*
2424
* @param <REQ> type of the client request
2525
*/
26-
public interface HttpClient<REQ extends ClientRequest<REQ>> {
26+
public interface HttpClient<REQ extends ClientRequest<REQ>> extends ReleasableResource {
2727
/**
2828
* Create a request for a method.
2929
*
@@ -32,6 +32,13 @@ public interface HttpClient<REQ extends ClientRequest<REQ>> {
3232
*/
3333
REQ method(Method method);
3434

35+
/**
36+
* Gracefully close all opened client specific connections.
37+
*/
38+
default void closeResource() {
39+
// Do nothing by default
40+
}
41+
3542
/**
3643
* Shortcut for get method with a path.
3744
*

webclient/api/src/main/java/io/helidon/webclient/api/LoomClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ public HttpClientRequest method(Method method) {
151151
tcpProtocolIds);
152152
}
153153

154+
@Override
155+
public void closeResource() {
156+
for (ProtocolSpi o : List.copyOf(clientSpiByProtocol.values())) {
157+
o.spi().releaseResource();
158+
}
159+
}
160+
154161
@Override
155162
public <T, C extends ProtocolConfig> T client(Protocol<T, C> protocol, C protocolConfig) {
156163
return protocol.provider().protocol(this, protocolConfig);
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) 2023 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.webclient.spi;
18+
19+
import io.helidon.webclient.api.ReleasableResource;
20+
21+
import static java.lang.System.Logger.Level;
22+
23+
/**
24+
* Client connection cache with release shutdown hook to provide graceful shutdown.
25+
*/
26+
public abstract class ClientConnectionCache implements ReleasableResource {
27+
28+
private static final System.Logger LOGGER = System.getLogger(ClientConnectionCache.class.getName());
29+
30+
protected ClientConnectionCache(boolean shared) {
31+
if (shared) {
32+
Runtime.getRuntime().addShutdownHook(new Thread(this::onShutdown));
33+
}
34+
}
35+
36+
private void onShutdown() {
37+
if (LOGGER.isLoggable(Level.DEBUG)) {
38+
LOGGER.log(Level.DEBUG, "Gracefully closing connections in client connection cache.");
39+
}
40+
this.releaseResource();
41+
}
42+
}

webclient/api/src/main/java/io/helidon/webclient/spi/HttpClientSpi.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919
import io.helidon.webclient.api.ClientRequest;
2020
import io.helidon.webclient.api.ClientUri;
2121
import io.helidon.webclient.api.FullClientRequest;
22+
import io.helidon.webclient.api.ReleasableResource;
2223

2324
/**
2425
* Integration for HTTP versions to provide a single API.
2526
*/
26-
public interface HttpClientSpi {
27+
public interface HttpClientSpi extends ReleasableResource {
2728
/**
2829
* Return whether this HTTP version can handle the provided request.
2930
* <p>

webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,18 @@ class Http1ClientImpl implements Http1Client, HttpClientSpi {
2828
private final Http1ClientConfig clientConfig;
2929
private final Http1ClientProtocolConfig protocolConfig;
3030
private final Http1ConnectionCache connectionCache;
31+
private final Http1ConnectionCache clientCache;
3132

3233
Http1ClientImpl(WebClient webClient, Http1ClientConfig clientConfig) {
3334
this.webClient = webClient;
3435
this.clientConfig = clientConfig;
3536
this.protocolConfig = clientConfig.protocolConfig();
3637
if (clientConfig.shareConnectionCache()) {
3738
this.connectionCache = Http1ConnectionCache.shared();
39+
this.clientCache = null;
3840
} else {
3941
this.connectionCache = Http1ConnectionCache.create();
42+
this.clientCache = connectionCache;
4043
}
4144
}
4245

@@ -86,6 +89,13 @@ public ClientRequest<?> clientRequest(FullClientRequest<?> clientRequest, Client
8689
.fragment(clientUri.fragment());
8790
}
8891

92+
@Override
93+
public void closeResource() {
94+
if (clientCache != null) {
95+
this.clientCache.closeResource();
96+
}
97+
}
98+
8999
WebClient webClient() {
90100
return webClient;
91101
}

webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCache.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
package io.helidon.webclient.http1;
1818

1919
import java.time.Duration;
20+
import java.util.Collection;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.LinkedBlockingDeque;
2425
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2527

2628
import io.helidon.common.tls.Tls;
2729
import io.helidon.http.ClientRequestHeaders;
@@ -33,27 +35,33 @@
3335
import io.helidon.webclient.api.Proxy;
3436
import io.helidon.webclient.api.TcpClientConnection;
3537
import io.helidon.webclient.api.WebClient;
38+
import io.helidon.webclient.spi.ClientConnectionCache;
3639

3740
import static java.lang.System.Logger.Level.DEBUG;
3841

3942
/**
4043
* Cache of HTTP/1.1 connections for keep alive.
4144
*/
42-
class Http1ConnectionCache {
45+
class Http1ConnectionCache extends ClientConnectionCache {
4346
private static final System.Logger LOGGER = System.getLogger(Http1ConnectionCache.class.getName());
4447
private static final Tls NO_TLS = Tls.builder().enabled(false).build();
4548
private static final String HTTPS = "https";
46-
private static final Http1ConnectionCache SHARED = create();
49+
private static final Http1ConnectionCache SHARED = new Http1ConnectionCache(true);
4750
private static final List<String> ALPN_ID = List.of(Http1Client.PROTOCOL_ID);
4851
private static final Duration QUEUE_TIMEOUT = Duration.ofMillis(10);
4952
private final Map<ConnectionKey, LinkedBlockingDeque<TcpClientConnection>> cache = new ConcurrentHashMap<>();
53+
private final AtomicBoolean closed = new AtomicBoolean();
54+
55+
protected Http1ConnectionCache(boolean shared) {
56+
super(shared);
57+
}
5058

5159
static Http1ConnectionCache shared() {
5260
return SHARED;
5361
}
5462

5563
static Http1ConnectionCache create() {
56-
return new Http1ConnectionCache();
64+
return new Http1ConnectionCache(false);
5765
}
5866

5967
ClientConnection connection(Http1ClientImpl http1Client,
@@ -71,6 +79,16 @@ ClientConnection connection(Http1ClientImpl http1Client,
7179
}
7280
}
7381

82+
@Override
83+
public void closeResource() {
84+
if (closed.getAndSet(true)) {
85+
return;
86+
}
87+
cache.values().stream()
88+
.flatMap(Collection::stream)
89+
.forEach(TcpClientConnection::closeResource);
90+
}
91+
7492
private boolean handleKeepAlive(boolean defaultKeepAlive, WritableHeaders<?> headers) {
7593
if (headers.contains(HeaderValues.CONNECTION_CLOSE)) {
7694
return false;
@@ -90,6 +108,11 @@ private ClientConnection keepAliveConnection(Http1ClientImpl http1Client,
90108
Tls tls,
91109
ClientUri uri,
92110
Proxy proxy) {
111+
112+
if (closed.get()) {
113+
throw new IllegalStateException("Connection cache is closed");
114+
}
115+
93116
Http1ClientConfig clientConfig = http1Client.clientConfig();
94117

95118
ConnectionKey connectionKey = new ConnectionKey(uri.scheme(),

webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.Future;
2626
import java.util.concurrent.Semaphore;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicReference;
2829
import java.util.concurrent.locks.Lock;
2930
import java.util.concurrent.locks.ReadWriteLock;
3031
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -85,8 +86,7 @@ class Http2ClientConnection {
8586
private Http2Settings serverSettings = Http2Settings.builder()
8687
.build();
8788
private Future<?> handleTask;
88-
89-
private volatile boolean closed = false;
89+
private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);
9090

9191
Http2ClientConnection(Http2ClientImpl http2Client, ClientConnection connection) {
9292
this.protocolConfig = http2Client.protocolConfig();
@@ -177,7 +177,7 @@ Http2ClientStream tryStream(Http2StreamConfig config) {
177177
}
178178

179179
boolean closed() {
180-
return closed || (protocolConfig.ping() && !ping());
180+
return state.get().closed() || (protocolConfig.ping() && !ping());
181181
}
182182

183183
boolean ping() {
@@ -203,13 +203,15 @@ void updateLastStreamId(int lastStreamId) {
203203
}
204204

205205
void close() {
206-
closed = true;
207-
try {
208-
handleTask.cancel(true);
209-
ctx.log(LOGGER, TRACE, "Closing connection");
210-
connection.closeResource();
211-
} catch (Throwable e) {
212-
ctx.log(LOGGER, TRACE, "Failed to close HTTP/2 connection.", e);
206+
this.goAway(0, Http2ErrorCode.NO_ERROR, "Closing connection");
207+
if (state.getAndSet(State.CLOSED) != State.CLOSED) {
208+
try {
209+
handleTask.cancel(true);
210+
ctx.log(LOGGER, TRACE, "Closing connection");
211+
connection.closeResource();
212+
} catch (Throwable e) {
213+
ctx.log(LOGGER, TRACE, "Failed to close HTTP/2 connection.", e);
214+
}
213215
}
214216
}
215217

@@ -268,14 +270,14 @@ private void start(Http2ClientProtocolConfig protocolConfig,
268270
try {
269271
while (!Thread.interrupted()) {
270272
if (!handle()) {
271-
closed = true;
273+
this.close();
272274
ctx.log(LOGGER, TRACE, "Connection closed");
273275
return;
274276
}
275277
}
276278
ctx.log(LOGGER, TRACE, "Client listener interrupted");
277279
} catch (Throwable t) {
278-
closed = true;
280+
this.close();
279281
ctx.log(LOGGER, DEBUG, "Failed to handle HTTP/2 client connection", t);
280282
}
281283
});
@@ -457,8 +459,26 @@ private void ackSettings() {
457459
}
458460

459461
private void goAway(int streamId, Http2ErrorCode errorCode, String msg) {
460-
Http2Settings http2Settings = Http2Settings.create();
461-
Http2GoAway frame = new Http2GoAway(streamId, errorCode, msg);
462-
writer.write(frame.toFrameData(http2Settings, 0, Http2Flag.NoFlags.create()));
462+
if (State.OPEN == state.getAndSet(State.GO_AWAY)) {
463+
Http2Settings http2Settings = Http2Settings.create();
464+
Http2GoAway frame = new Http2GoAway(streamId, errorCode, msg);
465+
writer.write(frame.toFrameData(http2Settings, 0, Http2Flag.NoFlags.create()));
466+
}
467+
}
468+
469+
private enum State {
470+
CLOSED(true),
471+
GO_AWAY(true),
472+
OPEN(false);
473+
474+
private final boolean closed;
475+
476+
State(boolean closed){
477+
this.closed = closed;
478+
}
479+
480+
boolean closed() {
481+
return closed;
482+
}
463483
}
464484
}

webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnectionHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,10 @@ void close() {
7777
// this is to prevent concurrent modification (connections remove themselves from the map)
7878
Set<Http2ClientConnection> toClose = new HashSet<>(allConnections.keySet());
7979
toClose.forEach(Http2ClientConnection::close);
80-
this.activeConnection.set(null);
80+
Http2ClientConnection active = this.activeConnection.getAndSet(null);
81+
if (active != null) {
82+
active.close();
83+
}
8184
this.allConnections.clear();
8285
}
8386

webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,18 @@ class Http2ClientImpl implements Http2Client, HttpClientSpi {
3030
private final Http2ClientConfig clientConfig;
3131
private final Http2ClientProtocolConfig protocolConfig;
3232
private final Http2ConnectionCache connectionCache;
33+
private final Http2ConnectionCache clientCache;
3334

3435
Http2ClientImpl(WebClient webClient, Http2ClientConfig clientConfig) {
3536
this.webClient = webClient;
3637
this.clientConfig = clientConfig;
3738
this.protocolConfig = clientConfig.protocolConfig();
3839
if (clientConfig.shareConnectionCache()) {
3940
this.connectionCache = Http2ConnectionCache.shared();
41+
this.clientCache = null;
4042
} else {
4143
this.connectionCache = Http2ConnectionCache.create();
44+
this.clientCache = connectionCache;
4245
}
4346
}
4447

@@ -94,6 +97,13 @@ public ClientRequest<?> clientRequest(FullClientRequest<?> clientRequest, Client
9497
.fragment(clientUri.fragment());
9598
}
9699

100+
@Override
101+
public void closeResource() {
102+
if (clientCache != null) {
103+
this.clientCache.closeResource();
104+
}
105+
}
106+
97107
WebClient webClient() {
98108
return webClient;
99109
}

0 commit comments

Comments
 (0)