Skip to content

Commit 72a9299

Browse files
Update streaming example to use IoMulti to demonstrate proper NIO usage. (helidon-io#6604)
Fixes helidon-io#2844
1 parent 1381dd5 commit 72a9299

3 files changed

Lines changed: 46 additions & 178 deletions

File tree

examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileReader.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileWriter.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
2+
* Copyright (c) 2018, 2023 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,21 @@
1616

1717
package io.helidon.webserver.examples.streaming;
1818

19+
import java.io.IOException;
20+
import java.io.UncheckedIOException;
1921
import java.net.URISyntaxException;
22+
import java.net.URL;
23+
import java.nio.channels.ReadableByteChannel;
24+
import java.nio.file.Files;
2025
import java.nio.file.Path;
2126
import java.nio.file.Paths;
27+
import java.util.Arrays;
28+
import java.util.concurrent.ScheduledExecutorService;
2229
import java.util.logging.Logger;
2330

31+
import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
32+
import io.helidon.common.http.DataChunk;
33+
import io.helidon.common.reactive.IoMulti;
2434
import io.helidon.webserver.Routing;
2535
import io.helidon.webserver.ServerRequest;
2636
import io.helidon.webserver.ServerResponse;
@@ -34,12 +44,16 @@
3444
*/
3545
public class StreamingService implements Service {
3646
private static final Logger LOGGER = Logger.getLogger(StreamingService.class.getName());
37-
47+
private final ScheduledExecutorService executor = ScheduledThreadPoolSupplier.create().get();
3848
private final Path filePath;
3949

4050
StreamingService() {
51+
URL resource = getClass().getResource(LARGE_FILE_RESOURCE);
52+
if (resource == null) {
53+
throw new IllegalStateException("Resource not found: " + LARGE_FILE_RESOURCE);
54+
}
4155
try {
42-
filePath = Paths.get(getClass().getResource(LARGE_FILE_RESOURCE).toURI());
56+
filePath = Paths.get(resource.toURI());
4357
} catch (URISyntaxException e) {
4458
throw new RuntimeException(e);
4559
}
@@ -48,20 +62,45 @@ public class StreamingService implements Service {
4862
@Override
4963
public void update(Routing.Rules routingRules) {
5064
routingRules.get("/download", this::download)
51-
.post("/upload", this::upload);
65+
.post("/upload", this::upload);
5266
}
5367

5468
private void upload(ServerRequest request, ServerResponse response) {
5569
LOGGER.info("Entering upload ... " + Thread.currentThread());
56-
request.content().subscribe(new ServerFileWriter(response));
70+
Path tempFilePath = createTempFile("large-file", ".tmp");
71+
request.content()
72+
.map(DataChunk::data)
73+
.flatMapIterable(Arrays::asList)
74+
.to(IoMulti.writeToFile(tempFilePath)
75+
.executor(executor)
76+
.build());
5777
LOGGER.info("Exiting upload ...");
5878
}
5979

6080
private void download(ServerRequest request, ServerResponse response) {
6181
LOGGER.info("Entering download ..." + Thread.currentThread());
6282
long length = filePath.toFile().length();
63-
response.headers().add("Content-Length", String.valueOf(length));
64-
response.send(new ServerFileReader(filePath));
83+
response.headers().contentLength(length);
84+
response.send(IoMulti.multiFromByteChannelBuilder(newByteChannel(filePath))
85+
.executor(executor)
86+
.build());
6587
LOGGER.info("Exiting download ...");
6688
}
89+
90+
@SuppressWarnings("SameParameterValue")
91+
private static Path createTempFile(String prefix, String suffix) {
92+
try {
93+
return Files.createTempFile(prefix, suffix);
94+
} catch (IOException ex) {
95+
throw new UncheckedIOException(ex);
96+
}
97+
}
98+
99+
private static ReadableByteChannel newByteChannel(Path path) {
100+
try {
101+
return Files.newByteChannel(path);
102+
} catch (IOException ex) {
103+
throw new UncheckedIOException(ex);
104+
}
105+
}
67106
}

0 commit comments

Comments
 (0)