Skip to content

Commit 1f6ded8

Browse files
authored
Watermarked response backpressure helidon-io#3136 (helidon-io#4724)
Signed-off-by: Daniel Kec <daniel.kec@oracle.com> Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
1 parent 3a58b7c commit 1f6ded8

19 files changed

Lines changed: 748 additions & 225 deletions

File tree

docs/config/io_helidon_webserver_WebServer.adoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,22 @@ This is a standalone configuration type, prefix from configuration root: `server
7171
|`max-upgrade-content-length` |int |`65536` |Set a maximum length of the content of an upgrade request.
7272
7373
Default is `64*1024`
74+
|`backpressure-buffer-size` |long |`5242880` |Set a maximum length of the unflushed response data sending buffer can keep without applying backpressure.
75+
Depends on `backpressure-policy` what happens if max buffer size is reached.
76+
77+
Default is `5*1024*1024` - 5Mb
78+
|`backpressure-policy` | String | `LINEAR` |Sets the strategy for applying backpressure to the reactive stream
79+
of response data.
80+
81+
* LINEAR - Data chunks are requested one-by-one after previous data chunk has been written to Netty's buffer, when
82+
`backpressure-buffer-size` watermark is reached, new chunks are not requested until buffer size decrease under
83+
the watermark value.
84+
* PREFETCH - After first data chunk arrives, expected number of chunks needed to fill the buffer up
85+
to watermark is calculated and requested.
86+
* AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested and extra flush is initiated.
87+
* UNBOUNDED - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
88+
89+
Default is `LINEAR`
7490
|`port` |int |`0` |Configures a server port to listen on with the server socket. If port is
7591
`0` then any available ephemeral port will be used.
7692
|`receive-buffer-size` |int |{nbsp} |Configures proposed value of the TCP receive window that is advertised to the remote peer on the
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright (c) 2022 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.microprofile.server;
18+
19+
import jakarta.validation.constraints.NotNull;
20+
import jakarta.ws.rs.GET;
21+
import jakarta.ws.rs.Path;
22+
import jakarta.ws.rs.Produces;
23+
import jakarta.ws.rs.QueryParam;
24+
import jakarta.ws.rs.client.WebTarget;
25+
import jakarta.ws.rs.core.MediaType;
26+
import jakarta.ws.rs.core.Response;
27+
import jakarta.ws.rs.core.StreamingOutput;
28+
29+
import java.io.IOException;
30+
import java.io.InputStream;
31+
import java.util.Random;
32+
33+
import io.helidon.microprofile.tests.junit5.AddBean;
34+
import io.helidon.microprofile.tests.junit5.AddConfig;
35+
import io.helidon.microprofile.tests.junit5.AddExtension;
36+
import io.helidon.microprofile.tests.junit5.DisableDiscovery;
37+
import io.helidon.microprofile.tests.junit5.HelidonTest;
38+
39+
import org.glassfish.jersey.ext.cdi1x.internal.CdiComponentProvider;
40+
import org.junit.jupiter.api.Test;
41+
42+
import static org.hamcrest.MatcherAssert.assertThat;
43+
import static org.hamcrest.CoreMatchers.is;
44+
45+
@HelidonTest
46+
@DisableDiscovery
47+
@AddBean(StreamingOutputLeakTest.DownloadResource.class)
48+
@AddExtension(ServerCdiExtension.class)
49+
@AddExtension(JaxRsCdiExtension.class)
50+
@AddExtension(CdiComponentProvider.class)
51+
@AddConfig(key = "server.backpressure-buffer-size", value = "20971520")//20Mb
52+
class StreamingOutputLeakTest {
53+
54+
private static final int SIZE10MB = 10 * 1024 * 1024;
55+
private static final int SIZE = SIZE10MB;
56+
private static final long NUMBER_OF_BUFS = 20;
57+
private static final byte[] DATA_10MB = new byte[SIZE];
58+
59+
static {
60+
Random r = new Random();
61+
r.nextBytes(DATA_10MB);
62+
}
63+
64+
/**
65+
* Reproducer for issue #4643
66+
*/
67+
@Test
68+
void streamingOutput(WebTarget target) throws IOException {
69+
70+
InputStream is = target.path("/download")
71+
.request()
72+
.get(InputStream.class);
73+
long size = 0;
74+
while (is.read() != -1) {
75+
size++;
76+
}
77+
is.close();
78+
79+
// Make sure all data has been read
80+
assertThat(size, is(NUMBER_OF_BUFS * SIZE));
81+
}
82+
83+
@Path("/download")
84+
public static class DownloadResource {
85+
86+
@GET
87+
@Produces(MediaType.MULTIPART_FORM_DATA)
88+
public Response getPayload(
89+
@NotNull @QueryParam("fileName") String fileName) {
90+
StreamingOutput fileStream = output -> {
91+
92+
// 2gb
93+
for (int i = 0; i < NUMBER_OF_BUFS; i++) {
94+
output.write(DATA_10MB);
95+
output.flush();
96+
}
97+
98+
};
99+
return Response
100+
.ok(fileStream, MediaType.MULTIPART_FORM_DATA)
101+
.build();
102+
}
103+
}
104+
}

webserver/test-support/src/main/java/io/helidon/webserver/testsupport/TestClient.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017, 2021 Oracle and/or its affiliates.
2+
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
3939
import io.helidon.common.reactive.Single;
4040
import io.helidon.media.common.MediaContext;
4141
import io.helidon.media.common.MediaSupport;
42+
import io.helidon.webserver.BackpressureStrategy;
4243
import io.helidon.webserver.BareRequest;
4344
import io.helidon.webserver.BareResponse;
4445
import io.helidon.webserver.Routing;
@@ -294,6 +295,11 @@ public Single<BareResponse> whenCompleted() {
294295
return Single.create(completionStage);
295296
}
296297

298+
@Override
299+
public void backpressureStrategy(BackpressureStrategy backpressureStrategy) {
300+
//noop
301+
}
302+
297303
@Override
298304
public void onSubscribe(Flow.Subscription subscription) {
299305
this.subscription = subscription;

webserver/webserver/pom.xml

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,30 @@
9191
<artifactId>maven-failsafe-plugin</artifactId>
9292
<executions>
9393
<execution>
94+
<id>tck-test</id>
9495
<goals>
9596
<goal>integration-test</goal>
9697
<goal>verify</goal>
9798
</goals>
99+
<configuration>
100+
<includes>
101+
<include>**/*TckTest.java</include>
102+
</includes>
103+
</configuration>
104+
</execution>
105+
<execution>
106+
<id>integration-test</id>
107+
<goals>
108+
<goal>integration-test</goal>
109+
<goal>verify</goal>
110+
</goals>
111+
<configuration>
112+
<includes>
113+
<include>**/*IT.java</include>
114+
</includes>
115+
</configuration>
98116
</execution>
99117
</executions>
100-
<configuration>
101-
<includes>
102-
<include>**/*TckTest.java</include>
103-
</includes>
104-
</configuration>
105118
<dependencies>
106119
<dependency>
107120
<groupId>org.apache.maven.surefire</groupId>
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (c) 2022 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.webserver;
18+
19+
import java.util.concurrent.Flow;
20+
21+
import io.helidon.webserver.ServerResponseSubscription.Unbounded;
22+
import io.helidon.webserver.ServerResponseSubscription.WatermarkAutoFlush;
23+
import io.helidon.webserver.ServerResponseSubscription.WatermarkLinear;
24+
import io.helidon.webserver.ServerResponseSubscription.WatermarkPrefetch;
25+
26+
/**
27+
* Strategy for applying backpressure to the reactive stream of response data.
28+
*/
29+
public enum BackpressureStrategy {
30+
/**
31+
* Data chunks are requested one-by-one after previous data chunk has been given to Netty for writing.
32+
* When backpressure-buffer-size watermark is reached new chunks are not requested until buffer size
33+
* decrease under the watermark value.
34+
*/
35+
LINEAR(1),
36+
/**
37+
* Data are requested one-by-one, in case buffer reaches watermark,
38+
* no other data is requested and extra flush is initiated.
39+
*/
40+
AUTO_FLUSH(2),
41+
/**
42+
* After first data chunk arrives, expected number of chunks needed
43+
* to fill the buffer up to watermark is calculated and requested.
44+
*/
45+
PREFETCH(3),
46+
/**
47+
* No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
48+
*/
49+
UNBOUNDED(4);
50+
51+
private final int type;
52+
53+
BackpressureStrategy(int type) {
54+
this.type = type;
55+
}
56+
57+
ServerResponseSubscription createSubscription(Flow.Subscription subscription,
58+
long backpressureBufferSize) {
59+
switch (type) {
60+
case 1: return new WatermarkLinear(subscription, backpressureBufferSize);
61+
case 2: return new WatermarkAutoFlush(subscription, backpressureBufferSize);
62+
case 3: return new WatermarkPrefetch(subscription, backpressureBufferSize);
63+
case 4: return new Unbounded(subscription);
64+
default: throw new IllegalStateException("Unknown backpressure strategy.");
65+
}
66+
}
67+
}

webserver/webserver/src/main/java/io/helidon/webserver/BareResponse.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
2+
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -58,6 +58,13 @@ void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<String>>
5858
*/
5959
Single<BareResponse> whenCompleted();
6060

61+
/**
62+
* Set the backpressure strategy used for requesting response data.
63+
*
64+
* @param backpressureStrategy strategy used for requesting response data
65+
*/
66+
void backpressureStrategy(BackpressureStrategy backpressureStrategy);
67+
6168
/**
6269
* Each response is subscribed up to a single publisher and AFTER {@link #writeStatusAndHeaders(Http.ResponseStatus, Map)}
6370
* method is called and returned.

0 commit comments

Comments
 (0)