Skip to content

Commit 8fb3162

Browse files
authored
[Reactive] Implement observeOn + TCK tests (helidon-io#1546)
1 parent 5f467a4 commit 8fb3162

8 files changed

Lines changed: 757 additions & 0 deletions

File tree

common/reactive/src/main/java/io/helidon/common/reactive/Multi.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collection;
2020
import java.util.List;
2121
import java.util.Objects;
22+
import java.util.concurrent.Executor;
2223
import java.util.concurrent.Flow;
2324
import java.util.concurrent.Flow.Publisher;
2425
import java.util.concurrent.Flow.Subscriber;
@@ -205,6 +206,35 @@ default <U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? ex
205206
return flatMapIterable(iterableMapper, 32);
206207
}
207208

209+
/**
210+
* Re-emit the upstream's signals to the downstream on the given executor's thread
211+
* using a default buffer size of 32 and errors skipping ahead of items.
212+
* @param executor the executor to signal the downstream from.
213+
* @return Multi
214+
* @throws NullPointerException if {@code executor} is {@code null}
215+
* @see #observeOn(Executor, int, boolean)
216+
*/
217+
default Multi<T> observeOn(Executor executor) {
218+
return observeOn(executor, 32, false);
219+
}
220+
221+
/**
222+
* Re-emit the upstream's signals to the downstream on the given executor's thread.
223+
* @param executor the executor to signal the downstream from.
224+
* @param bufferSize the number of items to prefetch and buffer at a time
225+
* @param delayError if {@code true}, errors are emitted after items,
226+
* if {@code false}, errors may cut ahead of items during emission
227+
* @return Multi
228+
* @throws NullPointerException if {@code executor} is {@code null}
229+
*/
230+
default Multi<T> observeOn(Executor executor, int bufferSize, boolean delayError) {
231+
Objects.requireNonNull(executor, "executor is null");
232+
if (bufferSize <= 0) {
233+
throw new IllegalArgumentException("bufferSize > 0 required");
234+
}
235+
return new MultiObserveOn<>(this, executor, bufferSize, delayError);
236+
}
237+
208238
/**
209239
* Transform item with supplied function and flatten resulting {@link Iterable} to downstream.
210240
*
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.Flow;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
import java.util.concurrent.atomic.AtomicLong;
23+
import java.util.concurrent.atomic.AtomicReferenceArray;
24+
25+
/**
26+
* Signal items and terminal signals of the upstream on the given executor.
27+
* @param <T> the element type of the sequence
28+
*/
29+
final class MultiObserveOn<T> implements Multi<T> {
30+
31+
private final Multi<T> source;
32+
33+
private final Executor executor;
34+
35+
private final int bufferSize;
36+
37+
private final boolean delayError;
38+
39+
MultiObserveOn(Multi<T> source, Executor executor, int bufferSize, boolean delayError) {
40+
this.source = source;
41+
this.executor = executor;
42+
this.bufferSize = bufferSize;
43+
this.delayError = delayError;
44+
}
45+
46+
@Override
47+
public void subscribe(Flow.Subscriber<? super T> subscriber) {
48+
source.subscribe(new ObserveOnSubscriber<>(subscriber, executor, bufferSize, delayError));
49+
}
50+
51+
static int roundToPowerOfTwo(final int value) {
52+
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
53+
}
54+
55+
static final class ObserveOnSubscriber<T> extends AtomicInteger
56+
implements Flow.Subscriber<T>, Flow.Subscription, Runnable {
57+
58+
private final Flow.Subscriber<? super T> downstream;
59+
60+
private final Executor executor;
61+
62+
private final int bufferSize;
63+
64+
private final boolean delayError;
65+
66+
private final AtomicLong requested;
67+
68+
private final AtomicReferenceArray<T> queue;
69+
70+
private final AtomicLong producerIndex;
71+
72+
private final AtomicLong consumerIndex;
73+
74+
private Flow.Subscription upstream;
75+
76+
private Throwable error;
77+
private volatile boolean done;
78+
79+
private volatile boolean canceled;
80+
81+
private long emitted;
82+
private int consumed;
83+
84+
ObserveOnSubscriber(Flow.Subscriber<? super T> downstream, Executor executor,
85+
int bufferSize, boolean delayError) {
86+
this.downstream = downstream;
87+
this.executor = executor;
88+
this.bufferSize = bufferSize;
89+
this.delayError = delayError;
90+
this.requested = new AtomicLong();
91+
this.queue = new AtomicReferenceArray<>(roundToPowerOfTwo(bufferSize));
92+
this.producerIndex = new AtomicLong();
93+
this.consumerIndex = new AtomicLong();
94+
}
95+
96+
@Override
97+
public void onSubscribe(Flow.Subscription subscription) {
98+
SubscriptionHelper.validate(upstream, subscription);
99+
upstream = subscription;
100+
downstream.onSubscribe(this);
101+
subscription.request(bufferSize);
102+
}
103+
104+
@Override
105+
public void onNext(T item) {
106+
offer(item);
107+
schedule();
108+
}
109+
110+
@Override
111+
public void onError(Throwable throwable) {
112+
this.error = throwable;
113+
this.done = true;
114+
schedule();
115+
}
116+
117+
@Override
118+
public void onComplete() {
119+
this.done = true;
120+
schedule();
121+
}
122+
123+
void schedule() {
124+
if (getAndIncrement() == 0) {
125+
executor.execute(this);
126+
}
127+
}
128+
129+
@Override
130+
public void request(long n) {
131+
if (n <= 0L) {
132+
onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
133+
} else {
134+
SubscriptionHelper.addRequest(requested, n);
135+
schedule();
136+
}
137+
}
138+
139+
@Override
140+
public void cancel() {
141+
canceled = true;
142+
upstream.cancel();
143+
schedule();
144+
}
145+
146+
@Override
147+
public void run() {
148+
149+
int missed = 1;
150+
long r = requested.get();
151+
Flow.Subscriber<? super T> downstream = this.downstream;
152+
int consumed = this.consumed;
153+
long emitted = this.emitted;
154+
int limit = bufferSize - (bufferSize >> 2);
155+
156+
for (;;) {
157+
if (canceled) {
158+
clear();
159+
} else {
160+
boolean d = done;
161+
if (d && !delayError) {
162+
Throwable ex = error;
163+
if (ex != null) {
164+
canceled = true;
165+
downstream.onError(ex);
166+
continue;
167+
}
168+
}
169+
170+
boolean empty;
171+
if (r != emitted) {
172+
173+
T item = poll();
174+
175+
if (item != null) {
176+
177+
downstream.onNext(item);
178+
emitted++;
179+
if (++consumed == limit) {
180+
consumed = 0;
181+
upstream.request(limit);
182+
}
183+
continue;
184+
}
185+
empty = true;
186+
} else {
187+
empty = isEmpty();
188+
}
189+
190+
if (d && empty) {
191+
canceled = true;
192+
Throwable ex = error;
193+
if (ex != null) {
194+
downstream.onError(ex);
195+
} else {
196+
downstream.onComplete();
197+
}
198+
continue;
199+
}
200+
}
201+
202+
this.emitted = emitted;
203+
this.consumed = consumed;
204+
missed = addAndGet(-missed);
205+
if (missed == 0) {
206+
break;
207+
}
208+
r = requested.get();
209+
}
210+
}
211+
212+
void offer(T item) {
213+
AtomicReferenceArray<T> queue = this.queue;
214+
AtomicLong producerIndex = this.producerIndex;
215+
216+
long pi = producerIndex.get();
217+
int mask = queue.length() - 1;
218+
int offset = (int) pi & mask;
219+
220+
queue.lazySet(offset, item);
221+
producerIndex.lazySet(pi + 1);
222+
}
223+
224+
T poll() {
225+
AtomicReferenceArray<T> queue = this.queue;
226+
AtomicLong consumerIndex = this.consumerIndex;
227+
228+
long ci = consumerIndex.get();
229+
int mask = queue.length() - 1;
230+
int offset = (int) ci & mask;
231+
232+
T item = queue.get(offset);
233+
if (item == null) {
234+
return null;
235+
}
236+
queue.lazySet(offset, null);
237+
consumerIndex.lazySet(ci + 1);
238+
return item;
239+
}
240+
241+
boolean isEmpty() {
242+
AtomicLong producerIndex = this.producerIndex;
243+
AtomicLong consumerIndex = this.consumerIndex;
244+
245+
return producerIndex.get() == consumerIndex.get();
246+
}
247+
248+
void clear() {
249+
for (;;) {
250+
if (poll() == null) {
251+
break;
252+
}
253+
}
254+
}
255+
}
256+
}

common/reactive/src/main/java/io/helidon/common/reactive/Single.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.CompletionStage;
2222
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.Executor;
2324
import java.util.concurrent.Flow;
2425
import java.util.concurrent.Flow.Publisher;
2526
import java.util.concurrent.Flow.Subscriber;
@@ -142,6 +143,17 @@ default <U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? ex
142143
return new SingleFlatMapIterable<>(this, mapper);
143144
}
144145

146+
/**
147+
* Re-emit the upstream's signals to the downstream on the given executor's thread.
148+
* @param executor the executor to signal the downstream from.
149+
* @return Single
150+
* @throws NullPointerException if {@code executor} is {@code null}
151+
*/
152+
default Single<T> observeOn(Executor executor) {
153+
Objects.requireNonNull(executor, "executor is null");
154+
return new SingleObserveOn<>(this, executor);
155+
}
156+
145157
/**
146158
* Exposes this {@link Single} instance as a {@link CompletionStage}.
147159
* Note that if this {@link Single} completes without a value, the resulting {@link CompletionStage} will be completed

0 commit comments

Comments
 (0)