Skip to content

Commit 28cb3e8

Browse files
authored
[Reactive] Reimplement Concat with varargs (helidon-io#1815)
1 parent c9cc6f0 commit 28cb3e8

6 files changed

Lines changed: 270 additions & 15 deletions

File tree

common/reactive/src/main/java/io/helidon/common/reactive/Multi.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package io.helidon.common.reactive;
1717

1818
import java.util.ArrayList;
19-
import java.util.Arrays;
2019
import java.util.Collection;
2120
import java.util.List;
2221
import java.util.Objects;
@@ -64,25 +63,21 @@ static <T> Multi<T> concat(Flow.Publisher<T> firstMulti, Flow.Publisher<T> secon
6463
}
6564

6665
/**
67-
* Concat streams to one.
68-
*
69-
* @param firstMulti first stream
70-
* @param secondMulti second stream
66+
* Concatenates an array of source {@link Flow.Publisher}s by relaying items
67+
* in order, non-overlappingly, one after the other finishes.
7168
* @param publishers more publishers to concat
7269
* @param <T> item type
7370
* @return Multi
7471
*/
7572
@SafeVarargs
7673
@SuppressWarnings("varargs")
77-
static <T> Multi<T> concat(Flow.Publisher<T> firstMulti, Flow.Publisher<T> secondMulti, Flow.Publisher<T>... publishers) {
74+
static <T> Multi<T> concatArray(Flow.Publisher<T>... publishers) {
7875
if (publishers.length == 0) {
79-
return concat(firstMulti, secondMulti);
76+
return empty();
8077
} else if (publishers.length == 1) {
81-
return concat(concat(firstMulti, secondMulti), publishers[0]);
82-
} else {
83-
return concat(concat(firstMulti, secondMulti), publishers[0],
84-
Arrays.copyOfRange(publishers, 1, publishers.length));
78+
return Multi.from(publishers[0]);
8579
}
80+
return new MultiConcatArray<>(publishers);
8681
}
8782

8883
/**
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.common.reactive;
19+
20+
import java.util.concurrent.Flow;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
/**
24+
* Relay items in order from subsequent Flow.Publishers as a single Multi source.
25+
*/
26+
final class MultiConcatArray<T> implements Multi<T> {
27+
28+
private final Flow.Publisher<T>[] sources;
29+
30+
MultiConcatArray(Flow.Publisher<T>[] sources) {
31+
this.sources = sources;
32+
}
33+
34+
@Override
35+
public void subscribe(Flow.Subscriber<? super T> subscriber) {
36+
ConcatArraySubscriber<T> parent = new ConcatArraySubscriber<>(subscriber, sources);
37+
subscriber.onSubscribe(parent);
38+
parent.nextSource();
39+
}
40+
41+
static final class ConcatArraySubscriber<T> extends SubscriptionArbiter
42+
implements Flow.Subscriber<T> {
43+
44+
private final Flow.Subscriber<? super T> downstream;
45+
46+
private final Flow.Publisher<T>[] sources;
47+
48+
private final AtomicInteger wip;
49+
50+
private int index;
51+
52+
private long produced;
53+
54+
ConcatArraySubscriber(Flow.Subscriber<? super T> downstream, Flow.Publisher<T>[] sources) {
55+
this.downstream = downstream;
56+
this.sources = sources;
57+
this.wip = new AtomicInteger();
58+
}
59+
60+
@Override
61+
public void onSubscribe(Flow.Subscription subscription) {
62+
super.setSubscription(subscription);
63+
}
64+
65+
@Override
66+
public void onNext(T item) {
67+
produced++;
68+
downstream.onNext(item);
69+
}
70+
71+
@Override
72+
public void onError(Throwable throwable) {
73+
downstream.onError(throwable);
74+
}
75+
76+
@Override
77+
public void onComplete() {
78+
long produced = this.produced;
79+
if (produced != 0L) {
80+
this.produced = 0L;
81+
super.produced(produced);
82+
}
83+
nextSource();
84+
}
85+
86+
public void nextSource() {
87+
if (wip.getAndIncrement() == 0) {
88+
do {
89+
if (index == sources.length) {
90+
downstream.onComplete();
91+
} else {
92+
sources[index++].subscribe(this);
93+
}
94+
} while (wip.decrementAndGet() != 0);
95+
}
96+
}
97+
98+
@Override
99+
public void request(long n) {
100+
if (n <= 0) {
101+
downstream.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
102+
} else {
103+
super.request(n);
104+
}
105+
}
106+
}
107+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import org.reactivestreams.tck.TestEnvironment;
20+
import org.reactivestreams.tck.flow.FlowPublisherVerification;
21+
import org.testng.annotations.Test;
22+
23+
import java.util.Arrays;
24+
import java.util.concurrent.Flow;
25+
26+
@Test
27+
public class MultiConcatArrayTck1Test extends FlowPublisherVerification<Integer> {
28+
29+
public MultiConcatArrayTck1Test() {
30+
super(new TestEnvironment(200));
31+
}
32+
33+
@Override
34+
public Flow.Publisher<Integer> createFlowPublisher(long l) {
35+
return Multi.concatArray(Multi.range(0, (int) l / 2), Multi.range((int)l / 2, (int) (l - l / 2)));
36+
}
37+
38+
@Override
39+
public Flow.Publisher<Integer> createFailedFlowPublisher() {
40+
return null;
41+
}
42+
43+
@Override
44+
public long maxElementsFromPublisher() {
45+
return 10;
46+
}
47+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import org.reactivestreams.tck.TestEnvironment;
20+
import org.reactivestreams.tck.flow.FlowPublisherVerification;
21+
import org.testng.annotations.Test;
22+
23+
import java.util.Arrays;
24+
import java.util.concurrent.Flow;
25+
26+
@Test
27+
public class MultiConcatArrayTck2Test extends FlowPublisherVerification<Integer> {
28+
29+
public MultiConcatArrayTck2Test() {
30+
super(new TestEnvironment(200));
31+
}
32+
33+
@Override
34+
public Flow.Publisher<Integer> createFlowPublisher(long l) {
35+
@SuppressWarnings("unchecked")
36+
Multi<Integer>[] sources = new Multi[(int)l];
37+
for (int i = 0; i < l; i++) {
38+
sources[i] = Multi.singleton(i);
39+
}
40+
return Multi.concatArray(sources);
41+
}
42+
43+
@Override
44+
public Flow.Publisher<Integer> createFailedFlowPublisher() {
45+
return null;
46+
}
47+
48+
@Override
49+
public long maxElementsFromPublisher() {
50+
return 10;
51+
}
52+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.helidon.common.reactive;
17+
18+
import org.junit.jupiter.api.Test;
19+
20+
import java.io.IOException;
21+
import java.util.Arrays;
22+
23+
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.hamcrest.core.Is.is;
25+
import static org.hamcrest.core.IsInstanceOf.instanceOf;
26+
27+
public class MultiConcatArrayTest {
28+
29+
@Test
30+
public void errors() {
31+
TestSubscriber<Object> ts = new TestSubscriber<>(Long.MAX_VALUE);
32+
33+
Multi.concatArray(Multi.singleton(1), Multi.error(new IOException()), Multi.singleton(2))
34+
.subscribe(ts);
35+
36+
ts.assertFailure(IOException.class, 1);
37+
}
38+
39+
@Test
40+
public void millionSources() {
41+
@SuppressWarnings("unchecked")
42+
Multi<Integer>[] sources = new Multi[1_000_000];
43+
Arrays.fill(sources, Multi.singleton(1));
44+
45+
TestSubscriber<Object> ts = new TestSubscriber<>(Long.MAX_VALUE);
46+
47+
Multi.concatArray(sources)
48+
.subscribe(ts);
49+
50+
ts.assertItemCount(1_000_000)
51+
.assertComplete();
52+
}
53+
54+
}

common/reactive/src/test/java/io/helidon/common/reactive/MultiTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ void testConcatVarargs() {
530530

531531

532532
assertThat(Multi
533-
.concat(Multi.from(TEST_DATA_1),
533+
.concatArray(Multi.from(TEST_DATA_1),
534534
Multi.just(TEST_DATA_2),
535535
Multi.just(TEST_DATA_3)
536536
)
@@ -542,7 +542,7 @@ void testConcatVarargs() {
542542
)))));
543543

544544
assertThat(Multi
545-
.concat(Multi.from(TEST_DATA_1),
545+
.concatArray(Multi.from(TEST_DATA_1),
546546
Multi.just(TEST_DATA_2),
547547
Multi.just(TEST_DATA_3),
548548
Multi.just(TEST_DATA_4)
@@ -557,7 +557,7 @@ void testConcatVarargs() {
557557

558558

559559
assertThat(Multi
560-
.concat(Multi.from(TEST_DATA_1),
560+
.concatArray(Multi.from(TEST_DATA_1),
561561
Multi.just(TEST_DATA_2),
562562
Multi.just(TEST_DATA_3),
563563
Multi.just(TEST_DATA_4),
@@ -575,7 +575,7 @@ void testConcatVarargs() {
575575

576576

577577
assertThat(Multi
578-
.concat(Multi.from(TEST_DATA_1),
578+
.concatArray(Multi.from(TEST_DATA_1),
579579
Multi.just(TEST_DATA_2),
580580
Multi.just(TEST_DATA_3),
581581
Multi.just(TEST_DATA_4),

0 commit comments

Comments
 (0)