Skip to content

Commit 3009675

Browse files
authored
Messaging 3.0 (helidon-io#4091)
* Messaging 3.0 Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
1 parent 525736c commit 3009675

75 files changed

Lines changed: 2673 additions & 1374 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
2+
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package io.helidon.common.reactive;
1818

19+
import java.util.Queue;
1920
import java.util.concurrent.ConcurrentLinkedQueue;
2021
import java.util.concurrent.Flow;
2122
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,7 +35,7 @@
3435
*/
3536
public class BufferedEmittingPublisher<T> implements Flow.Publisher<T> {
3637

37-
private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
38+
private final Queue<T> buffer;
3839
private volatile Throwable error;
3940
private BiConsumer<Long, Long> requestCallback = null;
4041
private Consumer<? super T> onEmitCallback = null;
@@ -60,17 +61,12 @@ public class BufferedEmittingPublisher<T> implements Flow.Publisher<T> {
6061
// against a completion (isCancelled() and isComplete() are both true)
6162
private boolean cancelled;
6263

63-
protected BufferedEmittingPublisher() {
64+
protected BufferedEmittingPublisher(Queue<T> queue) {
65+
buffer = queue;
6466
}
6567

66-
/**
67-
* Create new {@link BufferedEmittingPublisher}.
68-
*
69-
* @param <T> type of emitted item
70-
* @return new instance of BufferedEmittingPublisher
71-
*/
72-
public static <T> BufferedEmittingPublisher<T> create() {
73-
return new BufferedEmittingPublisher<T>();
68+
protected BufferedEmittingPublisher() {
69+
buffer = new ConcurrentLinkedQueue<>();
7470
}
7571

7672
@Override
@@ -488,4 +484,60 @@ private void drain() {
488484
throw ise;
489485
}
490486
}
487+
488+
/**
489+
* Create new {@link BufferedEmittingPublisher}.
490+
*
491+
* @param <T> type of emitted item
492+
* @return new instance of BufferedEmittingPublisher
493+
*/
494+
public static <T> BufferedEmittingPublisher<T> create() {
495+
BufferedEmittingPublisher<T> bep = BufferedEmittingPublisher.<T>builder().build();
496+
return bep;
497+
}
498+
499+
/**
500+
* Create new builder for BufferedEmittingPublisher.
501+
*
502+
* @param <T> type of the expected payload
503+
* @return new builder
504+
*/
505+
public static <T> BufferedEmittingPublisher.Builder<T> builder() {
506+
return new BufferedEmittingPublisher.Builder<>();
507+
}
508+
509+
/**
510+
* Fluent API builder to create {@link io.helidon.common.reactive.BufferedEmittingPublisher}.
511+
*
512+
* @param <T> type of the expected payload
513+
*/
514+
public static class Builder<T> implements io.helidon.common.Builder<BufferedEmittingPublisher.Builder<T>, BufferedEmittingPublisher<T>> {
515+
516+
private Queue<T> queue;
517+
518+
private Builder() {
519+
}
520+
521+
/**
522+
* Set up custom buffer queue implementation for the emitter to use.
523+
*
524+
* @param queue to be used as a buffer
525+
* @return this builder
526+
*/
527+
public BufferedEmittingPublisher.Builder<T> buffer(Queue<T> queue) {
528+
this.queue = queue;
529+
return this;
530+
}
531+
532+
@Override
533+
public BufferedEmittingPublisher<T> build() {
534+
BufferedEmittingPublisher<T> bep;
535+
if (queue != null) {
536+
bep = new BufferedEmittingPublisher<>(queue);
537+
} else {
538+
bep = new BufferedEmittingPublisher<>();
539+
}
540+
return bep;
541+
}
542+
}
491543
}

dependencies/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,9 @@
114114
<version.lib.microprofile-jwt>2.0</version.lib.microprofile-jwt>
115115
<version.lib.microprofile-metrics-api>4.0</version.lib.microprofile-metrics-api>
116116
<version.lib.microprofile-openapi-api>3.0</version.lib.microprofile-openapi-api>
117-
<version.lib.microprofile-reactive-messaging-api>2.0.1</version.lib.microprofile-reactive-messaging-api>
118-
<version.lib.microprofile-reactive-streams-operators-api>2.0</version.lib.microprofile-reactive-streams-operators-api>
119-
<version.lib.microprofile-reactive-streams-operators-core>2.0</version.lib.microprofile-reactive-streams-operators-core>
117+
<version.lib.microprofile-reactive-messaging-api>3.0-RC2</version.lib.microprofile-reactive-messaging-api>
118+
<version.lib.microprofile-reactive-streams-operators-api>3.0-RC1</version.lib.microprofile-reactive-streams-operators-api>
119+
<version.lib.microprofile-reactive-streams-operators-core>3.0-RC1</version.lib.microprofile-reactive-streams-operators-core>
120120
<version.lib.microprofile-rest-client>3.0</version.lib.microprofile-rest-client>
121121
<version.lib.microprofile-tracing>3.0</version.lib.microprofile-tracing>
122122
<version.lib.microprofile-lra-api>2.0-RC1</version.lib.microprofile-lra-api>
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2022 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.microprofile.messaging;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.CompletionStage;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
25+
import org.eclipse.microprofile.reactive.messaging.Message;
26+
27+
class AckCtx {
28+
private final AbstractMessagingMethod m;
29+
private final Message<?> msg;
30+
private final AtomicBoolean acked = new AtomicBoolean();
31+
32+
private AckCtx(AbstractMessagingMethod m, Message<?> msg) {
33+
this.m = m;
34+
this.msg = msg;
35+
}
36+
37+
static AckCtx create(AbstractMessagingMethod m, Message<?> msg) {
38+
return new AckCtx(m, msg);
39+
}
40+
41+
CompletionStage<Void> preAck() {
42+
if (m.getAckStrategy().equals(Acknowledgment.Strategy.PRE_PROCESSING) && !acked.getAndSet(true)) {
43+
return msg.ack();
44+
}
45+
return CompletableFuture.completedStage(null);
46+
}
47+
48+
CompletionStage<Void> postNack(Throwable t) {
49+
if (m.getAckStrategy().equals(Acknowledgment.Strategy.POST_PROCESSING) && !acked.getAndSet(true)) {
50+
return msg.nack(t);
51+
}
52+
return CompletableFuture.completedStage(null);
53+
}
54+
55+
CompletionStage<Void> postAck() {
56+
if (m.getAckStrategy().equals(Acknowledgment.Strategy.POST_PROCESSING) && !acked.getAndSet(true)) {
57+
return msg.ack();
58+
}
59+
return CompletableFuture.completedStage(null);
60+
}
61+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) 2022 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.microprofile.messaging;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.CompletionStage;
22+
23+
import io.helidon.common.reactive.BufferedEmittingPublisher;
24+
25+
import org.eclipse.microprofile.reactive.messaging.Message;
26+
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
27+
import org.reactivestreams.FlowAdapters;
28+
import org.reactivestreams.Publisher;
29+
30+
/**
31+
* Emitter used for {@link org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy#BUFFER}.
32+
*/
33+
class BufferedEmitter extends OutgoingEmitter {
34+
35+
private final BufferedEmittingPublisher<Object> bep = BufferedEmittingPublisher.create();
36+
37+
BufferedEmitter(String channelName, String fieldName, OnOverflow onOverflow) {
38+
super(channelName, fieldName, onOverflow);
39+
}
40+
41+
@Override
42+
@SuppressWarnings("rawtypes")
43+
public CompletionStage<Void> send(Object p) {
44+
try {
45+
lock().lock();
46+
validate(p);
47+
CompletableFuture<Void> acked = new CompletableFuture<>();
48+
this.send(MessageUtils.create(p, acked));
49+
return acked;
50+
} finally {
51+
lock().unlock();
52+
}
53+
}
54+
55+
@Override
56+
public <M extends Message<? extends Object>> void send(M m) {
57+
try {
58+
lock().lock();
59+
validate(m);
60+
bep.emit(m);
61+
} finally {
62+
lock().unlock();
63+
}
64+
}
65+
66+
@Override
67+
public void complete() {
68+
try {
69+
lock().lock();
70+
super.complete();
71+
bep.complete();
72+
} finally {
73+
lock().unlock();
74+
}
75+
}
76+
77+
@Override
78+
public void error(Exception e) {
79+
try {
80+
lock().lock();
81+
super.error(e);
82+
bep.fail(e);
83+
} finally {
84+
lock().unlock();
85+
}
86+
}
87+
88+
@Override
89+
public boolean isCancelled() {
90+
try {
91+
lock().lock();
92+
return bep.isCancelled() || bep.isCompleted();
93+
} finally {
94+
lock().unlock();
95+
}
96+
}
97+
98+
@Override
99+
public boolean hasRequests() {
100+
return bep.hasRequests();
101+
}
102+
103+
@Override
104+
public Publisher<?> getPublisher() {
105+
return FlowAdapters.toPublisher(bep);
106+
}
107+
108+
@Override
109+
public void validate(Object payload) {
110+
super.validate(payload);
111+
if (getOverflowStrategy().equals(OnOverflow.Strategy.BUFFER)) {
112+
int bufferSize = bep.bufferSize();
113+
if (getBufferLimit() > 0 && bufferSize > getBufferLimit()) {
114+
RuntimeException ex = new IllegalStateException("Emitter buffer overflow");
115+
throw ex;
116+
}
117+
}
118+
}
119+
}

microprofile/messaging/core/src/main/java/io/helidon/microprofile/messaging/ChannelHealthProbe.java

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)