Skip to content

Commit c9d9734

Browse files
authored
Multi ifEmpty (helidon-io#3470)
* Multi.ifEmpty Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
1 parent 94da9ea commit c9d9734

11 files changed

Lines changed: 449 additions & 6 deletions

File tree

common/reactive/src/main/java/io/helidon/common/reactive/Multi.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,17 @@ default Multi<T> onTerminate(Runnable onTerminate) {
848848
onTerminate);
849849
}
850850

851+
/**
852+
* Executes given {@link java.lang.Runnable} when stream is finished without value(empty stream).
853+
*
854+
* @param ifEmpty {@link java.lang.Runnable} to be executed.
855+
* @return Multi
856+
*/
857+
default Multi<T> ifEmpty(Runnable ifEmpty) {
858+
Objects.requireNonNull(ifEmpty, "ifEmpty callback is null");
859+
return new MultiIfEmptyPublisher<>(this, ifEmpty);
860+
}
861+
851862
/**
852863
* Invoke provided consumer for every item in stream.
853864
*
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (c) 2021 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import java.util.Objects;
20+
import java.util.concurrent.Flow;
21+
22+
/**
23+
* Executes given {@link java.lang.Runnable} when stream is finished without value(empty stream).
24+
*
25+
* @param <T> the item type
26+
*/
27+
final class MultiIfEmptyPublisher<T> implements Multi<T> {
28+
29+
private final Multi<T> source;
30+
private final Runnable ifEmpty;
31+
32+
MultiIfEmptyPublisher(Multi<T> source, Runnable ifEmpty) {
33+
this.source = source;
34+
this.ifEmpty = ifEmpty;
35+
}
36+
37+
@Override
38+
public void subscribe(Flow.Subscriber<? super T> subscriber) {
39+
Objects.requireNonNull(subscriber, "subscriber is null");
40+
source.subscribe(new IfEmptySubscriber<>(subscriber, ifEmpty));
41+
}
42+
43+
static final class IfEmptySubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
44+
45+
private final Flow.Subscriber<? super T> downstream;
46+
private final Runnable ifEmpty;
47+
48+
private boolean empty;
49+
50+
private Flow.Subscription upstream;
51+
52+
IfEmptySubscriber(Flow.Subscriber<? super T> downstream, Runnable ifEmpty) {
53+
this.downstream = downstream;
54+
this.ifEmpty = ifEmpty;
55+
this.empty = true;
56+
}
57+
58+
@Override
59+
public void onSubscribe(Flow.Subscription subscription) {
60+
SubscriptionHelper.validate(upstream, subscription);
61+
upstream = subscription;
62+
downstream.onSubscribe(this);
63+
}
64+
65+
@Override
66+
public void onNext(T item) {
67+
Flow.Subscription s = upstream;
68+
if (s != SubscriptionHelper.CANCELED) {
69+
empty = false;
70+
downstream.onNext(item);
71+
}
72+
}
73+
74+
@Override
75+
public void onError(Throwable throwable) {
76+
Flow.Subscription s = upstream;
77+
if (s != SubscriptionHelper.CANCELED) {
78+
upstream = SubscriptionHelper.CANCELED;
79+
downstream.onError(throwable);
80+
}
81+
}
82+
83+
@Override
84+
public void onComplete() {
85+
Flow.Subscription s = upstream;
86+
if (s != SubscriptionHelper.CANCELED) {
87+
upstream = SubscriptionHelper.CANCELED;
88+
boolean e = empty;
89+
if (e) {
90+
try {
91+
ifEmpty.run();
92+
} catch (Throwable t) {
93+
downstream.onError(t);
94+
return;
95+
}
96+
}
97+
downstream.onComplete();
98+
}
99+
}
100+
101+
@Override
102+
public void request(long n) {
103+
upstream.request(n);
104+
}
105+
106+
@Override
107+
public void cancel() {
108+
upstream.cancel();
109+
upstream = SubscriptionHelper.CANCELED;
110+
}
111+
}
112+
}

