Skip to content

Commit cf73aea

Browse files
committed
Cached connection close detection helidon-io#7367
1 parent 02d33f5 commit cf73aea

8 files changed

Lines changed: 295 additions & 6 deletions

File tree

common/socket/src/main/java/io/helidon/common/socket/HelidonSocket.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,19 @@ public interface HelidonSocket extends SocketContext, Supplier<byte[]> {
3030
*/
3131
void close();
3232

33+
/**
34+
* Sets the socket to idle mode. Idle mode expects no bytes coming over the
35+
* socket but keeps reading exactly one byte in case connection is severed.
36+
* Idle mode should be used in case of client side connection caching.
37+
*/
38+
void idle();
39+
40+
/**
41+
* Check if socket is connected.
42+
* @return true if connected
43+
*/
44+
boolean isConnected();
45+
3346
/**
3447
* Read bytes from the socket. This method blocks until at least 1 byte is available.
3548
*
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (c) 2023 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.common.socket;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.UncheckedIOException;
22+
import java.util.Objects;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.Future;
27+
28+
import io.helidon.common.LazyValue;
29+
30+
/**
31+
* InputStream capable of cheap continuous reading of single byte from inactive socket input stream
32+
* in order to detect blocking socket disconnection.
33+
* Needs to be switched to idle monitoring mode typically when client connection is cached.
34+
* <p>
35+
* Returns automatically to standard mode when read method is executed.
36+
*/
37+
class IdleInputStream extends InputStream {
38+
39+
private final InputStream upstream;
40+
private final LazyValue<ExecutorService> executor;
41+
private volatile int next = -1;
42+
private volatile boolean closed = false;
43+
private Future<?> idlingThread;
44+
45+
IdleInputStream(InputStream upstream, String childSocketId, String socketId) {
46+
this.upstream = upstream;
47+
executor = LazyValue.create(() -> Executors.newThreadPerTaskExecutor(
48+
Thread.ofVirtual()
49+
.name("helidon-socket-monitor-" + childSocketId + "-" + socketId, 0)
50+
.factory())
51+
);
52+
}
53+
54+
@Override
55+
public int read() throws IOException {
56+
if (idlingThread != null) {
57+
endIdle();
58+
}
59+
if (next < 0) {
60+
return upstream.read();
61+
} else {
62+
int res = next;
63+
next = -1;
64+
return res;
65+
}
66+
}
67+
68+
@Override
69+
public int read(byte[] b, int off, int len) throws IOException {
70+
if (idlingThread != null) {
71+
endIdle();
72+
}
73+
if (next < 0) {
74+
return upstream.read(b, off, len);
75+
} else {
76+
Objects.checkFromIndexSize(off, len, b.length);
77+
if (len == 0) {
78+
return 0;
79+
}
80+
b[off] = (byte) next;
81+
next = -1;
82+
return 1;
83+
}
84+
}
85+
86+
@Override
87+
public void close() throws IOException {
88+
upstream.close();
89+
closed = true;
90+
}
91+
92+
/**
93+
* Enable idle mode, connection is expected to be idle,
94+
* single byte will be read asynchronously
95+
* in blocking manner to detect severed connection.
96+
*/
97+
void idle() {
98+
if (idlingThread != null) {
99+
throw new IllegalStateException("Already in idle mode!");
100+
}
101+
idlingThread = executor.get().submit(this::handle);
102+
}
103+
104+
boolean isClosed() {
105+
return closed;
106+
}
107+
108+
private void handle() {
109+
try {
110+
next = upstream.read();
111+
if (next <= 0) {
112+
closed = true;
113+
}
114+
} catch (IOException e) {
115+
closed = true;
116+
throw new UncheckedIOException(e);
117+
}
118+
}
119+
120+
private void endIdle() {
121+
try {
122+
idlingThread.get();
123+
idlingThread = null;
124+
} catch (InterruptedException | ExecutionException e) {
125+
closed = true;
126+
throw new RuntimeException("Exception in socket monitor thread.", e);
127+
}
128+
}
129+
}

common/socket/src/main/java/io/helidon/common/socket/PlainSocket.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717
package io.helidon.common.socket;
1818

1919
import java.io.IOException;
20-
import java.io.InputStream;
2120
import java.io.OutputStream;
2221
import java.io.UncheckedIOException;
2322
import java.net.InetSocketAddress;
@@ -38,7 +37,7 @@ public sealed class PlainSocket implements HelidonSocket permits TlsSocket {
3837
private final Socket delegate;
3938
private final String childSocketId;
4039
private final String socketId;
41-
private final InputStream inputStream;
40+
private final IdleInputStream inputStream;
4241
private final OutputStream outputStream;
4342

4443
/**
@@ -53,7 +52,7 @@ protected PlainSocket(Socket delegate, String childSocketId, String socketId) {
5352
this.childSocketId = childSocketId;
5453
this.socketId = socketId;
5554
try {
56-
this.inputStream = delegate.getInputStream();
55+
this.inputStream = new IdleInputStream(delegate.getInputStream(), childSocketId, socketId);
5756
this.outputStream = delegate.getOutputStream();
5857
} catch (IOException e) {
5958
throw new UncheckedIOException(e);
@@ -117,6 +116,16 @@ public void close() {
117116
}
118117
}
119118

119+
@Override
120+
public void idle() {
121+
inputStream.idle();
122+
}
123+
124+
@Override
125+
public boolean isConnected() {
126+
return !inputStream.isClosed();
127+
}
128+
120129
@Override
121130
public int read(BufferData buffer) {
122131
return buffer.readFrom(inputStream);

webclient/api/src/main/java/io/helidon/webclient/api/TcpClientConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ public HelidonSocket helidonSocket() {
232232
}
233233

234234
public boolean isConnected() {
235-
return socket != null && socket.isConnected();
235+
return socket != null && socket.isConnected() && helidonSocket().isConnected();
236236
}
237237

238238
Socket socket() {

webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCache.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ private boolean finishRequest(LinkedBlockingDeque<TcpClientConnection> connectio
154154
if (conn.isConnected()) {
155155
try {
156156
if (connectionQueue.offer(conn, QUEUE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
157+
conn.helidonSocket().idle(); // mark it as idle to stay blocked at read for closed conn detection
157158
if (LOGGER.isLoggable(DEBUG)) {
158159
LOGGER.log(DEBUG, "[%s] client connection returned %s",
159160
conn.channelId(),

webclient/http1/src/test/java/io/helidon/webclient/http1/Http1ClientTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,16 @@ public void close() {
737737

738738
}
739739

740+
@Override
741+
public void idle() {
742+
743+
}
744+
745+
@Override
746+
public boolean isConnected() {
747+
return true;
748+
}
749+
740750
@Override
741751
public int read(BufferData buffer) {
742752
return 0;
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright (c) 2023 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.webclient.tests;
18+
19+
import io.helidon.http.Http;
20+
import io.helidon.webclient.api.WebClient;
21+
import io.helidon.webserver.WebServer;
22+
import io.helidon.webserver.http.HttpRouting;
23+
import io.helidon.webserver.http1.Http1Route;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import static io.helidon.http.Http.Method.POST;
28+
import static org.hamcrest.MatcherAssert.assertThat;
29+
import static org.hamcrest.Matchers.is;
30+
31+
public class SharedCacheTest {
32+
@Test
33+
void cacheHttp1WithServerRestart() {
34+
WebServer webServer = null;
35+
try {
36+
HttpRouting routing = HttpRouting.builder()
37+
.route(Http1Route.route(POST, "/", (req, res) -> res.send()))
38+
.build();
39+
40+
webServer = WebServer.builder()
41+
.routing(routing)
42+
.build()
43+
.start();
44+
45+
int port = webServer.port();
46+
47+
WebClient webClient = WebClient.builder()
48+
.keepAlive(true)
49+
.baseUri("http://localhost:" + port + "/")
50+
.build();
51+
52+
try (var res = webClient.post().submit("WHATEVER")) {
53+
assertThat(res.status(), is(Http.Status.OK_200));
54+
}
55+
webServer.stop();
56+
webServer = WebServer.builder()
57+
.port(port)
58+
.routing(routing)
59+
.build()
60+
.start();
61+
62+
try (var res = webClient.post().submit("WHATEVER")) {
63+
assertThat(res.status(), is(Http.Status.OK_200));
64+
}
65+
} finally {
66+
if (webServer != null) {
67+
webServer.stop();
68+
}
69+
}
70+
}
71+
72+
@Test
73+
void cacheHttp1NoRestart() {
74+
Http.HeaderName clientPortHeader = Http.HeaderNames.create("client-port");
75+
WebServer webServer = null;
76+
try {
77+
HttpRouting routing = HttpRouting.builder()
78+
.route(Http1Route.route(POST, "/", (req, res) -> {
79+
res.header(clientPortHeader, String.valueOf(req.remotePeer().port()));
80+
res.send();
81+
}))
82+
.build();
83+
84+
webServer = WebServer.builder()
85+
.routing(routing)
86+
.build()
87+
.start();
88+
89+
int port = webServer.port();
90+
91+
WebClient webClient = WebClient.builder()
92+
.keepAlive(true)
93+
.baseUri("http://localhost:" + port)
94+
.build();
95+
96+
Integer firstReqClientPort;
97+
try (var res = webClient.post().submit("WHATEVER")) {
98+
firstReqClientPort = res.headers().get(clientPortHeader).value(Integer.TYPE);
99+
assertThat(res.status(), is(Http.Status.OK_200));
100+
}
101+
102+
Integer secondReqClientPort;
103+
try (var res = webClient.post().submit("WHATEVER")) {
104+
secondReqClientPort = res.headers().get(clientPortHeader).value(Integer.TYPE);
105+
assertThat(res.status(), is(Http.Status.OK_200));
106+
}
107+
108+
assertThat("In case of cached connection client port must be the same.",
109+
secondReqClientPort,
110+
is(firstReqClientPort));
111+
} finally {
112+
if (webServer != null) {
113+
webServer.stop();
114+
}
115+
}
116+
}
117+
}

webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectSocket.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ public String childSocketId() {
7575
public void close() {
7676
}
7777

78+
@Override
79+
public void idle() {
80+
81+
}
82+
83+
@Override
84+
public boolean isConnected() {
85+
return true;
86+
}
87+
7888
@Override
7989
public int read(BufferData buffer) {
8090
return 0;

0 commit comments

Comments
 (0)