Skip to content

Commit 57e5771

Browse files
authored
MP Reactive Messaging impl (helidon-io#1287)
* MP Reactive Messaging impl * Arquillian manually managed deploy fix after native compatibility cdi changes Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
1 parent 36ab8d4 commit 57e5771

168 files changed

Lines changed: 11997 additions & 29 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

bom/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,11 +626,18 @@
626626

627627
<!-- Reactive Streams Operators -->
628628
<dependency>
629-
<groupId>io.helidon.microprofile</groupId>
629+
<groupId>io.helidon.microprofile.reactive-streams</groupId>
630630
<artifactId>helidon-microprofile-reactive-streams</artifactId>
631631
<version>${helidon.version}</version>
632632
</dependency>
633633

634+
<!-- Reactive Messaging -->
635+
<dependency>
636+
<groupId>io.helidon.microprofile.messaging</groupId>
637+
<artifactId>helidon-microprofile-messaging</artifactId>
638+
<version>${helidon.version}</version>
639+
</dependency>
640+
634641
<!-- integrations -->
635642
<dependency>
636643
<groupId>io.helidon.serviceconfiguration</groupId>

common/common/src/main/java/io/helidon/common/Errors.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import java.util.Objects;
2222
import java.util.Set;
2323
import java.util.logging.Logger;
24+
import java.util.stream.Collectors;
2425

2526
/**
2627
* Errors utility used to file processing messages (e.g. validation, provider, resource building errors, hint).
@@ -156,6 +157,13 @@ public boolean log(Logger logger) {
156157
return true;
157158
}
158159

160+
@Override
161+
public String toString() {
162+
return this.stream()
163+
.map(ErrorMessage::toString)
164+
.collect(Collectors.joining("\n"));
165+
}
166+
159167
/**
160168
* Check if these messages are a valid result.
161169
*

dependencies/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@
8181
<version.lib.microprofile-fault-tolerance-api>2.0.2</version.lib.microprofile-fault-tolerance-api>
8282
<version.lib.microprofile-tracing>1.3.1</version.lib.microprofile-tracing>
8383
<version.lib.microprofile-rest-client>1.3.3</version.lib.microprofile-rest-client>
84+
<version.lib.microprofile-reactive-messaging-api>1.0</version.lib.microprofile-reactive-messaging-api>
85+
<version.lib.microprofile-reactive-streams-operators-api>1.0.1</version.lib.microprofile-reactive-streams-operators-api>
86+
<version.lib.microprofile-reactive-streams-operators-core>1.0.1</version.lib.microprofile-reactive-streams-operators-core>
8487
<version.lib.mockito>2.23.4</version.lib.mockito>
8588
<version.lib.mongodb.reactivestreams>1.11.0</version.lib.mongodb.reactivestreams>
8689
<version.lib.mysql-connector-java>8.0.11</version.lib.mysql-connector-java>
@@ -485,6 +488,21 @@
485488
</exclusion>
486489
</exclusions>
487490
</dependency>
491+
<dependency>
492+
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
493+
<artifactId>microprofile-reactive-streams-operators-api</artifactId>
494+
<version>${version.lib.microprofile-reactive-streams-operators-api}</version>
495+
</dependency>
496+
<dependency>
497+
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
498+
<artifactId>microprofile-reactive-streams-operators-core</artifactId>
499+
<version>${version.lib.microprofile-reactive-streams-operators-core}</version>
500+
</dependency>
501+
<dependency>
502+
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
503+
<artifactId>microprofile-reactive-messaging-api</artifactId>
504+
<version>${version.lib.microprofile-reactive-messaging-api}</version>
505+
</dependency>
488506
<dependency>
489507
<groupId>com.netflix.hystrix</groupId>
490508
<artifactId>hystrix-core</artifactId>

microprofile/messaging/pom.xml

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright (c) 2020 Oracle and/or its affiliates.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
-->
17+
18+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xmlns="http://maven.apache.org/POM/4.0.0"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<modelVersion>4.0.0</modelVersion>
22+
<parent>
23+
<groupId>io.helidon.microprofile</groupId>
24+
<artifactId>helidon-microprofile-project</artifactId>
25+
<version>2.0.0-SNAPSHOT</version>
26+
</parent>
27+
28+
<groupId>io.helidon.microprofile.messaging</groupId>
29+
<artifactId>helidon-microprofile-messaging</artifactId>
30+
<name>Helidon MicroProfile Reactive Messaging</name>
31+
<description>
32+
Helidon MicroProfile Reactive Messaging
33+
</description>
34+
35+
<dependencies>
36+
<dependency>
37+
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
38+
<artifactId>microprofile-reactive-messaging-api</artifactId>
39+
<exclusions>
40+
<exclusion>
41+
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
42+
<artifactId>microprofile-reactive-streams-operators-api</artifactId>
43+
</exclusion>
44+
<exclusion>
45+
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
46+
<artifactId>microprofile-reactive-streams-operators-core</artifactId>
47+
</exclusion>
48+
</exclusions>
49+
</dependency>
50+
<dependency>
51+
<groupId>io.helidon.microprofile.config</groupId>
52+
<artifactId>helidon-microprofile-config</artifactId>
53+
</dependency>
54+
<dependency>
55+
<groupId>io.helidon.microprofile.server</groupId>
56+
<artifactId>helidon-microprofile-server</artifactId>
57+
</dependency>
58+
<dependency>
59+
<groupId>io.helidon.microprofile.reactive-streams</groupId>
60+
<artifactId>helidon-microprofile-reactive-streams</artifactId>
61+
</dependency>
62+
<dependency>
63+
<groupId>javax.interceptor</groupId>
64+
<artifactId>javax.interceptor-api</artifactId>
65+
<scope>provided</scope>
66+
</dependency>
67+
<dependency>
68+
<groupId>io.helidon.microprofile.bundles</groupId>
69+
<artifactId>internal-test-libs</artifactId>
70+
<scope>test</scope>
71+
</dependency>
72+
</dependencies>
73+
</project>
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.microprofile.messaging;
19+
20+
import java.lang.reflect.Method;
21+
import java.util.Optional;
22+
23+
import javax.enterprise.inject.spi.Bean;
24+
import javax.enterprise.inject.spi.BeanManager;
25+
26+
import io.helidon.common.Errors;
27+
import io.helidon.config.Config;
28+
29+
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
30+
31+
abstract class AbstractMessagingMethod {
32+
33+
private String incomingChannelName;
34+
private String outgoingChannelName;
35+
36+
private Bean<?> bean;
37+
private Object beanInstance;
38+
private MethodSignatureType type;
39+
private final Method method;
40+
private final Errors.Collector errors;
41+
private Acknowledgment.Strategy ackStrategy;
42+
43+
44+
AbstractMessagingMethod(Method method, Errors.Collector errors) {
45+
this.method = method;
46+
this.errors = errors;
47+
Optional<MethodSignatureType> signatureType = MethodSignatureResolver
48+
.create(method)
49+
.resolve();
50+
if (signatureType.isPresent()) {
51+
this.type = signatureType.get();
52+
resolveAckStrategy();
53+
} else {
54+
errors.fatal("Unsupported method signature " + method);
55+
}
56+
}
57+
58+
void validate() {
59+
Optional.ofNullable(method.getAnnotation(Acknowledgment.class))
60+
.map(Acknowledgment::value)
61+
.filter(s -> !type.getSupportedAckStrategies().contains(s))
62+
.ifPresent(strategy -> {
63+
errors.fatal(String.format("Acknowledgment strategy %s is not supported for method signature: %s",
64+
strategy, type));
65+
});
66+
}
67+
68+
void init(BeanManager beanManager, Config config) {
69+
this.beanInstance = ChannelRouter.lookup(bean, beanManager);
70+
}
71+
72+
Method getMethod() {
73+
return method;
74+
}
75+
76+
Errors.Collector errors() {
77+
return errors;
78+
}
79+
80+
Object getBeanInstance() {
81+
return beanInstance;
82+
}
83+
84+
void setDeclaringBean(Bean<?> bean) {
85+
this.bean = bean;
86+
}
87+
88+
Class<?> getDeclaringType() {
89+
return method.getDeclaringClass();
90+
}
91+
92+
String getIncomingChannelName() {
93+
return incomingChannelName;
94+
}
95+
96+
String getOutgoingChannelName() {
97+
return outgoingChannelName;
98+
}
99+
100+
void setIncomingChannelName(String incomingChannelName) {
101+
this.incomingChannelName = incomingChannelName;
102+
}
103+
104+
void setOutgoingChannelName(String outgoingChannelName) {
105+
this.outgoingChannelName = outgoingChannelName;
106+
}
107+
108+
MethodSignatureType getType() {
109+
return type;
110+
}
111+
112+
void setType(MethodSignatureType type) {
113+
this.type = type;
114+
}
115+
116+
Acknowledgment.Strategy getAckStrategy() {
117+
return ackStrategy;
118+
}
119+
120+
private void resolveAckStrategy() {
121+
ackStrategy =
122+
Optional.ofNullable(method.getAnnotation(Acknowledgment.class))
123+
.map(Acknowledgment::value)
124+
.orElse(type.getDefaultAckType());
125+
}
126+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.microprofile.messaging;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import io.helidon.config.Config;
24+
import io.helidon.config.ConfigSources;
25+
26+
/**
27+
* Detached configuration of a single connector.
28+
*/
29+
class AdHocConfigBuilder {
30+
private final Map<String, String> configuration = new HashMap<>();
31+
32+
private AdHocConfigBuilder() {
33+
}
34+
35+
static AdHocConfigBuilder from(Config config) {
36+
AdHocConfigBuilder result = new AdHocConfigBuilder();
37+
result.putAll(config);
38+
return result;
39+
}
40+
41+
AdHocConfigBuilder put(String key, String value) {
42+
configuration.put(key, value);
43+
return this;
44+
}
45+
46+
AdHocConfigBuilder putAll(Config configToPut) {
47+
configuration.putAll(configToPut.detach().asMap().orElse(Map.of()));
48+
return this;
49+
}
50+
51+
org.eclipse.microprofile.config.Config build() {
52+
Config newConfig = Config.builder(ConfigSources.create(configuration))
53+
.disableEnvironmentVariablesSource()
54+
.disableSystemPropertiesSource()
55+
.disableFilterServices()
56+
.disableSourceServices()
57+
.disableParserServices()
58+
.build();
59+
return (org.eclipse.microprofile.config.Config) newConfig;
60+
}
61+
}

0 commit comments

Comments
 (0)