Skip to content

Commit 8e47985

Browse files
authored
[MPRS] Route cancellation to a background thread for the TCK's sake (helidon-io#1608)
* [MPRS] Route cancellation to a background thread for the TCK's sake * Fix style issue
1 parent d7a4651 commit 8e47985

2 files changed

Lines changed: 112 additions & 6 deletions

File tree

microprofile/reactive-streams/src/main/java/io/helidon/microprofile/reactive/HelidonReactiveStreamsEngine.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -513,29 +513,27 @@ static <T, R> Flow.Processor<T, R> coupledBuildProcessor(Flow.Subscriber<? super
513513
inlet
514514
.onComplete(() -> complete(subscriberActivity))
515515
.onError(e -> fail(subscriberActivity, e))
516+
.compose(upstream -> new MultiCancelOnExecutor<>(upstream, coupledExecutor))
516517
.takeUntil(Multi.from(publisherActivity, true))
517518
.onCancel(() -> complete(subscriberActivity))
518519
.subscribe(subscriber);
519520

520521
Multi<? extends R> outlet = Multi.from(publisher)
521522
.onComplete(() -> complete(publisherActivity))
522523
.onError(e -> fail(publisherActivity, e))
524+
.compose(upstream -> new MultiCancelOnExecutor<>(upstream, coupledExecutor))
523525
.takeUntil(Multi.from(subscriberActivity, true))
524526
.onCancel(() -> complete(publisherActivity));
525527

526528
return new BridgeProcessor<>(inlet, outlet);
527529
}
528530

529531
static void complete(CompletableFuture<Object> cf) {
530-
coupledExecutor.execute(() -> {
531-
cf.complete(null);
532-
});
532+
cf.complete(null);
533533
}
534534

535535
static void fail(CompletableFuture<Object> cf, Throwable ex) {
536-
coupledExecutor.execute(() -> {
537-
cf.completeExceptionally(ex);
538-
});
536+
cf.completeExceptionally(ex);
539537
}
540538

541539
// Workaround for a TCK bug when calling cancel() from any method named onComplete().
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.microprofile.reactive;
19+
20+
import java.util.concurrent.Executor;
21+
import java.util.concurrent.Flow;
22+
23+
import io.helidon.common.reactive.Multi;
24+
25+
/**
26+
* Routes the downstream cancel() call through the given executor.
27+
* @param <T> the element type of the sequence
28+
*/
29+
final class MultiCancelOnExecutor<T> implements Multi<T> {
30+
31+
private final Multi<T> source;
32+
33+
private final Executor executor;
34+
35+
MultiCancelOnExecutor(Multi<T> source, Executor executor) {
36+
this.source = source;
37+
this.executor = executor;
38+
}
39+
40+
@Override
41+
public void subscribe(Flow.Subscriber<? super T> subscriber) {
42+
source.subscribe(new CancelOnExecutorSubscriber<>(subscriber, executor));
43+
}
44+
45+
static final class CancelOnExecutorSubscriber<T>
46+
implements Flow.Subscriber<T>, Flow.Subscription, Runnable {
47+
48+
private final Flow.Subscriber<? super T> downstream;
49+
50+
private final Executor executor;
51+
52+
private volatile boolean canceled;
53+
54+
private Flow.Subscription upstream;
55+
56+
CancelOnExecutorSubscriber(Flow.Subscriber<? super T> downstream, Executor executor) {
57+
this.downstream = downstream;
58+
this.executor = executor;
59+
}
60+
61+
@Override
62+
public void run() {
63+
upstream.cancel();
64+
upstream = SubscriptionHelper.CANCELED;
65+
}
66+
67+
@Override
68+
public void onSubscribe(Flow.Subscription subscription) {
69+
SubscriptionHelper.validate(upstream, subscription);
70+
upstream = subscription;
71+
downstream.onSubscribe(this);
72+
}
73+
74+
@Override
75+
public void onNext(T item) {
76+
if (!canceled) {
77+
downstream.onNext(item);
78+
}
79+
}
80+
81+
@Override
82+
public void onError(Throwable throwable) {
83+
if (!canceled) {
84+
downstream.onError(throwable);
85+
}
86+
}
87+
88+
@Override
89+
public void onComplete() {
90+
if (!canceled) {
91+
downstream.onComplete();
92+
}
93+
}
94+
95+
@Override
96+
public void request(long n) {
97+
upstream.request(n);
98+
}
99+
100+
@Override
101+
public void cancel() {
102+
if (!canceled) {
103+
canceled = true;
104+
executor.execute(this);
105+
}
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)