Skip to content

Commit 9a86b4a

Browse files
danielkecTomáš Kraus
andauthored
HTTP/2 Client with flow-control (helidon-io#6399)
* HTTP/2 Server Flow-control - inbound * HTTP/2 Client * HTTP/2 Client Flow-control - inbound/outbound * HTTP/2 Client Continuation --------- Signed-off-by: Daniel Kec <daniel.kec@oracle.com> Co-authored-by: Tomáš Kraus <tomas.kraus@oracle.com>
1 parent 4ba72b6 commit 9a86b4a

66 files changed

Lines changed: 3645 additions & 637 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteableImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void sendHeaders(Metadata headers) {
151151
streamWriter.writeHeaders(http2Headers,
152152
streamId,
153153
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS),
154-
FlowControl.NOOP);
154+
FlowControl.Outbound.NOOP);
155155
}
156156

157157
@Override
@@ -175,7 +175,8 @@ public void sendMessage(RES message) {
175175
Http2Flag.DataFlags.create(0),
176176
streamId);
177177

178-
streamWriter.write(new Http2FrameData(header, bufferData), FlowControl.NOOP);
178+
//FIXME: FC and MAX_FRAME_SIZE
179+
streamWriter.writeData(new Http2FrameData(header, bufferData), FlowControl.Outbound.NOOP);
179180
}
180181

181182
@Override
@@ -187,7 +188,7 @@ public void close(Status status, Metadata trailers) {
187188
streamWriter.writeHeaders(http2Headers,
188189
streamId,
189190
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
190-
FlowControl.NOOP);
191+
FlowControl.Outbound.NOOP);
191192
currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL;
192193
}
193194

nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandlerNotFound.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void init() {
5050
streamWriter.writeHeaders(http2Headers,
5151
streamId,
5252
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
53-
FlowControl.NOOP);
53+
FlowControl.Outbound.NOOP);
5454
currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL;
5555
}
5656

