Skip to content

Commit c16a9fe

Browse files
authored
Adds configurable buffering support to TcpClientConnection. Defaults to 4KB. New tests. See issue 9858. (helidon-io#9893)
Signed-off-by: Santiago Pericas-Geertsen <santiago.pericasgeertsen@oracle.com>
1 parent 965f8bf commit c16a9fe

9 files changed

Lines changed: 267 additions & 19 deletions

File tree

common/buffers/src/main/java/io/helidon/common/buffers/DataWriter.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2025 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@
2121
* Do not combine {@link #write(io.helidon.common.buffers.BufferData)} and {@link #writeNow(io.helidon.common.buffers.BufferData)}
2222
* to a single underlying transport, unless you can guarantee there will not be a race between these two methods.
2323
*/
24-
public interface DataWriter {
24+
public interface DataWriter extends AutoCloseable {
2525
/**
2626
* Write buffers, may delay writing and may write on a different thread.
2727
* This method also may combine multiple calls into a single write to the underlying transport.
@@ -49,4 +49,19 @@ public interface DataWriter {
4949
* @param buffer buffer to write
5050
*/
5151
void writeNow(BufferData buffer);
52+
53+
/**
54+
* Flushes to the underlying transport any pending data that has been written using
55+
* either {@link #write(BufferData)} or {@link #write(BufferData...)}.
56+
*/
57+
default void flush() {
58+
}
59+
60+
/**
61+
* Closes this writer and frees any associated resources. Defaults to just a call
62+
* to {@link #flush()}.
63+
*/
64+
default void close() {
65+
flush();
66+
}
5267
}

http/http2/src/main/java/io/helidon/http/http2/Http2ConnectionWriter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2025 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,7 +56,7 @@ public Http2ConnectionWriter(SocketContext ctx, DataWriter writer, List<Http2Fra
5656

5757
@Override
5858
public void write(Http2FrameData frame) {
59-
lockedWrite(frame);
59+
lockedWrite(frame);
6060
}
6161

6262
@Override
@@ -193,11 +193,11 @@ private void noLockWrite(Http2FrameData frame) {
193193
listener.frameHeader(ctx, streamId, headerData);
194194

195195
if (frameHeader.length() == 0) {
196-
writer.write(headerData);
196+
writer.writeNow(headerData);
197197
} else {
198198
BufferData data = frame.data().copy();
199199
listener.frame(ctx, streamId, data);
200-
writer.write(BufferData.create(headerData, data));
200+
writer.writeNow(BufferData.create(headerData, data));
201201
}
202202
}
203203

webclient/api/src/main/java/io/helidon/webclient/api/HttpClientConfigBlueprint.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
2+
* Copyright (c) 2023, 2025 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -266,4 +266,17 @@ default ClientRequestHeaders defaultRequestHeaders() {
266266
@Option.Configured
267267
@Option.DefaultInt(131072)
268268
int maxInMemoryEntity();
269+
270+
/**
271+
* Buffer size used when writing data to the underlying socket on a client TCP
272+
* connection. A value that is less or equal to 1 can be set to disable buffering
273+
* at this level. Note that if writing data to the socket in small chunks, they
274+
* may not be delivered to the network immediately due to Nagle's algorithm (i.e.,
275+
* if TCP_NO_DELAY is turned off).
276+
*
277+
* @return number of bytes in write buffer
278+
*/
279+
@Option.Configured
280+
@Option.DefaultInt(4096)
281+
int writeBufferSize();
269282
}

webclient/api/src/main/java/io/helidon/webclient/api/TcpClientConnection.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,10 @@ Obtain target socket through proxy (if enabled), or connect to target socket
146146
} else {
147147
this.helidonSocket = PlainSocket.client(socket, channelId);
148148
}
149+
149150
this.reader = new DataReader(helidonSocket);
150-
this.writer = new DirectDatatWriter(helidonSocket);
151+
int writeBufferSize = webClient.prototype().writeBufferSize();
152+
this.writer = new BufferedDataWriter(helidonSocket, writeBufferSize);
151153

152154
return this;
153155
}
@@ -310,33 +312,68 @@ private String certsToString(Certificate[] peerCertificates) {
310312
return String.join(", ", certs);
311313
}
312314

313-
private static class DirectDatatWriter implements DataWriter {
315+
static class BufferedDataWriter implements DataWriter {
316+
private final BufferData writerBuffer;
314317
private final HelidonSocket helidonSocket;
315318

316-
DirectDatatWriter(HelidonSocket helidonSocket) {
319+
BufferedDataWriter(HelidonSocket helidonSocket, int bufferSize) {
317320
this.helidonSocket = helidonSocket;
321+
this.writerBuffer = bufferSize > 0 ? BufferData.create(bufferSize) : null;
318322
}
319323

320324
@Override
321325
public void write(BufferData... buffers) {
322-
writeNow(buffers);
326+
for (BufferData buffer : buffers) {
327+
write(buffer);
328+
}
323329
}
324330

325331
@Override
326332
public void write(BufferData buffer) {
333+
// no buffering, just write now
334+
if (writerBuffer == null) {
335+
writeNow(buffer);
336+
return;
337+
}
338+
// have space in writerBuffer?
339+
if (buffer.available() <= writerBuffer.capacity()) {
340+
writerBuffer.write(buffer);
341+
return;
342+
}
343+
// write and reset writerBuffer if we have data
344+
if (writerBuffer.available() > 0) {
345+
writeNow(writerBuffer);
346+
writerBuffer.reset();
347+
// have space now?
348+
if (buffer.available() <= writerBuffer.capacity()) {
349+
writerBuffer.write(buffer);
350+
return;
351+
}
352+
}
353+
// no luck, we need to write it now
327354
writeNow(buffer);
328355
}
329356

330357
@Override
331358
public void writeNow(BufferData... buffers) {
359+
flush();
332360
for (BufferData buffer : buffers) {
333-
writeNow(buffer);
361+
helidonSocket.write(buffer);
334362
}
335363
}
336364

337365
@Override
338366
public void writeNow(BufferData buffer) {
367+
flush();
339368
helidonSocket.write(buffer);
340369
}
370+
371+
@Override
372+
public void flush() {
373+
if (writerBuffer != null && writerBuffer.available() > 0) {
374+
helidonSocket.write(writerBuffer);
375+
writerBuffer.reset();
376+
}
377+
}
341378
}
342379
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.helidon.webclient.api;
17+
18+
import io.helidon.common.buffers.BufferData;
19+
import io.helidon.common.socket.HelidonSocket;
20+
import io.helidon.common.socket.PeerInfo;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import static org.hamcrest.CoreMatchers.is;
25+
import static org.hamcrest.MatcherAssert.assertThat;
26+
27+
class BufferedDataWriterTest {
28+
29+
@Test
30+
void testBufferFull() {
31+
TestHelidonSocket socket = new TestHelidonSocket();
32+
try (TcpClientConnection.BufferedDataWriter writer = new TcpClientConnection.BufferedDataWriter(socket, 5)) {
33+
BufferData data = BufferData.create("0");
34+
writer.write(data);
35+
assertThat(socket.writeCounter(), is(0));
36+
37+
for (int i = 0; i < 4; i++) {
38+
data.rewind();
39+
writer.write(data);
40+
}
41+
assertThat(socket.writeCounter(), is(0));
42+
43+
data.rewind();
44+
writer.write(data);
45+
assertThat(socket.writeCounter(), is(5));
46+
47+
writer.flush();
48+
assertThat(socket.writeCounter(), is(6));
49+
}
50+
}
51+
52+
@Test
53+
void testNoFlush() {
54+
TestHelidonSocket socket = new TestHelidonSocket();
55+
try (TcpClientConnection.BufferedDataWriter writer = new TcpClientConnection.BufferedDataWriter(socket, 5)) {
56+
BufferData data = BufferData.create("0");
57+
for (int i = 0; i < 5; i++) {
58+
data.rewind();
59+
writer.write(data);
60+
}
61+
assertThat(socket.writeCounter(), is(0));
62+
}
63+
assertThat(socket.writeCounter(), is(5));
64+
}
65+
66+
@Test
67+
void testWritesAtBoundary() {
68+
TestHelidonSocket socket = new TestHelidonSocket();
69+
TcpClientConnection.BufferedDataWriter writer = new TcpClientConnection.BufferedDataWriter(socket, 5);
70+
BufferData data = BufferData.create("0");
71+
for (int i = 0; i < 50; i++) {
72+
data.rewind();
73+
writer.write(data);
74+
}
75+
writer.flush();
76+
writer.close();
77+
assertThat(socket.writeCounter(), is(50));
78+
}
79+
80+
@Test
81+
void testWritesOverBoundary() {
82+
TestHelidonSocket socket = new TestHelidonSocket();
83+
TcpClientConnection.BufferedDataWriter writer = new TcpClientConnection.BufferedDataWriter(socket, 5);
84+
BufferData data = BufferData.create("0");
85+
for (int i = 0; i < 52; i++) {
86+
data.rewind();
87+
writer.write(data);
88+
}
89+
writer.flush();
90+
writer.close();
91+
assertThat(socket.writeCounter(), is(52));
92+
}
93+
94+
@Test
95+
void testNoBuffering() {
96+
TestHelidonSocket socket = new TestHelidonSocket();
97+
TcpClientConnection.BufferedDataWriter writer = new TcpClientConnection.BufferedDataWriter(socket, -1);
98+
BufferData data = BufferData.create("0");
99+
for (int i = 0; i < 10; i++) {
100+
data.rewind();
101+
writer.write(data);
102+
assertThat(socket.writeCounter(), is(i + 1));
103+
}
104+
writer.flush();
105+
writer.close();
106+
}
107+
108+
@Test
109+
void testTooLargeToBuffer() {
110+
TestHelidonSocket socket = new TestHelidonSocket();
111+
TcpClientConnection.BufferedDataWriter writer = new TcpClientConnection.BufferedDataWriter(socket, 1);
112+
BufferData data = BufferData.create("12345");
113+
writer.write(data); // can't buffer
114+
assertThat(socket.writeCounter(), is(5));
115+
writer.close();
116+
}
117+
118+
static class TestHelidonSocket implements HelidonSocket {
119+
120+
private int writeCounter;
121+
122+
public int writeCounter() {
123+
return writeCounter;
124+
}
125+
126+
@Override
127+
public void close() {
128+
}
129+
130+
@Override
131+
public void idle() {
132+
}
133+
134+
@Override
135+
public boolean isConnected() {
136+
return false;
137+
}
138+
139+
@Override
140+
public int read(BufferData buffer) {
141+
return 0;
142+
}
143+
144+
@Override
145+
public void write(BufferData buffer) {
146+
writeCounter += buffer.available();
147+
}
148+
149+
@Override
150+
public PeerInfo remotePeer() {
151+
return null;
152+
}
153+
154+
@Override
155+
public PeerInfo localPeer() {
156+
return null;
157+
}
158+
159+
@Override
160+
public boolean isSecure() {
161+
return false;
162+
}
163+
164+
@Override
165+
public String socketId() {
166+
return null;
167+
}
168+
169+
@Override
170+
public String childSocketId() {
171+
return null;
172+
}
173+
174+
@Override
175+
public byte[] get() {
176+
return new byte[0];
177+
}
178+
}
179+
}

webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallEntityChain.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023 Oracle and/or its affiliates.
2+
* Copyright (c) 2023, 2025 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,6 +62,7 @@ public WebClientServiceResponse doProceed(ClientConnection connection,
6262
writeBuffer.write(entity);
6363
}
6464
writer.write(writeBuffer);
65+
writer.flush();
6566

6667
return readResponse(serviceRequest, connection, reader);
6768
}

0 commit comments

Comments
 (0)