Skip to content

Commit 163c7f0

Browse files
authored
[Reactive] Implement timer() + TCK tests (helidon-io#1516)
* [Reactive] Implement timer() + TCK tests * Add missing copyright
1 parent 828d7c6 commit 163c7f0

8 files changed

Lines changed: 548 additions & 0 deletions

File tree

common/reactive/src/main/java/io/helidon/common/reactive/Multi.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.concurrent.Flow;
2323
import java.util.concurrent.Flow.Publisher;
2424
import java.util.concurrent.Flow.Subscriber;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.TimeUnit;
2527
import java.util.function.BiConsumer;
2628
import java.util.function.BiFunction;
2729
import java.util.function.Consumer;
@@ -515,6 +517,20 @@ static Multi<Long> rangeLong(long start, long count) {
515517
return new MultiRangeLongPublisher(start, start + count);
516518
}
517519

520+
/**
521+
* Signal 0L and complete the sequence after the given time elapsed.
522+
* @param time the time to wait before signaling 0L and completion
523+
* @param unit the unit of time
524+
* @param executor the executor to run the waiting on
525+
* @return Multi
526+
* @throws NullPointerException if {@code unit} or {@code executor} is {@code null}
527+
*/
528+
static Multi<Long> timer(long time, TimeUnit unit, ScheduledExecutorService executor) {
529+
Objects.requireNonNull(unit, "unit is null");
530+
Objects.requireNonNull(executor, "executor is null");
531+
return new MultiTimer(time, unit, executor);
532+
}
533+
518534
/**
519535
* {@link java.util.function.Function} providing one item to be submitted as onNext in case of onError signal is received.
520536
*
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.common.reactive;
19+
20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.Flow;
22+
import java.util.concurrent.Future;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
/**
28+
* Signal 0L and complete after the specified time.
29+
*/
30+
final class MultiTimer implements Multi<Long> {
31+
32+
private final long time;
33+
34+
private final TimeUnit unit;
35+
36+
private final ScheduledExecutorService executor;
37+
38+
MultiTimer(long time, TimeUnit unit, ScheduledExecutorService executor) {
39+
this.time = time;
40+
this.unit = unit;
41+
this.executor = executor;
42+
}
43+
44+
@Override
45+
public void subscribe(Flow.Subscriber<? super Long> subscriber) {
46+
TimerSubscription subscription = new TimerSubscription(subscriber);
47+
subscriber.onSubscribe(subscription);
48+
49+
subscription.setFuture(executor.schedule(subscription, time, unit));
50+
}
51+
52+
static final class TimerSubscription extends DeferredScalarSubscription<Long>
53+
implements Callable<Void> {
54+
55+
private final AtomicReference<Future<?>> future;
56+
57+
TimerSubscription(Flow.Subscriber<? super Long> downstream) {
58+
super(downstream);
59+
this.future = new AtomicReference<>();
60+
}
61+
62+
@Override
63+
public Void call() {
64+
future.lazySet(TerminatedFuture.FINISHED);
65+
complete(0L);
66+
return null;
67+
}
68+
69+
@Override
70+
public void cancel() {
71+
super.cancel();
72+
TerminatedFuture.cancel(future);
73+
}
74+
75+
public void setFuture(Future<?> f) {
76+
TerminatedFuture.setFuture(future, f);
77+
}
78+
}
79+
80+
}

common/reactive/src/main/java/io/helidon/common/reactive/Single.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.Flow;
2424
import java.util.concurrent.Flow.Publisher;
2525
import java.util.concurrent.Flow.Subscriber;
26+
import java.util.concurrent.ScheduledExecutorService;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.TimeoutException;
2829
import java.util.function.Consumer;
@@ -230,6 +231,21 @@ static <T> Single<T> never() {
230231
return SingleNever.<T>instance();
231232
}
232233

234+
235+
/**
236+
* Signal 0L and complete the sequence after the given time elapsed.
237+
* @param time the time to wait before signaling 0L and completion
238+
* @param unit the unit of time
239+
* @param executor the executor to run the waiting on
240+
* @return Single
241+
* @throws NullPointerException if {@code unit} or {@code executor} is {@code null}
242+
*/
243+
static Single<Long> timer(long time, TimeUnit unit, ScheduledExecutorService executor) {
244+
Objects.requireNonNull(unit, "unit is null");
245+
Objects.requireNonNull(executor, "executor is null");
246+
return new SingleTimer(time, unit, executor);
247+
}
248+
233249
/**
234250
* Relay upstream items until the other source signals an item or completes.
235251
* @param other the other sequence to signal the end of the main sequence
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.common.reactive;
19+
20+
import java.util.concurrent.Flow;
21+
import java.util.concurrent.ScheduledExecutorService;
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* Signal 0L and complete after the specified time.
26+
*/
27+
final class SingleTimer implements Single<Long> {
28+
29+
private final long time;
30+
31+
private final TimeUnit unit;
32+
33+
private final ScheduledExecutorService executor;
34+
35+
SingleTimer(long time, TimeUnit unit, ScheduledExecutorService executor) {
36+
this.time = time;
37+
this.unit = unit;
38+
this.executor = executor;
39+
}
40+
41+
@Override
42+
public void subscribe(Flow.Subscriber<? super Long> subscriber) {
43+
MultiTimer.TimerSubscription subscription = new MultiTimer.TimerSubscription(subscriber);
44+
subscriber.onSubscribe(subscription);
45+
46+
subscription.setFuture(executor.schedule(subscription, time, unit));
47+
}
48+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import java.util.concurrent.CancellationException;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
/**
25+
* Enumeration that implements a finished or canceled future to be used
26+
* for reference comparison and atomic state transitions.
27+
*/
28+
enum TerminatedFuture implements Future<Object> {
29+
30+
/** The task has already finished normally. */
31+
FINISHED,
32+
33+
/** The task has been cancelled. */
34+
CANCELED;
35+
36+
@Override
37+
public boolean cancel(boolean mayInterruptIfRunning) {
38+
return false;
39+
}
40+
41+
@Override
42+
public boolean isCancelled() {
43+
return this == CANCELED;
44+
}
45+
46+
@Override
47+
public boolean isDone() {
48+
return true;
49+
}
50+
51+
@Override
52+
public Object get() {
53+
if (this == CANCELED) {
54+
throw new CancellationException();
55+
}
56+
return null;
57+
}
58+
59+
@Override
60+
public Object get(long timeout, TimeUnit unit) {
61+
if (this == CANCELED) {
62+
throw new CancellationException();
63+
}
64+
return null;
65+
}
66+
67+
/**
68+
* Atomically set a {@link Future} on the given field if that field doesn't
69+
* already hold the {@link #CANCELED} or {@link #FINISHED} instance.
70+
* @param field the target atomic reference field
71+
* @param f the future to set
72+
*/
73+
public static void setFuture(AtomicReference<Future<?>> field, Future<?> f) {
74+
for (;;) {
75+
Future<?> current = field.get();
76+
if (current == TerminatedFuture.CANCELED) {
77+
f.cancel(true);
78+
return;
79+
}
80+
if (current != null) {
81+
return;
82+
}
83+
if (field.compareAndSet(null, f)) {
84+
return;
85+
}
86+
}
87+
}
88+
89+
/**
90+
* Atomically cancel the future in the target field or mark it for
91+
* cancellation.
92+
* @param field the target atomic reference field
93+
*/
94+
public static void cancel(AtomicReference<Future<?>> field) {
95+
for (;;) {
96+
Future<?> current = field.get();
97+
if (current == TerminatedFuture.FINISHED || current == TerminatedFuture.CANCELED) {
98+
return;
99+
}
100+
if (field.compareAndSet(current, TerminatedFuture.CANCELED)) {
101+
if (current != null) {
102+
current.cancel(true);
103+
}
104+
return;
105+
}
106+
}
107+
}
108+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.common.reactive;
18+
19+
import org.reactivestreams.tck.TestEnvironment;
20+
import org.reactivestreams.tck.flow.FlowPublisherVerification;
21+
import org.testng.annotations.AfterClass;
22+
import org.testng.annotations.BeforeClass;
23+
import org.testng.annotations.Test;
24+
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.Flow;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
30+
@Test
31+
public class MultiTimerTckTest extends FlowPublisherVerification<Long> {
32+
33+
private static ScheduledExecutorService executor;
34+
35+
public MultiTimerTckTest() {
36+
super(new TestEnvironment(200));
37+
}
38+
39+
@BeforeClass
40+
public static void beforeClass() {
41+
executor = Executors.newSingleThreadScheduledExecutor();
42+
}
43+
44+
@AfterClass
45+
public static void afterClass() {
46+
executor.shutdown();
47+
}
48+
49+
@Override
50+
public Flow.Publisher<Long> createFlowPublisher(long l) {
51+
return Multi.timer(1, TimeUnit.MILLISECONDS, executor);
52+
}
53+
54+
@Override
55+
public Flow.Publisher<Long> createFailedFlowPublisher() {
56+
return null;
57+
}
58+
59+
@Override
60+
public long maxElementsFromPublisher() {
61+
return 1;
62+
}
63+
}

0 commit comments

Comments
 (0)