nima/http/media/media/src/main/java/io/helidon/nima/http/media/ReadableEntityBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public int read() throws IOException {
226226
return -1;
227227
}
228228
ensureBuffer(512);
229-
if (currentBuffer == null) {
229+
if (finished || currentBuffer == null) {
230230
return -1;
231231
}
232232
return currentBuffer.read();
@@ -238,7 +238,7 @@ public int read(byte[] b, int off, int len) throws IOException {
238238
return -1;
239239
}
240240
ensureBuffer(len);
241-
if (currentBuffer == null) {
241+
if (finished || currentBuffer == null) {
242242
return -1;
243243
}
244244
return currentBuffer.read(b, off, len);

nima/http2/http2/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@
4646
<artifactId>hamcrest-all</artifactId>
4747
<scope>test</scope>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.junit.jupiter</groupId>
51+
<artifactId>junit-jupiter-params</artifactId>
52+
<scope>test</scope>
53+
</dependency>
54+
<dependency>
55+
<groupId>io.helidon.logging</groupId>
56+
<artifactId>helidon-logging-jul</artifactId>
57+
<scope>test</scope>
58+
</dependency>
59+
<dependency>
60+
<groupId>org.mockito</groupId>
61+
<artifactId>mockito-core</artifactId>
62+
<scope>test</scope>
63+
</dependency>
4964
</dependencies>
5065
</project>
5166

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
* Copyright (c) 2023 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.nima.http2;
18+
19+
import java.time.Duration;
20+
import java.util.function.BiConsumer;
21+
22+
import io.helidon.common.Builder;
23+
24+
import static java.lang.System.Logger.Level.DEBUG;
25+
26+
/**
27+
* HTTP/2 Flow control for connection.
28+
*/
29+
public class ConnectionFlowControl {
30+
31+
private static final System.Logger LOGGER_OUTBOUND = System.getLogger(FlowControl.class.getName() + ".ofc");
32+
33+
private final Type type;
34+
private final BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter;
35+
private final Duration timeout;
36+
private final WindowSize.Inbound inboundConnectionWindowSize;
37+
private final WindowSize.Outbound outboundConnectionWindowSize;
38+
39+
private volatile int maxFrameSize = WindowSize.DEFAULT_MAX_FRAME_SIZE;
40+
private volatile int initialWindowSize = WindowSize.DEFAULT_WIN_SIZE;
41+
42+
private ConnectionFlowControl(Type type,
43+
int initialWindowSize,
44+
int maxFrameSize,
45+
BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter,
46+
Duration timeout) {
47+
this.type = type;
48+
this.windowUpdateWriter = windowUpdateWriter;
49+
this.timeout = timeout;
50+
this.inboundConnectionWindowSize =
51+
WindowSize.createInbound(type,
52+
0,
53+
initialWindowSize,
54+
maxFrameSize,
55+
windowUpdateWriter);
56+
outboundConnectionWindowSize =
57+
WindowSize.createOutbound(type, 0, this);
58+
}
59+
60+
/**
61+
* Create connection HTTP/2 flow-control for server side.
62+
*
63+
* @param windowUpdateWriter method called for sending WINDOW_UPDATE frames to the client.
64+
* @return Connection HTTP/2 flow-control
65+
*/
66+
public static ConnectionFlowControlBuilder serverBuilder(BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter) {
67+
return new ConnectionFlowControlBuilder(Type.SERVER, windowUpdateWriter);
68+
}
69+
70+
/**
71+
* Create connection HTTP/2 flow-control for client side.
72+
*
73+
* @param windowUpdateWriter method called for sending WINDOW_UPDATE frames to the server.
74+
* @return Connection HTTP/2 flow-control
75+
*/
76+
public static ConnectionFlowControlBuilder clientBuilder(BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter) {
77+
return new ConnectionFlowControlBuilder(Type.CLIENT, windowUpdateWriter);
78+
}
79+
80+
/**
81+
* Create stream specific inbound and outbound flow control.
82+
*
83+
* @param streamId stream id
84+
* @return stream flow control
85+
*/
86+
public StreamFlowControl createStreamFlowControl(int streamId) {
87+
return new StreamFlowControl(type, streamId, this, windowUpdateWriter);
88+
}
89+
90+
/**
91+
* Increment outbound connection flow control window, called when WINDOW_UPDATE is received.
92+
*
93+
* @param increment number of bytes other side has requested on top of actual demand
94+
* @return outbound window size after increment
95+
*/
96+
public long incrementOutboundConnectionWindowSize(int increment) {
97+
return outboundConnectionWindowSize.incrementWindowSize(increment);
98+
}
99+
100+
/**
101+
* Decrement inbound connection flow control window, called when DATA frame is received.
102+
*
103+
* @param decrement received DATA frame size in bytes
104+
* @return inbound window size after decrement
105+
*/
106+
public long decrementInboundConnectionWindowSize(int decrement) {
107+
return inboundConnectionWindowSize.decrementWindowSize(decrement);
108+
}
109+
110+
/**
111+
* Reset MAX_FRAME_SIZE for all streams, existing and future ones.
112+
*
113+
* @param maxFrameSize to split data frames according to when larger
114+
*/
115+
public void resetMaxFrameSize(int maxFrameSize) {
116+
this.maxFrameSize = maxFrameSize;
117+
}
118+
119+
/**
120+
* Reset an initial window size value for outbound flow control windows of a new streams.
121+
* Don't forget to call stream.flowControl().outbound().resetStreamWindowSize(...) for each stream
122+
* to align window size of existing streams.
123+
*
124+
* @param initialWindowSize INIT_WINDOW_SIZE received
125+
*/
126+
public void resetInitialWindowSize(int initialWindowSize) {
127+
if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
128+
LOGGER_OUTBOUND.log(DEBUG, String.format("%s OFC STR *: Recv INIT_WINDOW_SIZE %s", type, initialWindowSize));
129+
}
130+
this.initialWindowSize = initialWindowSize;
131+
}
132+
133+
/**
134+
* Connection outbound flow control window,
135+
* decrements when DATA are sent and increments when WINDOW_UPDATE or INIT_WINDOW_SIZE is received.
136+
* Blocks sending when window is depleted.
137+
*
138+
* @return connection outbound flow control window
139+
*/
140+
public WindowSize.Outbound outbound() {
141+
return outboundConnectionWindowSize;
142+
}
143+
144+
/**
145+
* Connection inbound window is always manipulated by respective stream flow control,
146+
* therefore package private is enough.
147+
*
148+
* @return connection inbound flow control window
149+
*/
150+
WindowSize.Inbound inbound() {
151+
return inboundConnectionWindowSize;
152+
}
153+
154+
int maxFrameSize() {
155+
return maxFrameSize;
156+
}
157+
158+
int initialWindowSize() {
159+
return initialWindowSize;
160+
}
161+
162+
Duration timeout() {
163+
return timeout;
164+
}
165+
166+
enum Type {
167+
SERVER, CLIENT;
168+
}
169+
170+
/**
171+
* Connection flow control builder.
172+
*/
173+
public static class ConnectionFlowControlBuilder implements Builder<ConnectionFlowControlBuilder, ConnectionFlowControl> {
174+
175+
private static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(100);
176+
private final Type type;
177+
private final BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter;
178+
private int initialWindowSize = WindowSize.DEFAULT_WIN_SIZE;
179+
private int maxFrameSize = WindowSize.DEFAULT_MAX_FRAME_SIZE;
180+
private Duration blockTimeout = DEFAULT_TIMEOUT;
181+
182+
ConnectionFlowControlBuilder(Type type, BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter) {
183+
this.type = type;
184+
this.windowUpdateWriter = windowUpdateWriter;
185+
}
186+
187+
/**
188+
* Outbound flow control INITIAL_WINDOW_SIZE setting for new HTTP/2 connections.
189+
*
190+
* @param initialWindowSize units of octets
191+
* @return updated builder
192+
*/
193+
public ConnectionFlowControlBuilder initialWindowSize(int initialWindowSize) {
194+
this.initialWindowSize = initialWindowSize;
195+
return this;
196+
}
197+
198+
/**
199+
* Initial MAX_FRAME_SIZE setting for new HTTP/2 connections.
200+
* Maximum size of data frames in bytes we are prepared to accept from the other size.
201+
* Default value is 2^14(16_384).
202+
*
203+
* @param maxFrameSize data frame size in bytes between 2^14(16_384) and 2^24-1(16_777_215)
204+
* @return updated client
205+
*/
206+
public ConnectionFlowControlBuilder maxFrameSize(int maxFrameSize) {
207+
this.maxFrameSize = maxFrameSize;
208+
return this;
209+
}
210+
211+
/**
212+
* Timeout for blocking between windows size check iterations.
213+
*
214+
* @param timeout duration
215+
* @return updated builder
216+
*/
217+
public ConnectionFlowControlBuilder blockTimeout(Duration timeout) {
218+
this.blockTimeout = timeout;
219+
return this;
220+
}
221+
222+
@Override
223+
public ConnectionFlowControl build() {
224+
return new ConnectionFlowControl(type, initialWindowSize, maxFrameSize, windowUpdateWriter, blockTimeout);
225+
}
226+
}
227+
}

0 commit comments

Comments
 (0)