common/reactive/src/main/java/io/helidon/common/reactive/Single.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,17 @@ default Single<T> onTerminate(Runnable onTerminate) {
517517
onTerminate);
518518
}
519519

520+
/**
521+
* Executes given {@link java.lang.Runnable} when stream is finished without value(empty stream).
522+
*
523+
* @param ifEmpty {@link java.lang.Runnable} to be executed.
524+
* @return Multi
525+
*/
526+
default Single<T> ifEmpty(Runnable ifEmpty) {
527+
Objects.requireNonNull(ifEmpty, "ifEmpty callback is null");
528+
return new SingleIfEmptyPublisher<>(this, ifEmpty);
529+
}
530+
520531
/**
521532
* Invoke provided consumer for the item in stream.
522533
*
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) 2021 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import java.util.Objects;
20+
import java.util.concurrent.Flow;
21+
22+
/**
23+
* Executes given {@link java.lang.Runnable} when stream is finished without value(empty stream).
24+
*
25+
* @param <T> the item type
26+
*/
27+
final class SingleIfEmptyPublisher<T> extends CompletionSingle<T> {
28+
29+
private final Single<T> source;
30+
private final Runnable ifEmpty;
31+
32+
SingleIfEmptyPublisher(Single<T> source, Runnable ifEmpty) {
33+
this.source = source;
34+
this.ifEmpty = ifEmpty;
35+
}
36+
37+
@Override
38+
public void subscribe(Flow.Subscriber<? super T> subscriber) {
39+
Objects.requireNonNull(subscriber, "subscriber is null");
40+
source.subscribe(new MultiIfEmptyPublisher.IfEmptySubscriber<>(subscriber, ifEmpty));
41+
}
42+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright (c) 2021 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.common.reactive;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.function.Function;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import static org.hamcrest.MatcherAssert.assertThat;
28+
import static org.hamcrest.Matchers.contains;
29+
30+
public class MultiIfEmptyTest {
31+
32+
@Test
33+
void empty() {
34+
List<String> result = new ArrayList<>();
35+
Multi.<String>empty()
36+
.ifEmpty(() -> result.add("ifEmpty"))
37+
.onComplete(() -> result.add("onComplete"))
38+
.peek(result::add)
39+
.onError(t -> result.add("onError"))
40+
.ignoreElements();
41+
assertThat(result, contains("ifEmpty", "onComplete"));
42+
}
43+
44+
@Test
45+
void multipleEmpty() {
46+
List<String> result = new ArrayList<>();
47+
Single.just(Optional.<String>empty())
48+
.flatMapOptional(Function.identity())
49+
.ifEmpty(() -> result.add("ifEmptyOptional"))
50+
.flatMap(s -> Single.<String>empty())
51+
.ifEmpty(() -> result.add("ifEmpty"))
52+
.onComplete(() -> result.add("onComplete"))
53+
.peek(result::add)
54+
.onError(t -> result.add("onError"))
55+
.ignoreElements();
56+
assertThat(result, contains("ifEmptyOptional", "ifEmpty", "onComplete"));
57+
}
58+
59+
@Test
60+
void nonEmpty() {
61+
List<String> result = new ArrayList<>();
62+
Multi.just(1, 2, 3)
63+
.map(String::valueOf)
64+
.peek(result::add)
65+
.ifEmpty(() -> result.add("ifEmpty"))
66+
.onComplete(() -> result.add("onComplete"))
67+
.onError(t -> result.add("onError"))
68+
.ignoreElements();
69+
assertThat(result, contains("1", "2", "3", "onComplete"));
70+
}
71+
72+
@Test
73+
void error() {
74+
List<String> result = new ArrayList<>();
75+
Multi.just(1, 2, 3)
76+
.flatMap(i -> {
77+
if (i == 3) {
78+
return Single.error(new Exception("BOOM!"));
79+
} else {
80+
return Single.just(String.valueOf(i));
81+
}
82+
})
83+
.peek(result::add)
84+
.ifEmpty(() -> result.add("ifEmpty"))
85+
.onComplete(() -> result.add("onComplete"))
86+
.onError(t -> result.add("onError"))
87+
.ignoreElements();
88+
assertThat(result, contains("1", "2", "onError"));
89+
}
90+
91+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (c) 2021 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.common.reactive;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.function.Function;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import static org.hamcrest.MatcherAssert.assertThat;
28+
import static org.hamcrest.Matchers.contains;
29+
30+
public class SingleIfEmptyTest {
31+
32+
@Test
33+
void empty() {
34+
List<String> result = new ArrayList<>();
35+
Single.<String>empty()
36+
.ifEmpty(() -> result.add("ifEmpty"))
37+
.onComplete(() -> result.add("onComplete"))
38+
.peek(result::add)
39+
.onError(t -> result.add("onError"))
40+
.ignoreElement();
41+
assertThat(result, contains("ifEmpty", "onComplete"));
42+
}
43+
44+
@Test
45+
void multipleEmpty() {
46+
List<String> result = new ArrayList<>();
47+
Single.just(Optional.<String>empty())
48+
.flatMapOptional(Function.identity())
49+
.ifEmpty(() -> result.add("ifEmptyOptional"))
50+
.flatMapSingle(s -> Single.<String>empty())
51+
.ifEmpty(() -> result.add("ifEmpty"))
52+
.onComplete(() -> result.add("onComplete"))
53+
.peek(result::add)
54+
.onError(t -> result.add("onError"))
55+
.ignoreElement();
56+
assertThat(result, contains("ifEmptyOptional", "ifEmpty", "onComplete"));
57+
}
58+
59+
@Test
60+
void nonEmpty() {
61+
List<String> result = new ArrayList<>();
62+
Single.just(1)
63+
.map(String::valueOf)
64+
.peek(result::add)
65+
.ifEmpty(() -> result.add("ifEmpty"))
66+
.onComplete(() -> result.add("onComplete"))
67+
.onError(t -> result.add("onError"))
68+
.ignoreElement();
69+
assertThat(result, contains("1", "onComplete"));
70+
}
71+
72+
@Test
73+
void error() {
74+
List<String> result = new ArrayList<>();
75+
Single.<String>error(new Exception("BOOM!"))
76+
.peek(result::add)
77+
.ifEmpty(() -> result.add("ifEmpty"))
78+
.onComplete(() -> result.add("onComplete"))
79+
.onError(t -> result.add("onError"))
80+
.ignoreElement();
81+
assertThat(result, contains("onError"));
82+
}
83+
84+
}

docs/shared/reactivestreams/02_engine.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ Single.just("1")
104104
|never|Get a `Multi` instance that never completes.
105105
|concat|Concat streams to one.
106106
|onTerminate|Executes given `java.lang.Runnable` when any of signals onComplete, onCancel or onError is received.
107+
|ifEmpty|Executes given `java.lang.Runnable` when stream is finished without value(empty stream).
107108
|onComplete|Executes given `java.lang.Runnable` when onComplete signal is received.
108109
|onError|Executes the given java.util.function.Consumer when an onError signal is received.
109110
|onCancel|Executes given `java.lang.Runnable` when a cancel signal is received.

docs/shared/reactivestreams/03_rsoperators.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ ReactiveStreams.of("1", "2", "3", "4", "5")
8282
|onErrorResumeWithRsPublisher| When onError signal received continue emitting from supplied publisher
8383
|onComplete| Invoke supplied runnable when onComplete signal received
8484
|onTerminate| Invoke supplied runnable when onComplete or onError signal received
85+
|ifEmpty| Executes given `java.lang.Runnable` when stream is finished without value(empty stream).
8586
|to| Connect this stream to supplied subscriber
8687
|toList| Collect all intercepted items to List
8788
|collect| Collect all intercepted items with provided collector

0 commit comments

Comments
 (0)