Asynchronous messaging is a commonly used form of communication in the world of microservices. While its possible to start building your reactive streams directly by combining operators and connecting them to reactive APIs, with Helidon SE Reactive Messaging, you can now use prepared tools for repetitive use case scenarios .
<dependency>
<groupId>io.helidon.messaging</groupId>
<artifactId>helidon-messaging</artifactId>
</dependency>For example connecting your streams to external services usually requires a lot of boiler-plate code for configuration handling, backpressure propagation, acknowledgement and more.
For such tasks there is a system of connectors, emitters and means to orchestrate them in Helidon, called Reactive Messaging. It’s basically an API for connecting and configuring Connectors and Emitters with your reactive streams through so called Channels.
You may wonder how Reactive Messaging relates to MicroProfile Reactive Messaging. As the making of connectors or even configuring them can be repetitive task leading to the same results, Helidon SE Reactive Messaging supports very same configuration format for connectors as its MicroProfile counterpart does. Also, MP Connectors are reusable in Helidon SE Messaging with some limitation(there is no CDI in Helidon SE). All Messaging connectors in Helidon are made to be universally usable by Helidon MP and SE.
Channel is a named pair of Publisher and Subscriber. Channels can be connected together by
processors. Registering of Publisher or Subscriber for a channel can be done
by Messaging API, or configured implicitly for using registered connectors
for generating such Publisher or Subscriber.
Channel<String> channel1 = Channel.create("channel1");
Messaging.builder()
.publisher(channel1, Multi.just("message 1", "message 2")
.map(Message::of))
.listener(channel1, s -> System.out.println("Intecepted message " + s))
.build()
.start();Processor is a typical reactive processor acting as a Subscriber to upstream and as a Publisher
to downstream. In terms of reactive messaging it is able to connect two channels to one
reactive stream.
Channel<String> firstChannel = Channel.create("first-channel");
Channel<String> secondChannel = Channel.create("second-channel");
Messaging.builder()
.publisher(secondChannel, ReactiveStreams.of("test1", "test2", "test3")
.map(Message::of))
.processor(secondChannel, firstChannel, ReactiveStreams.<Message<String>>builder()
.map(Message::getPayload)
.map(String::toUpperCase)
.map(Message::of)
)
.subscriber(firstChannel, ReactiveStreams.<Message<String>>builder()
.peek(Message::ack)
.map(Message::getPayload)
.forEach(s -> System.out.println("Consuming message " + s)))
.build()
.start();
>Consuming message TEST1
>Consuming message TEST2
>Consuming message TEST3Reactive Messaging in Helidon SE uses the same concept of message wrapping as MicroProfile messaging. The only notable difference is that SE Messaging does almost no implicit or automatic acknowledgement due to no magic philosophy of Helidon SE.
Only exception to this are variants of methods Messaging.Builder#listener and
Messaging.Builder#processor with consumer or function params, conveniently unwrapping payload
for you. After such implicit unwrapping is not possible to do a manual acknowledgement, therefore
implicit ack before callback is executed is necessary.
Connector concept is a way for connecting channels to external sources. To make creation and usage of connectors as easy and versatile as possible, Helidon SE Messaging uses same API for connectors like MicroProfile Reactive Messaging does. This allows connectors to be usable in both flavors of Helidon with one limitation which is that connector has to be able to work without CDI.
Example of such a versatile connectors in Helidon:
Connector for Reactive Messaging is a factory producing Publishers and Subscribers for
Channels in Reactive Messaging. Messaging connector is just an implementation of
IncomingConnectorFactory, OutgoingConnectorFactory or both.
example-connector:@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
return ReactiveStreams.of("foo", "bar")
.map(Message::of);
}
@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
return ReactiveStreams.<Message<?>>builder()
.map(Message::getPayload)
.forEach(o -> System.out.println("Connector says: " + o));
}
}mp.messaging.outgoing.to-connector-channel.connector: example-connector
mp.messaging.incoming.from-connector-channel.connector: example-connectorConfig config = Config.create();
Messaging.builder()
.config(config)
.connector(new ExampleConnector())
.publisher(Channel.create("to-connector-channel"),
ReactiveStreams.of("fee", "fie")
.map(Message::of)
)
.build()
.start();
> Connector says: fee
> Connector says: fieMessaging.builder()
.connector(new ExampleConnector())
.subscriber(Channel.create("from-connector-channel"),
ReactiveStreams.<Message<String>>builder()
.peek(Message::ack)
.map(Message::getPayload)
.forEach(s -> System.out.println("Consuming: " + s))
)
.build()
.start();
> Consuming: foo
> Consuming: barMessaging connector in Helidon SE can be configured explicitly by API or implicitly by config following notation of MicroProfile Reactive Messaging.
Configuration is being supplied to connector by Messaging implementation, two mandatory attributes are always present:
-
channel-namename of the channel which has this connector configured as Publisher or Subscriber,Channel.create('name-of-channel')in case of explicit configuration ormp.messaging.incoming.name-of-channel.connector: connector-namein case of implicit config -
connectorname of the connector@Connector("connector-name")
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(final Config config) {
String firstPropValue = config.getValue("first-test-prop", String.class);(1)
String secondPropValue = config.getValue("second-test-prop", String.class);
return ReactiveStreams.of(firstPropValue, secondPropValue)
.map(Message::of);
}
}-
Config context is merged from channel and connector contexts
An explicit config for channel’s publisher is possible with Channel.Builder#publisherConfig(Config config)
and for subscriber with Channel.Builder#subscriberConfig(Config config).
Supplied Helidon Config is merged with
mandatory attributes and any implicit config found. Resulting config is served to Connector.
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> fromKafka = Channel.<String>builder()(1)(2)
.name("from-kafka")
.publisherConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.groupId("example-group-" + session.getId())
.topic(topic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build()
)
.build();
KafkaConnector kafkaConnector = KafkaConnector.create();(3)
Messaging messaging = Messaging.builder()
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();-
Prepare channel for connecting kafka connector with specific publisher configuration → listener,
-
Channel → connector mapping is automatic when using
KafkaConnector.configBuilder() -
Prepare Kafka connector, can be used by any channel
Implicit config without any hard-coding is possible with Helidon Config following notation of MicroProfile Reactive Messaging.
mp.messaging.incoming.from-connector-channel.connector: example-connector(1)
mp.messaging.incoming.from-connector-channel.first-test-prop: foo(2)
mp.messaging.connector.example-connector.second-test-prop: bar(3)-
Channel → Connector mapping
-
Channel configuration properties
-
Connector configuration properties
Config config = Config.create();
Messaging.builder()
.config(config)
.connector(new ExampleConnector())
.listener(Channel.create("from-connector-channel"),
s -> System.out.println("Consuming: " + s))
.build()
.start();
> Consuming: foo
> Consuming: barAs the API is the same for MicroProfile Reactive Messaging
connectors, all that is needed to make connector work in both ways is annotating it with
@ApplicationScoped. Such connector is treated as a bean in Helidon MP.
For specific information about creating messaging connectors for Helidon MP visit MicroProfile Reactive Messaging.
<dependency>
<groupId>io.helidon.messaging.kafka</groupId>
<artifactId>helidon-messaging-kafka</artifactId>
</dependency>String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> fromKafka = Channel.<String>builder()(1)(2)
.name("from-kafka")
.publisherConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.groupId("example-group-" + session.getId())
.topic(topic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.LATEST)
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build()
)
.build();
KafkaConnector kafkaConnector = KafkaConnector.create();(3)
Messaging messaging = Messaging.builder()
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();-
Prepare a channel for connecting kafka connector with specific publisher configuration → listener
-
Channel → connector mapping is automatic when using KafkaConnector.configBuilder()
-
Prepare Kafka connector, can be used by any channel
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
Channel<String> toKafka = Channel.<String>builder()(1)(2)
.subscriberConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.topic(topic)
.keySerializer(StringSerializer.class)
.valueSerializer(StringSerializer.class)
.build()
).build();
KafkaConnector kafkaConnector = KafkaConnector.create();(3)
messaging = Messaging.builder()
.publisher(toKafka, Multi.just("test1", "test2").map(Message::of))
.connector(kafkaConnector)
.build()
.start();-
Prepare a channel for connecting kafka connector with specific publisher configuration → listener
-
Channel → connector mapping is automatic when using KafkaConnector.configBuilder()
-
Prepare Kafka connector, can be used by any channel
mp.messaging:
incoming.from-kafka:
connector: helidon-kafka
topic: messaging-test-topic-1
auto.offset.reset: latest # (1)
enable.auto.commit: true
group.id: example-group-id
outgoing.to-kafka:
connector: helidon-kafka
topic: messaging-test-topic-1
connector:
helidon-kafka:
bootstrap.servers: localhost:9092 # (2)
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer-
Kafka client consumer’s property auto.offset.reset configuration for
from-kafkachannel only -
Kafka client’s property bootstrap.servers configuration for all channels using the connector
Config config = Config.create();
Channel<String> fromKafka = Channel.create("from-kafka");
KafkaConnector kafkaConnector = KafkaConnector.create();(1)
Messaging messaging = Messaging.builder()
.config(config)
.connector(kafkaConnector)
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
})
.build()
.start();-
Prepare Kafka connector, can be used by any channel
Config config = Config.create();
Channel<String> toKafka = Channel.create("to-kafka");
KafkaConnector kafkaConnector = KafkaConnector.create();(1)
messaging = Messaging.builder()
.config(config)
.publisher(toKafka, Multi.just("test1", "test2").map(Message::of))
.connector(kafkaConnector)
.build()
.start();-
Prepare Kafka connector, can be used by any channel
Don’t forget to check out the examples with pre-configured Kafka docker image, for easy testing:
<dependency>
<groupId>io.helidon.messaging.jms</groupId>
<artifactId>helidon-messaging-jms</artifactId>
</dependency>Channel<String> fromJms = Channel.<String>builder()(1)(2)
.name("from-jms")
.publisherConfig(JmsConnector.configBuilder()
.jndiInitialFactory(ActiveMQInitialContextFactory.class)
.jndiProviderUrl("tcp://127.0.0.1:61616")
.type(JmsConfigBuilder.Type.QUEUE)
.destination("se-example-queue-1")
.build()
)
.build();
JmsConnector jmsConnector = JmsConnector.create();(3)
Messaging messaging = Messaging.builder()
.connector(jmsConnector)
.listener(fromJms, payload -> {
System.out.println("Jms says: " + payload);
})
.build()
.start();-
Prepare a channel for connecting jms connector with specific publisher configuration → listener
-
Channel → connector mapping is automatic when using JmsConnector.configBuilder()
-
Prepare JMS connector, can be used by any channel
Channel<String> toJms = Channel.<String>builder()(1)(2)
.subscriberConfig(JmsConnector.configBuilder()
.jndiInitialFactory(ActiveMQInitialContextFactory.class)
.jndiProviderUrl("tcp://127.0.0.1:61616")
.type(JmsConfigBuilder.Type.QUEUE)
.destination("se-example-queue-1")
.build()
).build();
JmsConnector jmsConnector = JmsConnector.create();(3)
messaging = Messaging.builder()
.publisher(toJms, Multi.just("test1", "test2").map(Message::of))
.connector(jmsConnector)
.build()
.start();-
Prepare a channel for connecting jms connector with specific publisher configuration → listener
-
Channel → connector mapping is automatic when using JmsConnector.configBuilder()
-
Prepare JMS connector, can be used by any channel
mp.messaging:
incoming.from-jms:
connector: helidon-jms
destination: se-example-queue-1
session-group-id: session-group-1
type: queue
outgoing.to-jms:
connector: helidon-jms
destination: se-example-queue-1
type: queue
connector:
helidon-jms:
jndi:
jms-factory: ConnectionFactory
env-properties:
java.naming.factory.initial: org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url: tcp://127.0.0.1:61616Config config = Config.create();
Channel<String> fromJms = Channel.create("from-jms");
JmsConnector jmsConnector = JmsConnector.create();(1)
Messaging messaging = Messaging.builder()
.config(config)
.connector(jmsConnector)
.listener(fromJms, payload -> {
System.out.println("Jms says: " + payload);
})
.build()
.start();-
Prepare JMS connector, can be used by any channel
Config config = Config.create();
Channel<String> toJms = Channel.create("to-jms");
JmsConnector jmsConnector = JmsConnector.create();(1)
messaging = Messaging.builder()
.config(config)
.publisher(toJms, Multi.just("test1", "test2").map(Message::of))
.connector(jmsConnector)
.build()
.start();-
Prepare JMS connector, can be used by any channel
Don’t forget to check out the examples with pre-configured ActiveMQ docker image, for easy testing:
<dependency>
<groupId>io.helidon.messaging.aq</groupId>
<artifactId>helidon-messaging-aq</artifactId>
</dependency>PoolDataSource pds = PoolDataSourceFactory.getPoolDataSource();(1)
pds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
pds.setURL("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=192.168.0.123)(Port=1521))(CONNECT_DATA=(SID=XE)))");
pds.setUser("frank");
pds.setPassword("frank");
AqConnector seConn = AqConnector.builder()(2)
.dataSource("test-ds", pds)
.build();
Channel<String> toAq = Channel.<String>builder()(3)
.name("toAq")
.subscriberConfig(AqConnector.configBuilder()
.queue("example_queue_1")
.dataSource("test-ds")
.build())
.build();
Channel<String> fromAq = Channel.<String>builder()(4)
.name("fromAq")
.publisherConfig(AqConnector.configBuilder()
.queue("example_queue_1")
.dataSource("test-ds")
.build())
.build();
Messaging.builder()(5)
.connector(seConn)
.publisher(toAq, Multi.just("Hello", "world", "from", "Oracle", "DB!").map(Message::of))(6)
.listener(fromAq, s -> System.out.pritln("Message received: "+s))(7)
.build()
.start();-
Prepare Oracle UCP
-
Setup AQ connector and provide datasource with an identifier
test-ds -
Setup channel for sending messages to queue
example_queue_1with datasourcetest-ds -
Setup channel for receiving messages from queue
example_queue_1with datasourcetest-ds -
Register connector and channels
-
Add a publisher for several test messages to publish them to
example_queue_1immediately -
Subscribe callback for any message coming from
example_queue_1