Skip to content

Commit e759531

Browse files
authored
[Reactive] Implement Multi.interval() + TCK tests (helidon-io#1526)
* [Reactive] Implement Multi.interval() + TCK tests * Try workaround for false-positive Spotbugs error * Emitted can be plain field
1 parent cfd06dd commit e759531

5 files changed

Lines changed: 390 additions & 0 deletions

File tree

common/reactive/src/main/java/io/helidon/common/reactive/Multi.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,43 @@ static Multi<Long> timer(long time, TimeUnit unit, ScheduledExecutorService exec
568568
return new MultiTimer(time, unit, executor);
569569
}
570570

571+
/**
572+
* Signal 0L, 1L and so on periodically to the downstream.
573+
* <p>
574+
* Note that if the downstream applies backpressure,
575+
* subsequent values may be delivered instantly upon
576+
* further requests from the downstream.
577+
* </p>
578+
* @param period the initial and in-between time
579+
* @param unit the time unit
580+
* @param executor the scheduled executor to use for the periodic emission
581+
* @return Multi
582+
* @throws NullPointerException if {@code unit} or {@code executor} is {@code null}
583+
*/
584+
static Multi<Long> interval(long period, TimeUnit unit, ScheduledExecutorService executor) {
585+
return interval(period, period, unit, executor);
586+
}
587+
588+
/**
589+
* Signal 0L after an initial delay, then 1L, 2L and so on periodically to the downstream.
590+
* <p>
591+
* Note that if the downstream applies backpressure,
592+
* subsequent values may be delivered instantly upon
593+
* further requests from the downstream.
594+
* </p>
595+
* @param initialDelay the time before signaling 0L
596+
* @param period the in-between wait time for values 1L, 2L and so on
597+
* @param unit the time unit
598+
* @param executor the scheduled executor to use for the periodic emission
599+
* @return Multi
600+
* @throws NullPointerException if {@code unit} or {@code executor} is {@code null}
601+
*/
602+
static Multi<Long> interval(long initialDelay, long period, TimeUnit unit, ScheduledExecutorService executor) {
603+
Objects.requireNonNull(unit, "unit is null");
604+
Objects.requireNonNull(executor, "executor is null");
605+
return new MultiInterval(initialDelay, period, unit, executor);
606+
}
607+
571608
/**
572609
* {@link java.util.function.Function} providing one item to be submitted as onNext in case of onError signal is received.
573610
*
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.helidon.common.reactive;
17+
18+
import java.util.concurrent.Flow;
19+
import java.util.concurrent.Future;
20+
import java.util.concurrent.ScheduledExecutorService;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
26+
/**
27+
* Signal an ever increasing long value periodically.
28+
*/
29+
final class MultiInterval implements Multi<Long> {
30+
31+
private final long initialDelay;
32+
33+
private final long period;
34+
35+
private final TimeUnit unit;
36+
37+
private final ScheduledExecutorService executor;
38+
39+
MultiInterval(long initialDelay, long period, TimeUnit unit, ScheduledExecutorService executor) {
40+
this.initialDelay = initialDelay;
41+
this.period = period;
42+
this.unit = unit;
43+
this.executor = executor;
44+
}
45+
46+
@Override
47+
public void subscribe(Flow.Subscriber<? super Long> subscriber) {
48+
IntervalSubscription subscription = new IntervalSubscription(subscriber);
49+
subscriber.onSubscribe(subscription);
50+
51+
subscription.setFuture(executor.scheduleAtFixedRate(subscription, initialDelay, period, unit));
52+
}
53+
54+
static final class IntervalSubscription extends AtomicInteger implements Flow.Subscription, Runnable {
55+
56+
private final Flow.Subscriber<? super Long> downstream;
57+
58+
private final AtomicLong requested;
59+
60+
private final AtomicReference<Future<?>> future;
61+
62+
private volatile long available;
63+
64+
private volatile int canceled;
65+
66+
private long emitted;
67+
68+
private static final int NORMAL_CANCEL = 1;
69+
private static final int BAD_REQUEST = 2;
70+
71+
IntervalSubscription(Flow.Subscriber<? super Long> downstream) {
72+
this.downstream = downstream;
73+
this.requested = new AtomicLong();
74+
this.future = new AtomicReference<>();
75+
}
76+
77+
@Override
78+
public void run() {
79+
long next = available + 1;
80+
available = next;
81+
drain();
82+
}
83+
84+
@Override
85+
public void request(long n) {
86+
if (n <= 0L) {
87+
canceled = BAD_REQUEST;
88+
n = 1L;
89+
}
90+
SubscriptionHelper.addRequest(requested, n);
91+
drain();
92+
}
93+
94+
@Override
95+
public void cancel() {
96+
canceled = NORMAL_CANCEL;
97+
TerminatedFuture.cancel(future);
98+
}
99+
100+
void setFuture(Future<?> f) {
101+
TerminatedFuture.setFuture(future, f);
102+
}
103+
104+
void drain() {
105+
if (getAndIncrement() != 0) {
106+
return;
107+
}
108+
109+
do {
110+
int c = canceled;
111+
if (c != 0) {
112+
if (c == BAD_REQUEST) {
113+
downstream.onError(new IllegalArgumentException(
114+
"Rule §3.9 violated: non-positive requests are forbidden"));
115+
}
116+
return;
117+
}
118+
119+
long avail = available;
120+
long req = requested.get();
121+
long emit = emitted;
122+
123+
if (emit != req && emit != avail) {
124+
downstream.onNext(emit);
125+
emitted = emit + 1;
126+
}
127+
128+
} while (decrementAndGet() != 0);
129+
}
130+
}
131+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import org.reactivestreams.tck.TestEnvironment;
20+
import org.reactivestreams.tck.flow.FlowPublisherVerification;
21+
import org.testng.annotations.AfterClass;
22+
import org.testng.annotations.BeforeClass;
23+
import org.testng.annotations.Test;
24+
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.Flow;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
30+
@Test
31+
public class MultiIntervalTckTest extends FlowPublisherVerification<Long> {
32+
33+
private static ScheduledExecutorService executor;
34+
35+
public MultiIntervalTckTest() {
36+
super(new TestEnvironment(150));
37+
}
38+
39+
@Override
40+
public Flow.Publisher<Long> createFlowPublisher(long l) {
41+
return Multi.interval(1, 1, TimeUnit.MILLISECONDS, executor).limit(l);
42+
}
43+
44+
@Override
45+
public Flow.Publisher<Long> createFailedFlowPublisher() {
46+
return null;
47+
}
48+
49+
@Override
50+
public long maxElementsFromPublisher() {
51+
return 10;
52+
}
53+
54+
@BeforeClass
55+
public static void beforeClass() {
56+
executor = Executors.newSingleThreadScheduledExecutor();
57+
}
58+
59+
@AfterClass
60+
public static void afterClass() {
61+
executor.shutdown();
62+
}
63+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import org.testng.annotations.AfterClass;
20+
import org.testng.annotations.BeforeClass;
21+
import org.testng.annotations.Test;
22+
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.TimeUnit;
26+
27+
public class MultiIntervalTest {
28+
29+
private static ScheduledExecutorService executor;
30+
31+
@BeforeClass
32+
public static void beforeClass() {
33+
executor = Executors.newSingleThreadScheduledExecutor();
34+
}
35+
36+
@AfterClass
37+
public static void afterClass() {
38+
executor.shutdown();
39+
}
40+
41+
@Test
42+
public void normal() {
43+
TestSubscriber<Long> ts = new TestSubscriber<>(Long.MAX_VALUE);
44+
45+
Multi.interval(1, TimeUnit.MILLISECONDS, executor)
46+
.subscribe(ts);
47+
48+
ts.awaitCount(10)
49+
.cancel()
50+
.assertNotTerminated();
51+
}
52+
53+
@Test
54+
public void normalBackpressured() throws Exception {
55+
TestSubscriber<Long> ts = new TestSubscriber<>();
56+
57+
Multi.interval(10, TimeUnit.MILLISECONDS, executor)
58+
.subscribe(ts);
59+
60+
Thread.sleep(100);
61+
62+
ts.assertEmpty()
63+
.request(15);
64+
65+
ts.awaitCount(15)
66+
.cancel()
67+
.assertNotTerminated();
68+
}
69+
}

0 commit comments

Comments
 (0)