Skip to content

Commit c976f85

Browse files
authored
4.x: Introducing request and response stream filters to server. (helidon-io#7608)
* Introducing request and response stream filters to server. * Fixed tracing propagation test, now has 3 spans.
1 parent ef9eba3 commit c976f85

18 files changed

Lines changed: 281 additions & 51 deletions

File tree

http/encoding/deflate/src/main/java/io/helidon/http/encoding/deflate/DeflateEncoding.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public ContentDecoder decoder() {
8787
public ContentEncoder encoder() {
8888
return new ContentEncoder() {
8989
@Override
90-
public OutputStream encode(OutputStream network) {
90+
public OutputStream apply(OutputStream network) {
9191
return new DeflaterOutputStream(network);
9292
}
9393

http/encoding/encoding/src/main/java/io/helidon/http/encoding/ContentDecoder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
package io.helidon.http.encoding;
1818

1919
import java.io.InputStream;
20-
import java.util.function.Function;
20+
import java.util.function.UnaryOperator;
2121

2222
/**
2323
* Content decoder.
2424
*/
2525
@FunctionalInterface
26-
public interface ContentDecoder extends Function<InputStream, InputStream> {
26+
public interface ContentDecoder extends UnaryOperator<InputStream> {
2727
/**
2828
* No op content decoder.
2929
*/

http/encoding/encoding/src/main/java/io/helidon/http/encoding/ContentEncoder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
package io.helidon.http.encoding;
1818

1919
import java.io.OutputStream;
20+
import java.util.function.UnaryOperator;
2021

2122
import io.helidon.http.WritableHeaders;
2223

2324
/**
2425
* Content encoder.
2526
*/
26-
public interface ContentEncoder {
27+
public interface ContentEncoder extends UnaryOperator<OutputStream> {
2728
/**
2829
* No-op content encoder.
2930
*/
@@ -35,7 +36,8 @@ public interface ContentEncoder {
3536
* @param network output stream to be written over the network
3637
* @return output stream to write plain data (to compress, encrypt)
3738
*/
38-
OutputStream encode(OutputStream network);
39+
@Override
40+
OutputStream apply(OutputStream network);
3941

4042
/**
4143
* Update headers.

http/encoding/gzip/src/main/java/io/helidon/http/encoding/gzip/GzipEncoding.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public ContentDecoder decoder() {
8787
public ContentEncoder encoder() {
8888
return new ContentEncoder() {
8989
@Override
90-
public OutputStream encode(OutputStream network) {
90+
public OutputStream apply(OutputStream network) {
9191
try {
9292
return new GZIPOutputStream(network);
9393
} catch (IOException e) {

tracing/tests/it-tracing-client-zipkin/src/test/java/io/helidon/tracing/tests/it1/OpenTraceableClientE2ETest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeUnit;
2828

29-
import io.helidon.webserver.testing.junit5.ServerTest;
30-
import io.helidon.webserver.testing.junit5.SetUpServer;
31-
import io.helidon.webserver.WebServer;
32-
import io.helidon.webserver.WebServerConfig;
33-
import io.helidon.webserver.tracing.TracingFeature;
3429
import io.helidon.tracing.Span;
3530
import io.helidon.tracing.jersey.client.ClientTracingFilter;
3631
import io.helidon.tracing.providers.opentracing.OpenTracing;
3732
import io.helidon.tracing.providers.zipkin.ZipkinTracer;
33+
import io.helidon.webserver.WebServer;
34+
import io.helidon.webserver.WebServerConfig;
35+
import io.helidon.webserver.testing.junit5.ServerTest;
36+
import io.helidon.webserver.testing.junit5.SetUpServer;
37+
import io.helidon.webserver.tracing.TracingFeature;
3838

3939
import brave.Tracing;
4040
import brave.opentracing.BraveSpanContext;
@@ -47,7 +47,6 @@
4747
import org.glassfish.jersey.client.ClientConfig;
4848
import org.junit.jupiter.api.AfterAll;
4949
import org.junit.jupiter.api.BeforeEach;
50-
import org.junit.jupiter.api.Disabled;
5150
import org.junit.jupiter.api.Test;
5251

5352
import static org.hamcrest.CoreMatchers.is;
@@ -68,7 +67,6 @@
6867
*
6968
*/
7069
@ServerTest
71-
@Disabled("https://github.com/helidon-io/helidon/issues/7203")
7270
class OpenTraceableClientE2ETest {
7371
/**
7472
* We expect two client spans and two server spans.
@@ -129,7 +127,9 @@ void e2e() throws Exception {
129127
fail("Timed out waiting to detect expected "
130128
+ EXPECTED_TRACE_EVENTS_COUNT
131129
+ "; remaining latch count: "
132-
+ eventsLatch.getCount());
130+
+ eventsLatch.getCount()
131+
+ ", server spans: " + printSpans(SERVER_SPANS)
132+
+ ", client spans: " + printSpans(CLIENT_SPANS));
133133
}
134134

135135
assertThat("Client spans reported. Client: " + printSpans(CLIENT_SPANS) + ", Server: " + printSpans(SERVER_SPANS),

scheduling/src/test/resources/logging.properties renamed to tracing/tests/it-tracing-client-zipkin/src/test/resources/logging.properties

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
2+
# Copyright (c) 2023 Oracle and/or its affiliates.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -14,19 +14,9 @@
1414
# limitations under the License.
1515
#
1616

17-
# Example Logging Configuration File
18-
# For more information see $JAVA_HOME/jre/lib/logging.properties
19-
20-
# Send messages to the console
2117
handlers=io.helidon.logging.jul.HelidonConsoleHandler
22-
23-
# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread
2418
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n
2519

26-
# Global logging level. Can be overridden by specific loggers
2720
.level=WARNING
2821

2922
io.helidon.level=INFO
30-
io.helidon.faulttolerance.level=INFO
31-
32-

webclient/tests/webclient/src/test/java/io/helidon/webclient/tests/TracingPropagationTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class TracingPropagationTest {
6464
private final URI uri;
6565

6666
TracingPropagationTest(URI uri) {
67-
Tracer tracer = OpenTracing.create(this.tracer);
67+
Tracer tracer = OpenTracing.create(TracingPropagationTest.tracer);
6868
this.uri = uri.resolve("/greet");
6969
this.client = Http1Client.builder()
7070
.baseUri(this.uri)
@@ -111,7 +111,8 @@ void testTracingSuccess() throws InterruptedException {
111111

112112
// the server traces asynchronously, some spans may be written after we receive the response.
113113
// we need to try to wait for such spans
114-
assertThat("There should be 2 spans reported", tracer.finishedSpans(), hasSize(2));
114+
// re-introduced content-write span
115+
assertThat("There should be 3 spans reported", tracer.finishedSpans(), hasSize(3));
115116

116117

117118
// we need the first span - parentId 0

webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerRequest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
package io.helidon.webserver.http2;
1818

19+
import java.io.InputStream;
20+
import java.util.Objects;
1921
import java.util.function.Supplier;
22+
import java.util.function.UnaryOperator;
2023

2124
import io.helidon.common.LazyValue;
2225
import io.helidon.common.buffers.BufferData;
@@ -60,6 +63,7 @@ class Http2ServerRequest implements RoutingRequest {
6063
private Context context;
6164
// preparation for continue support in HTTP/2
6265
private boolean continueSent;
66+
private UnaryOperator<InputStream> streamFilter = UnaryOperator.identity();
6367

6468
Http2ServerRequest(ConnectionContext ctx,
6569
HttpSecurity security,
@@ -76,7 +80,8 @@ class Http2ServerRequest implements RoutingRequest {
7680
this.requestId = requestId;
7781
this.authority = headers.authority();
7882

79-
this.entity = LazyValue.create(() -> Http2ServerRequestEntity.create(decoder,
83+
this.entity = LazyValue.create(() -> Http2ServerRequestEntity.create(streamFilter,
84+
decoder,
8085
it -> entitySupplier.get(),
8186
NO_OP_RUNNABLE,
8287
this.headers,
@@ -208,6 +213,13 @@ public boolean continueSent() {
208213
return continueSent;
209214
}
210215

216+
@Override
217+
public void streamFilter(UnaryOperator<InputStream> filterFunction) {
218+
Objects.requireNonNull(filterFunction);
219+
UnaryOperator<InputStream> current = this.streamFilter;
220+
this.streamFilter = it -> filterFunction.apply(current.apply(it));
221+
}
222+
211223
private UriInfo createUriInfo() {
212224
return ctx.listenerContext().config().requestedUriDiscoveryContext().uriInfo(remotePeer().address().toString(),
213225
localPeer().address().toString(),

webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerRequestEntity.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
import java.io.InputStream;
2020
import java.util.function.Function;
21+
import java.util.function.UnaryOperator;
2122

2223
import io.helidon.common.GenericType;
2324
import io.helidon.common.buffers.BufferData;
2425
import io.helidon.http.ServerRequestHeaders;
25-
import io.helidon.http.encoding.ContentDecoder;
2626
import io.helidon.http.media.MediaContext;
2727
import io.helidon.http.media.ReadableEntity;
2828
import io.helidon.http.media.ReadableEntityBase;
@@ -33,15 +33,18 @@
3333
public final class Http2ServerRequestEntity extends ReadableEntityBase implements ReadableEntity {
3434
private final ServerRequestHeaders requestHeaders;
3535
private final MediaContext mediaContext;
36-
private final Function<InputStream, InputStream> decoder;
36+
private final UnaryOperator<InputStream> decoder;
37+
private final UnaryOperator<InputStream> streamFilter;
3738

38-
private Http2ServerRequestEntity(Function<InputStream, InputStream> decoder,
39+
private Http2ServerRequestEntity(UnaryOperator<InputStream> streamFilter,
40+
UnaryOperator<InputStream> decoder,
3941
Function<Integer, BufferData> readEntityFunction,
4042
Runnable entityProcessedRunnable,
4143
ServerRequestHeaders requestHeaders,
4244
MediaContext mediaContext) {
4345
super(readEntityFunction, entityProcessedRunnable);
4446

47+
this.streamFilter = streamFilter;
4548
this.decoder = decoder;
4649
this.requestHeaders = requestHeaders;
4750
this.mediaContext = mediaContext;
@@ -50,6 +53,7 @@ private Http2ServerRequestEntity(Function<InputStream, InputStream> decoder,
5053
/**
5154
* Create a new entity.
5255
*
56+
* @param streamFilter stream filter to apply to the stream, provided by user
5357
* @param decoder content decoder
5458
* @param readEntityFunction function to read buffer from entity (int is an estimated number of bytes needed, buffer
5559
* will contain at least 1 byte)
@@ -58,17 +62,24 @@ private Http2ServerRequestEntity(Function<InputStream, InputStream> decoder,
5862
* @param mediaContext media context to map to correct types
5963
* @return a new entity
6064
*/
61-
public static Http2ServerRequestEntity create(ContentDecoder decoder,
65+
public static Http2ServerRequestEntity create(UnaryOperator<InputStream> streamFilter,
66+
UnaryOperator<InputStream> decoder,
6267
Function<Integer, BufferData> readEntityFunction,
6368
Runnable entityProcessedRunnable,
6469
ServerRequestHeaders requestHeaders,
6570
MediaContext mediaContext) {
66-
return new Http2ServerRequestEntity(decoder, readEntityFunction, entityProcessedRunnable, requestHeaders, mediaContext);
71+
return new Http2ServerRequestEntity(streamFilter,
72+
decoder,
73+
readEntityFunction,
74+
entityProcessedRunnable,
75+
requestHeaders,
76+
mediaContext);
6777
}
6878

6979
@Override
7080
public ReadableEntity copy(Runnable entityProcessedRunnable) {
71-
return new Http2ServerRequestEntity(decoder,
81+
return new Http2ServerRequestEntity(streamFilter,
82+
decoder,
7283
readEntityFunction(), () -> {
7384
entityProcessedRunnable.run();
7485
entityProcessedRunnable().run();
@@ -79,7 +90,7 @@ public ReadableEntity copy(Runnable entityProcessedRunnable) {
7990

8091
@Override
8192
public InputStream inputStream() {
82-
return decoder.apply(super.inputStream());
93+
return streamFilter.apply(decoder.apply(super.inputStream()));
8394
}
8495

8596
@Override

webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.io.IOException;
2020
import java.io.OutputStream;
2121
import java.io.UncheckedIOException;
22+
import java.util.Objects;
23+
import java.util.function.UnaryOperator;
2224

2325
import io.helidon.common.buffers.BufferData;
2426
import io.helidon.http.DateTime;
@@ -51,6 +53,7 @@ class Http2ServerResponse extends ServerResponseBase<Http2ServerResponse> {
5153
private boolean streamingEntity;
5254
private long bytesWritten;
5355
private BlockingOutputStream outputStream;
56+
private UnaryOperator<OutputStream> outputStreamFilter;
5457

5558
Http2ServerResponse(ConnectionContext ctx,
5659
Http2ServerRequest request,
@@ -73,6 +76,16 @@ public Http2ServerResponse header(Header header) {
7376

7477
@Override
7578
public void send(byte[] entityBytes) {
79+
if (outputStreamFilter != null) {
80+
// in this case we must honor user's request to filter the stream
81+
try (OutputStream os = outputStream()) {
82+
os.write(entityBytes);
83+
} catch (IOException e) {
84+
throw new UncheckedIOException(e);
85+
}
86+
return;
87+
}
88+
7689
if (isSent) {
7790
throw new IllegalStateException("Response already sent");
7891
}
@@ -132,7 +145,11 @@ public OutputStream outputStream() {
132145
this.isSent = true;
133146
afterSend();
134147
});
135-
return contentEncode(outputStream);
148+
if (outputStreamFilter == null) {
149+
return contentEncode(outputStream);
150+
} else {
151+
return outputStreamFilter.apply(contentEncode(outputStream));
152+
}
136153
}
137154

138155
@Override
@@ -173,6 +190,24 @@ public void commit() {
173190
}
174191
}
175192

193+
@Override
194+
public void streamFilter(UnaryOperator<OutputStream> filterFunction) {
195+
if (isSent) {
196+
throw new IllegalStateException("Response already sent");
197+
}
198+
if (streamingEntity) {
199+
throw new IllegalStateException("OutputStream already obtained");
200+
}
201+
Objects.requireNonNull(filterFunction);
202+
203+
UnaryOperator<OutputStream> current = this.outputStreamFilter;
204+
if (current == null) {
205+
this.outputStreamFilter = filterFunction;
206+
} else {
207+
this.outputStreamFilter = it -> filterFunction.apply(current.apply(it));
208+
}
209+
}
210+
176211
private static class BlockingOutputStream extends OutputStream {
177212

178213
private final ServerResponseHeaders headers;

0 commit comments

Comments
 (0)