diff --git a/bom/pom.xml b/bom/pom.xml
index 4c60da75d48..ff2d5a235eb 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -1137,6 +1137,11 @@
helidon-messaging-aq${helidon.version}
+
+ io.helidon.messaging.wls-jms
+ helidon-messaging-wls-jms
+ ${helidon.version}
+ io.helidon.microprofile.tests
diff --git a/docs/mp/reactivemessaging/weblogic.adoc b/docs/mp/reactivemessaging/weblogic.adoc
new file mode 100644
index 00000000000..710b2ceb43d
--- /dev/null
+++ b/docs/mp/reactivemessaging/weblogic.adoc
@@ -0,0 +1,215 @@
+///////////////////////////////////////////////////////////////////////////////
+
+ Copyright (c) 2022 Oracle and/or its affiliates.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+///////////////////////////////////////////////////////////////////////////////
+
+= Weblogic JMS Connector
+:toc:
+:toc-placement: preamble
+:description: Reactive Messaging support for Weblogic JMS in Helidon MP
+:keywords: helidon, mp, messaging, jms, weblogic, wls, thin
+:feature-name: Weblogic JMS connector
+:microprofile-bundle: false
+:rootdir: {docdir}/../..
+
+include::{rootdir}/includes/mp.adoc[]
+include::{rootdir}/includes/dependencies.adoc[]
+
+[source,xml]
+----
+
+ io.helidon.messaging.wls-jms
+ helidon-messaging-wls-jms
+
+----
+
+== Reactive Weblogic JMS Connector
+
+Connecting streams to Weblogic JMS with Reactive Messaging couldn't be easier.
+This connector extends Helidon JMS connector with special handling for Weblogic T3 thin client.
+
+Connecting to Weblogic JMS connection factories requires proprietary T3 thin client library which can be obtained from
+Weblogic installation.
+
+WARNING: Avoid placing `wlthint3client.jar` on Helidon classpath, client library location needs to be
+configured and loaded by Helidon messaging connector.
+
+=== Configuration
+
+Connector name: `helidon-weblogic-jms`
+
+.Attributes
+|===
+|`jms-factory` | JNDI name of the JMS factory configured in Weblogic
+|`url` | t3, t3s or http address of Weblogic server
+|`thin-jar` | Filepath to the Weblogic thin T3 client jar(wlthint3client.jar), can be usually found within Weblogic installation
+`WL_HOME/server/lib/wlthint3client.jar`
+|`principal` | Weblogic initial context principal(user)
+|`credential` | Weblogic initial context credential(password)
+|`type` | Possible values are: `queue`, `topic`. Default value is: `topic`
+|`destination` | Queue or topic name in WebLogic CDI Syntax(CDI stands for Create Destination Identifier)
+|`acknowledge-mode` |Possible values are: `AUTO_ACKNOWLEDGE`- session automatically acknowledges a client’s receipt of a message,
+`CLIENT_ACKNOWLEDGE` - receipt of a message is acknowledged only when `Message.ack()` is called manually,
+`DUPS_OK_ACKNOWLEDGE` - session lazily acknowledges the delivery of messages. Default value: `AUTO_ACKNOWLEDGE`
+|`transacted` | Indicates whether the session will use a local transaction. Default value: `false`
+|`message-selector` | JMS API message selector expression based on a subset of the SQL92.
+Expression can only access headers and properties, not the payload.
+|`client-id` | Client identifier for JMS connection.
+|`client-id` | Client identifier for JMS connection.
+|`durable` | True for creating durable consumer (only for topic). Default value: `false`
+|`subscriber-name` | Subscriber name for durable consumer used to identify subscription.
+|`non-local` | If true then any messages published to the topic using this session's connection,
+or any other connection with the same client identifier,
+will not be added to the durable subscription. Default value: `false`
+|`named-factory` | Select in case factory is injected as a named bean or configured with name.
+|`poll-timeout` | Timeout for polling for next message in every poll cycle in millis. Default value: `50`
+|`period-executions` | Period for executing poll cycles in millis. Default value: `100`
+|`session-group-id` | When multiple channels share same `session-group-id`, they share same JMS session and same JDBC connection as well.
+|`producer.unit-of-order` | All messages from same unit of order will be processed sequentially in the order they were created.
+|===
+
+Configuration is straight forward. Use JNDI for localizing and configuring of JMS ConnectionFactory
+from Weblogic. Notice the destination property which is used to define queue with
+https://docs.oracle.com/cd/E24329_01/web.1211/e24387/lookup.htm#JMSPG915[WebLogic CDI Syntax](CDI stands for Create Destination Identifier).
+
+[source,yaml]
+.Example config:
+----
+mp:
+ messaging:
+ connector:
+ helidon-weblogic-jms:
+ # JMS factory configured in Weblogic
+ jms-factory: jms/TestConnectionFactory
+ # Path to the WLS Thin T3 client jar
+ thin-jar: wlthint3client.jar
+ url: t3://localhost:7001
+ incoming:
+ from-wls:
+ connector: helidon-weblogic-jms
+ # WebLogic CDI Syntax(CDI stands for Create Destination Identifier)
+ destination: ./TestJMSModule!TestQueue
+ outgoing:
+ to-wls:
+ connector: helidon-weblogic-jms
+ destination: ./TestJMSModule!TestQueue
+----
+
+When configuring destination with WebLogic CDI, the following syntax needs to be applied:
+
+.Non-Distributed Destinations
+`jms-server-name/jms-module-name!destination-name`
+
+In our example we are replacing jms-server-name with `.` as we don’t have to look up the server we are connected to.
+
+.Uniform Distributed Destinations (UDDs)
+`jms-server-name/jms-module-name!jms-server-name@udd-name`
+
+Destination for UDD doesn't have `./` prefix, because distributed destinations can be served by multiple servers within a cluster.
+
+=== Consuming
+
+[source,java]
+.Consuming one by one unwrapped value:
+----
+@Incoming("from-wls")
+public void consumeWls(String msg) {
+ System.out.println("Weblogic says: " + msg);
+}
+----
+
+[source,java]
+.Consuming one by one, manual ack:
+----
+@Incoming("from-wls")
+@Acknowledgment(Acknowledgment.Strategy.MANUAL)
+public CompletionStage> consumewls(JmsMessage msg) {
+ System.out.println("Weblogic says: " + msg.getPayload());
+ return msg.ack();
+}
+----
+
+=== Producing
+
+[source,java]
+.Producing to Weblogic JMS:
+----
+@Outgoing("to-wls")
+public PublisherBuilder produceToWls() {
+ return ReactiveStreams.of("test1", "test2");
+}
+----
+
+[source,java]
+.Example of more advanced producing to Weblogic JMS:
+----
+@Outgoing("to-wls")
+public PublisherBuilder produceToJms() {
+ return ReactiveStreams.of("test1", "test2")
+ .map(s -> JmsMessage.builder(s)
+ .correlationId(UUID.randomUUID().toString())
+ .property("stringProp", "cool property")
+ .property("byteProp", 4)
+ .property("intProp", 5)
+ .onAck(() -> System.out.println("Acked!"))
+ .build());
+}
+----
+[source,java]
+.Example of even more advanced producing to Weblogic JMS with custom mapper:
+----
+@Outgoing("to-wls")
+public PublisherBuilder produceToJms() {
+ return ReactiveStreams.of("test1", "test2")
+ .map(s -> JmsMessage.builder(s)
+ .customMapper((p, session) -> {
+ TextMessage textMessage = session.createTextMessage(p);
+ textMessage.setStringProperty("custom-mapped-property", "XXX" + p);
+ return textMessage;
+ })
+ .build()
+ );
+}
+----
+
+=== Secured t3 over SSL(t3s)
+For initiating SSL secured t3 connection, trust keystore with WLS public certificate is needed.
+Standard WLS installation has pre-configured Demo trust store: `WL_HOME/server/lib/DemoTrust.jks`,
+we can store it locally for connecting WLS over t3s.
+
+[source,yaml]
+.Example config:
+----
+mp:
+ messaging:
+ connector:
+ helidon-weblogic-jms:
+ jms-factory: jms/TestConnectionFactory
+ thin-jar: wlthint3client.jar
+ # Notice t3s protocol is used
+ url: t3s://localhost:7002
+----
+
+Helidon application needs to be aware about our WLS SSL public certificate.
+
+[source,bash]
+.Running example with WLS truststore
+----
+java --add-opens=java.base/java.io=ALL-UNNAMED \
+-Djavax.net.ssl.trustStore=DemoTrust.jks \
+-Djavax.net.ssl.trustStorePassword=DemoTrustKeyStorePassPhrase \
+-jar ./target/helidon-wls-jms-demo.jar
+----
\ No newline at end of file
diff --git a/docs/sitegen.yaml b/docs/sitegen.yaml
index f60c6c5367b..447b7f53726 100644
--- a/docs/sitegen.yaml
+++ b/docs/sitegen.yaml
@@ -409,6 +409,7 @@ backend:
- "kafka.adoc"
- "jms.adoc"
- "aq.adoc"
+ - "weblogic.adoc"
- "mock.adoc"
- type: "PAGE"
title: "REST Client"
diff --git a/examples/messaging/pom.xml b/examples/messaging/pom.xml
index 297f5f0521b..bc079601fd7 100644
--- a/examples/messaging/pom.xml
+++ b/examples/messaging/pom.xml
@@ -39,5 +39,6 @@
jms-websocket-mpjms-websocket-seoracle-aq-websocket-mp
+ weblogic-jms-mp
diff --git a/examples/messaging/weblogic-jms-mp/README.md b/examples/messaging/weblogic-jms-mp/README.md
new file mode 100644
index 00000000000..56414c125ef
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/README.md
@@ -0,0 +1,24 @@
+# Helidon Messaging with Oracle Weblogic Example
+
+## Prerequisites
+* JDK 17+
+* Maven
+* Docker
+* Account at https://container-registry.oracle.com/ with accepted Oracle Standard Terms and Restrictions for Weblogic.
+
+## Run Weblogic in docker
+1. You will need to do a docker login to Oracle container registry with account which previously
+ accepted Oracle Standard Terms and Restrictions for Weblogic:
+ `docker login container-registry.oracle.com`
+2. Run `bash buildAndRunWeblogic.sh` to build and run example Weblogic container.
+ * After example JMS resources are deployed, Weblogic console should be available at http://localhost:7001/console with `admin`/`Welcome1`
+3. To obtain wlthint3client.jar necessary for connecting to Weblogic execute
+ `bash extractThinClientLib.sh`, file will be copied to `./weblogic` folder.
+
+## Build & Run
+To run Helidon with thin client, flag `--add-opens=java.base/java.io=ALL-UNNAMED` is needed to
+open java.base module to thin client internals.
+1. `mvn clean package`
+2. `java --add-opens=java.base/java.io=ALL-UNNAMED --enable-preview -jar ./target/weblogic-jms-mp.jar`
+3. Visit http://localhost:8080 and try to send and receive messages over Weblogic JMS queue.
+
diff --git a/examples/messaging/weblogic-jms-mp/buildAndRunWeblogic.sh b/examples/messaging/weblogic-jms-mp/buildAndRunWeblogic.sh
new file mode 100644
index 00000000000..11317c060af
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/buildAndRunWeblogic.sh
@@ -0,0 +1,53 @@
+#!/bin/bash -e
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cd ./weblogic
+
+# Attempt Oracle container registry login.
+# You need to accept the licence agreement for Weblogic Server at https://container-registry.oracle.com/
+# Search for weblogic and accept the Oracle Standard Terms and Restrictions
+docker login container-registry.oracle.com
+
+docker build -t wls-admin .
+
+docker run --rm -d \
+ -p 7001:7001 \
+ -p 7002:7002 \
+ --name wls-admin \
+ --hostname wls-admin \
+ wls-admin
+
+printf "Waiting for WLS to start ."
+while true;
+do
+ if docker logs wls-admin | grep -q "Server state changed to RUNNING"; then
+ break;
+ fi
+ printf "."
+ sleep 5
+done
+printf " [READY]\n"
+
+echo Deploying example JMS queues
+docker exec wls-admin \
+/bin/bash \
+/u01/oracle/wlserver/common/bin/wlst.sh \
+/u01/oracle/setupTestJMSQueue.py;
+
+echo Example JMS queues deployed!
+echo Console avaiable at http://localhost:7001/console with admin/Welcome1
+echo 'Stop Weblogic server with "docker stop wls-admin"'
\ No newline at end of file
diff --git a/examples/messaging/weblogic-jms-mp/extractThinClientLib.sh b/examples/messaging/weblogic-jms-mp/extractThinClientLib.sh
new file mode 100644
index 00000000000..b5d6ff1b9ec
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/extractThinClientLib.sh
@@ -0,0 +1,22 @@
+#!/bin/bash -e
+#
+# Copyright (c) 2018, 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+# Copy wlthint3client.jar from docker container
+docker cp wls-admin:/u01/oracle/wlserver/server/lib/wlthint3client.jar ./weblogic/wlthint3client.jar
+# Copy DemoTrust.jks from docker container(needed if you want to try t3s protocol)
+docker cp wls-admin:/u01/oracle/wlserver/server/lib/DemoTrust.jks ./weblogic/DemoTrust.jks
\ No newline at end of file
diff --git a/examples/messaging/weblogic-jms-mp/pom.xml b/examples/messaging/weblogic-jms-mp/pom.xml
new file mode 100644
index 00000000000..7b50359c23c
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/pom.xml
@@ -0,0 +1,88 @@
+
+
+
+ 4.0.0
+
+ io.helidon.applications
+ helidon-mp
+ 4.0.0-SNAPSHOT
+ ../../../applications/mp/pom.xml
+
+ io.helidon.examples.messaging.wls
+ weblogic-jms-mp
+ 1.0-SNAPSHOT
+ weblogic-jms-mp
+
+
+
+ io.helidon.microprofile.bundles
+ helidon-microprofile
+
+
+
+ io.helidon.microprofile.messaging
+ helidon-microprofile-messaging
+
+
+ io.helidon.messaging.wls-jms
+ helidon-messaging-wls-jms
+
+
+
+ org.glassfish.jersey.media
+ jersey-media-sse
+
+
+
+ org.jboss
+ jandex
+ runtime
+ true
+
+
+ jakarta.activation
+ jakarta.activation-api
+ runtime
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-libs
+
+
+
+
+ org.jboss.jandex
+ jandex-maven-plugin
+
+
+ make-index
+
+
+
+
+
+
diff --git a/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/FrankResource.java b/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/FrankResource.java
new file mode 100644
index 00000000000..1e0062b14d7
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/FrankResource.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2022 Oracle and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.helidon.examples.messaging.mp;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import io.helidon.messaging.connectors.jms.JmsMessage;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.POST;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.sse.Sse;
+import jakarta.ws.rs.sse.SseBroadcaster;
+import jakarta.ws.rs.sse.SseEventSink;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.glassfish.jersey.media.sse.OutboundEvent;
+
+/**
+ * SSE Jax-Rs resource for message publishing and consuming.
+ */
+@Path("/frank")
+@ApplicationScoped
+public class FrankResource {
+
+ @Inject
+ @Channel("to-wls")
+ private Emitter emitter;
+ private SseBroadcaster sseBroadcaster;
+
+ /**
+ * Consuming JMS messages from Weblogic and sending them to the client over SSE.
+ *
+ * @param msg dequeued message
+ * @return completion stage marking end of the processing
+ */
+ @Incoming("from-wls")
+ public CompletionStage receive(JmsMessage msg) {
+ if (sseBroadcaster == null) {
+ System.out.println("No SSE client subscribed yet: " + msg.getPayload());
+ return CompletableFuture.completedStage(null);
+ }
+ sseBroadcaster.broadcast(new OutboundEvent.Builder().data(msg.getPayload()).build());
+ return CompletableFuture.completedStage(null);
+ }
+
+ /**
+ * Send message to Weblogic JMS queue.
+ *
+ * @param msg message to be sent
+ */
+ @POST
+ @Path("/send/{msg}")
+ public void send(@PathParam("msg") String msg) {
+ emitter.send(msg);
+ }
+
+ /**
+ * Register SSE client to listen for messages coming from Weblogic JMS.
+ *
+ * @param eventSink client sink
+ * @param sse SSE context
+ */
+ @GET
+ @Path("sse")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public void listenToEvents(@Context SseEventSink eventSink, @Context Sse sse) {
+ if (sseBroadcaster == null) {
+ sseBroadcaster = sse.newBroadcaster();
+ }
+ sseBroadcaster.register(eventSink);
+ }
+}
diff --git a/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/package-info.java b/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/package-info.java
new file mode 100644
index 00000000000..372cec04320
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2022 Oracle and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Helidon MP Reactive Messaging with Weblogic JMS.
+ */
+package io.helidon.examples.messaging.mp;
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/META-INF/beans.xml b/examples/messaging/weblogic-jms-mp/src/main/resources/META-INF/beans.xml
new file mode 100644
index 00000000000..80727f9c7fd
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+
+
+
+
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/favicon.ico b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/favicon.ico
new file mode 100644
index 00000000000..d91659fdb53
Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/favicon.ico differ
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-1.png b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-1.png
new file mode 100644
index 00000000000..bbba0aef8a6
Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-1.png differ
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-2.png b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-2.png
new file mode 100644
index 00000000000..0b1096b07c1
Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-2.png differ
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/cloud.png b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/cloud.png
new file mode 100644
index 00000000000..3e04833c08b
Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/cloud.png differ
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/frank.png b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/frank.png
new file mode 100644
index 00000000000..51a13d8db8b
Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/frank.png differ
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/index.html b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/index.html
new file mode 100644
index 00000000000..0000f2e203e
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/index.html
@@ -0,0 +1,101 @@
+
+
+
+
+
+
+ Helidon Reactive Messaging
+
+
+
+
+
+
+
+
+
+
+
Send
+
+
+
+
+
REST call /frank/send/{msg}
+
+
+
SSE messages received
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/main.css b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/main.css
new file mode 100644
index 00000000000..496dde4fe6c
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/main.css
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2022 Oracle and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#root {
+ background-color: #36ABF2;
+ font-family: Roboto,sans-serif;
+ color: #fff;
+ position: absolute;
+ overflow-x: hidden;
+ -ms-overflow-style: none; /* Internet Explorer 10+ */
+ scrollbar-width: none; /* Firefox */
+ top: 0;
+ left: 0;
+ width: 100%;
+ height: 100%;
+}
+#root::-webkit-scrollbar {
+ display: none; /* Safari and Chrome */
+}
+
+#helidon {
+ width: 509px;
+ height: 273px;
+ position: relative;
+ left: -509px;
+ z-index: 4;
+ background: url('img/frank.png');
+}
+
+#rest-tip {
+ position: relative;
+ top: -80px;
+ left: 160px;
+}
+
+#rest-tip-arrow {
+ width: 205px;
+ height: 304px;
+ z-index: 4;
+ top: -20px;
+ background: url('img/arrow-1.png');
+}
+#rest-tip-label {
+ position: absolute;
+ white-space: nowrap;
+ font-size: 18px;
+ font-weight: bold;
+ z-index: 4;
+ left: -60px;
+}
+
+#sse-tip {
+ position: absolute;
+ overflow: hidden;
+ display: flex;
+ width: auto;
+ height: auto;
+ top: 5%;
+ right: 10%;
+ z-index: 0;
+}
+
+#sse-tip-arrow {
+ position: relative;
+ top: -30px;
+ width: 296px;
+ height: 262px;
+ z-index: 4;
+ background: url('img/arrow-2.png');
+}
+#sse-tip-label {
+ position: relative;
+ white-space: nowrap;
+ font-size: 18px;
+ font-weight: bold;
+ z-index: 4;
+}
+
+#producer {
+ float: left;
+ position: relative;
+ width: 300px;
+ height: 100%;
+ margin: 50px;
+ padding: 10px;
+ z-index: 99;
+}
+
+#msgBox {
+ position: absolute;
+ width: 300px;
+ top: 25%;
+ right: 3%;
+ height: 100%;
+ margin: 50px;
+ padding: 10px;
+ z-index: 20;
+}
+
+#input {
+ width: 210px;
+ height: 22px;
+ top: 58px;
+ left: 30px;
+ background-color: white;
+ border-radius: 10px;
+ border-style: solid;
+ border-color: white;
+ position: absolute;
+ z-index: 10;
+}
+
+#inputCloud {
+ position: relative;
+ width: 310px;
+ height: 150px;
+ background: url('img/cloud.png');
+}
+
+#msg {
+ background-color: #D2EBFC;
+ color: #1A9BF4;
+ border-radius: 10px;
+ width: 300px;
+ height: 50px;
+ margin: 5px;
+ display: flex;
+ padding-left: 10px;
+ justify-content: center;
+ align-items: center;
+ z-index: 99;
+}
+
+#submit {
+ font-weight: bold;
+ background-color: aqua;
+ color: #1A9BF4;
+ border-radius: 12px;
+ width: 100px;
+ height: 30px;
+ display: flex;
+ justify-content: center;
+ align-items: center;
+ margin: 5px;
+ cursor: pointer;
+}
+
+#snippet {
+ position: absolute;
+ top: 15%;
+ left: 30%;
+ width: 40%;
+ z-index: 5;
+}
+
+.hljs {
+ border-radius: 10px;
+ font-size: 12px;
+}
\ No newline at end of file
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/application.yaml b/examples/messaging/weblogic-jms-mp/src/main/resources/application.yaml
new file mode 100644
index 00000000000..28a147e57a5
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/src/main/resources/application.yaml
@@ -0,0 +1,44 @@
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+server:
+ port: 8080
+ host: 0.0.0.0
+ static:
+ classpath:
+ location: /WEB
+ welcome: index.html
+
+mp:
+ messaging:
+ connector:
+ helidon-weblogic-jms:
+ # JMS factory configured in Weblogic
+ jms-factory: jms/TestConnectionFactory
+ # Path to the WLS Thin T3 client jar(extract it from docker container with extractThinClientLib.sh)
+ thin-jar: weblogic/wlthint3client.jar
+ url: "t3://localhost:7001"
+ producer.unit-of-order: kec1
+ incoming:
+ from-wls:
+ connector: helidon-weblogic-jms
+ # WebLogic CDI Syntax(CDI stands for Create Destination Identifier)
+ destination: ./TestJMSModule!TestQueue
+ outgoing:
+ to-wls:
+ connector: helidon-weblogic-jms
+ # Same queue is used for simplifying test case
+ destination: ./TestJMSModule!TestQueue
diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/logging.properties b/examples/messaging/weblogic-jms-mp/src/main/resources/logging.properties
new file mode 100644
index 00000000000..1ac57eb5b92
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/src/main/resources/logging.properties
@@ -0,0 +1,30 @@
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Example Logging Configuration File
+# For more information see $JAVA_HOME/jre/lib/logging.properties
+
+# Send messages to the console
+handlers=io.helidon.logging.jul.HelidonConsoleHandler
+
+# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread
+java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n
+
+# Global logging level. Can be overridden by specific loggers
+.level=INFO
+
+# Component specific log levels
+#io.helidon.level=INFO
diff --git a/examples/messaging/weblogic-jms-mp/weblogic/Dockerfile b/examples/messaging/weblogic-jms-mp/weblogic/Dockerfile
new file mode 100644
index 00000000000..6e79458ebd5
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/weblogic/Dockerfile
@@ -0,0 +1,54 @@
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ORACLE DOCKERFILES PROJECT
+# --------------------------
+# This docker file is customized, originaly taken from https://github.com/oracle/docker-images
+# and extends the Oracle WebLogic image by creating a sample domain.
+#
+# Base image is available at https://container-registry.oracle.com/
+#
+FROM container-registry.oracle.com/middleware/weblogic:14.1.1.0-dev-11
+
+ENV ORACLE_HOME=/u01/oracle \
+ USER_MEM_ARGS="-Djava.security.egd=file:/dev/./urandom" \
+ SCRIPT_FILE=/u01/oracle/createAndStartEmptyDomain.sh \
+ HEALTH_SCRIPT_FILE=/u01/oracle/get_healthcheck_url.sh \
+ PATH=$PATH:${JAVA_HOME}/bin:/u01/oracle/oracle_common/common/bin:/u01/oracle/wlserver/common/bin
+
+ENV DOMAIN_NAME="${DOMAIN_NAME:-base_domain}" \
+ ADMIN_LISTEN_PORT="${ADMIN_LISTEN_PORT:-7001}" \
+ ADMIN_NAME="${ADMIN_NAME:-AdminServer}" \
+ DEBUG_FLAG=true \
+ PRODUCTION_MODE=dev \
+ ADMINISTRATION_PORT_ENABLED="${ADMINISTRATION_PORT_ENABLED:-true}" \
+ ADMINISTRATION_PORT="${ADMINISTRATION_PORT:-9002}"
+
+COPY container-scripts/createAndStartEmptyDomain.sh container-scripts/get_healthcheck_url.sh /u01/oracle/
+COPY container-scripts/create-wls-domain.py container-scripts/setupTestJMSQueue.py /u01/oracle/
+COPY properties/domain.properties /u01/oracle/properties/
+
+USER root
+
+RUN chmod +xr $SCRIPT_FILE $HEALTH_SCRIPT_FILE && \
+ chown oracle:root $SCRIPT_FILE /u01/oracle/create-wls-domain.py $HEALTH_SCRIPT_FILE
+
+USER oracle
+
+HEALTHCHECK --start-period=10s --timeout=30s --retries=3 CMD curl -k -s --fail `$HEALTH_SCRIPT_FILE` || exit 1
+WORKDIR ${ORACLE_HOME}
+
+CMD ["/u01/oracle/createAndStartEmptyDomain.sh"]
\ No newline at end of file
diff --git a/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/create-wls-domain.py b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/create-wls-domain.py
new file mode 100644
index 00000000000..e24167e1fb1
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/create-wls-domain.py
@@ -0,0 +1,104 @@
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# WebLogic on Docker Default Domain
+#
+# Domain, as defined in DOMAIN_NAME, will be created in this script. Name defaults to 'base_domain'.
+#
+# Since : October, 2014
+# Author: monica.riccelli@oracle.com
+# ==============================================
+domain_name = os.environ.get("DOMAIN_NAME", "base_domain")
+admin_name = os.environ.get("ADMIN_NAME", "AdminServer")
+admin_listen_port = int(os.environ.get("ADMIN_LISTEN_PORT", "7001"))
+domain_path = '/u01/oracle/user_projects/domains/%s' % domain_name
+production_mode = os.environ.get("PRODUCTION_MODE", "prod")
+administration_port_enabled = os.environ.get("ADMINISTRATION_PORT_ENABLED", "true")
+administration_port = int(os.environ.get("ADMINISTRATION_PORT", "9002"))
+
+print('domain_name : [%s]' % domain_name);
+print('admin_listen_port : [%s]' % admin_listen_port);
+print('domain_path : [%s]' % domain_path);
+print('production_mode : [%s]' % production_mode);
+print('admin name : [%s]' % admin_name);
+print('administration_port_enabled : [%s]' % administration_port_enabled);
+print('administration_port : [%s]' % administration_port);
+
+# Open default domain template
+# ============================
+readTemplate("/u01/oracle/wlserver/common/templates/wls/wls.jar")
+
+set('Name', domain_name)
+setOption('DomainName', domain_name)
+
+# Set Administration Port
+# =======================
+if administration_port_enabled != "false":
+ set('AdministrationPort', administration_port)
+ set('AdministrationPortEnabled', 'false')
+
+# Disable Admin Console
+# --------------------
+# cmo.setConsoleEnabled(false)
+
+# Configure the Administration Server and SSL port.
+# =================================================
+cd('/Servers/AdminServer')
+set('Name', admin_name)
+set('ListenAddress', '')
+set('ListenPort', admin_listen_port)
+if administration_port_enabled != "false":
+ create(admin_name, 'SSL')
+ cd('SSL/' + admin_name)
+ set('Enabled', 'True')
+
+# Define the user password for weblogic
+# =====================================
+cd(('/Security/%s/User/weblogic') % domain_name)
+cmo.setName(username)
+cmo.setPassword(password)
+
+# Write the domain and close the domain template
+# ==============================================
+setOption('OverwriteDomain', 'true')
+setOption('ServerStartMode',production_mode)
+
+# Create Node Manager
+# ===================
+#cd('/NMProperties')
+#set('ListenAddress','')
+#set('ListenPort',5556)
+#set('CrashRecoveryEnabled', 'true')
+#set('NativeVersionEnabled', 'true')
+#set('StartScriptEnabled', 'false')
+#set('SecureListener', 'false')
+#set('LogLevel', 'FINEST')
+
+# Set the Node Manager user name and password
+# ===========================================
+#cd('/SecurityConfiguration/%s' % domain_name)
+#set('NodeManagerUsername', username)
+#set('NodeManagerPasswordEncrypted', password)
+
+# Write Domain
+# ============
+writeDomain(domain_path)
+closeTemplate()
+
+# Exit WLST
+# =========
+exit()
diff --git a/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/createAndStartEmptyDomain.sh b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/createAndStartEmptyDomain.sh
new file mode 100644
index 00000000000..1d1a3e4eaff
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/createAndStartEmptyDomain.sh
@@ -0,0 +1,87 @@
+#!/bin/bash
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# If AdminServer.log does not exists, container is starting for 1st time
+# So it should start NM and also associate with AdminServer
+# Otherwise, only start NM (container restarted)
+########### SIGTERM handler ############
+function _term() {
+ echo "Stopping container."
+ echo "SIGTERM received, shutting down the server!"
+ ${DOMAIN_HOME}/bin/stopWebLogic.sh
+}
+
+########### SIGKILL handler ############
+function _kill() {
+ echo "SIGKILL received, shutting down the server!"
+ kill -9 $childPID
+}
+
+# Set SIGTERM handler
+trap _term SIGTERM
+
+# Set SIGKILL handler
+trap _kill SIGKILL
+
+#Define DOMAIN_HOME
+export DOMAIN_HOME=/u01/oracle/user_projects/domains/$DOMAIN_NAME
+echo "Domain Home is: " $DOMAIN_HOME
+
+mkdir -p $ORACLE_HOME/properties
+# Create Domain only if 1st execution
+if [ ! -e ${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log ]; then
+ echo "Create Domain"
+ PROPERTIES_FILE=/u01/oracle/properties/domain.properties
+ if [ ! -e "$PROPERTIES_FILE" ]; then
+ echo "A properties file with the username and password needs to be supplied."
+ exit
+ fi
+
+ # Get Username
+ USER=`awk '{print $1}' $PROPERTIES_FILE | grep username | cut -d "=" -f2`
+ if [ -z "$USER" ]; then
+ echo "The domain username is blank. The Admin username must be set in the properties file."
+ exit
+ fi
+ # Get Password
+ PASS=`awk '{print $1}' $PROPERTIES_FILE | grep password | cut -d "=" -f2`
+ if [ -z "$PASS" ]; then
+ echo "The domain password is blank. The Admin password must be set in the properties file."
+ exit
+ fi
+
+ # Create an empty domain
+ wlst.sh -skipWLSModuleScanning -loadProperties $PROPERTIES_FILE /u01/oracle/create-wls-domain.py
+ mkdir -p ${DOMAIN_HOME}/servers/${ADMIN_NAME}/security/
+ chmod -R g+w ${DOMAIN_HOME}
+ echo "username=${USER}" >> $DOMAIN_HOME/servers/${ADMIN_NAME}/security/boot.properties
+ echo "password=${PASS}" >> $DOMAIN_HOME/servers/${ADMIN_NAME}/security/boot.properties
+ ${DOMAIN_HOME}/bin/setDomainEnv.sh
+ # Setup JMS examples
+# wlst.sh -skipWLSModuleScanning -loadProperties $PROPERTIES_FILE /u01/oracle/setupTestJMSQueue.py
+fi
+
+# Start Admin Server and tail the logs
+${DOMAIN_HOME}/startWebLogic.sh
+if [ -e ${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log ]; then
+ echo "${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log"
+fi
+touch ${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log
+tail -f ${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log
+
+childPID=$!
+wait $childPID
diff --git a/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/get_healthcheck_url.sh b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/get_healthcheck_url.sh
new file mode 100644
index 00000000000..5eb3ded88e4
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/get_healthcheck_url.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ "$ADMINISTRATION_PORT_ENABLED" = "true" ] ; then
+ echo "https://{localhost:$ADMINISTRATION_PORT}/weblogic/ready" ;
+else
+ echo "http://{localhost:$ADMIN_LISTEN_PORT}/weblogic/ready" ;
+fi
diff --git a/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/setupTestJMSQueue.py b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/setupTestJMSQueue.py
new file mode 100644
index 00000000000..3269b782ae0
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/setupTestJMSQueue.py
@@ -0,0 +1,115 @@
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os.path
+import sys
+
+System.setProperty("weblogic.security.SSL.ignoreHostnameVerification", "true")
+
+connect("admin","Welcome1","t3://localhost:7001")
+adm_name=get('AdminServerName')
+sub_deployment_name="TestJMSSubdeployment"
+jms_module_name="TestJMSModule"
+queue_name="TestQueue"
+factory_name="TestConnectionFactory"
+jms_server_name="TestJMSServer"
+
+
+def createJMSServer(adm_name, jms_server_name):
+ cd('/JMSServers')
+ if (len(ls(returnMap='true')) == 0):
+ print 'No JMS Server found, creating ' + jms_server_name
+ cd('/')
+ cmo.createJMSServer(jms_server_name)
+ cd('/JMSServers/'+jms_server_name)
+ cmo.addTarget(getMBean("/Servers/" + adm_name))
+
+
+def createJMSModule(jms_module_name, adm_name, sub_deployment_name):
+ print "Creating JMS module " + jms_module_name
+ cd('/JMSServers')
+ jms_servers=ls(returnMap='true')
+ cd('/')
+ module = create(jms_module_name, "JMSSystemResource")
+ module.addTarget(getMBean("Servers/"+adm_name))
+ cd('/SystemResources/'+jms_module_name)
+ module.createSubDeployment(sub_deployment_name)
+ cd('/SystemResources/'+jms_module_name+'/SubDeployments/'+sub_deployment_name)
+
+ list=[]
+ for i in jms_servers:
+ list.append(ObjectName(str('com.bea:Name='+i+',Type=JMSServer')))
+ set('Targets',jarray.array(list, ObjectName))
+
+def getJMSModulePath(jms_module_name):
+ jms_module_path = "/JMSSystemResources/"+jms_module_name+"/JMSResource/"+jms_module_name
+ return jms_module_path
+
+def createJMSQueue(jms_module_name,jms_queue_name):
+ print "Creating JMS queue " + jms_queue_name
+ jms_module_path = getJMSModulePath(jms_module_name)
+ cd(jms_module_path)
+ cmo.createQueue(jms_queue_name)
+ cd(jms_module_path+'/Queues/'+jms_queue_name)
+ cmo.setJNDIName("jms/" + jms_queue_name)
+ cmo.setSubDeploymentName(sub_deployment_name)
+
+def createDistributedJMSQueue(jms_module_name,jms_queue_name):
+ print "Creating distributed JMS queue " + jms_queue_name
+ jms_module_path = getJMSModulePath(jms_module_name)
+ cd(jms_module_path)
+ cmo.createDistributedQueue(jms_queue_name)
+ cd(jms_module_path+'/DistributedQueues/'+jms_queue_name)
+ cmo.setJNDIName("jms/" + jms_queue_name)
+
+def addMemberQueue(udd_name,queue_name):
+ jms_module_path = getJMSModulePath(jms_module_name)
+ cd(jms_module_path+'/DistributedQueues/'+udd_name)
+ cmo.setLoadBalancingPolicy('Round-Robin')
+ cmo.createDistributedQueueMember(queue_name)
+
+def createJMSFactory(jms_module_name,jms_fact_name):
+ print "Creating JMS connection factory " + jms_fact_name
+ jms_module_path = getJMSModulePath(jms_module_name)
+ cd(jms_module_path)
+ cmo.createConnectionFactory(jms_fact_name)
+ cd(jms_module_path+'/ConnectionFactories/'+jms_fact_name)
+ cmo.setJNDIName("jms/" + jms_fact_name)
+ cmo.setSubDeploymentName(sub_deployment_name)
+
+
+
+edit()
+startEdit()
+
+print "Server name: "+adm_name
+
+createJMSServer(adm_name,jms_server_name)
+createJMSModule(jms_module_name,adm_name,sub_deployment_name)
+createJMSFactory(jms_module_name,factory_name)
+createJMSQueue(jms_module_name,queue_name)
+
+### Unified Distributed Destinations(UDD) example
+createDistributedJMSQueue(jms_module_name,"udd_queue")
+# Normally member queues would be in different sub-deployments
+createJMSQueue(jms_module_name,"ms1@udd_queue")
+createJMSQueue(jms_module_name,"ms2@udd_queue")
+addMemberQueue("udd_queue", "ms1@udd_queue")
+addMemberQueue("udd_queue", "ms2@udd_queue")
+
+save()
+activate(block="true")
+disconnect()
\ No newline at end of file
diff --git a/examples/messaging/weblogic-jms-mp/weblogic/properties/domain.properties b/examples/messaging/weblogic-jms-mp/weblogic/properties/domain.properties
new file mode 100644
index 00000000000..6e9a5fc4b19
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/weblogic/properties/domain.properties
@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Env properties inherited from base image
+DOMAIN_NAME=myDomain
+ADMIN_LISTEN_PORT=7001
+ADMIN_NAME=myadmin
+PRODUCTION_MODE=dev
+DEBUG_FLAG=true
+ADMINISTRATION_PORT_ENABLED=false
+ADMINISTRATION_PORT=9002
+# Env properties for this image
+ADMIN_HOST=AdminContainer
+MANAGED_SERVER_PORT=8001
+MANAGED_SERVER_NAME_BASE=MS
+CONFIGURED_MANAGED_SERVER_COUNT=2
+PRODUCTION_MODE_ENABLED=true
+CLUSTER_NAME=cluster1
+CLUSTER_TYPE=DYNAMIC
+DOMAIN_HOST_VOLUME=/Users/host/temp
+username=admin
+password=Welcome1
\ No newline at end of file
diff --git a/examples/messaging/weblogic-jms-mp/weblogic/properties/domain_security.properties b/examples/messaging/weblogic-jms-mp/weblogic/properties/domain_security.properties
new file mode 100644
index 00000000000..fcfb1d90fff
--- /dev/null
+++ b/examples/messaging/weblogic-jms-mp/weblogic/properties/domain_security.properties
@@ -0,0 +1,18 @@
+#
+# Copyright (c) 2022 Oracle and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+username=admin
+password=Welcome1
\ No newline at end of file
diff --git a/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqConnectorImpl.java b/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqConnectorImpl.java
index 5eb75fa34e4..131149ec0ad 100644
--- a/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqConnectorImpl.java
+++ b/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqConnectorImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, 2021 Oracle and/or its affiliates.
+ * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
import io.helidon.config.Config;
import io.helidon.config.ConfigValue;
import io.helidon.messaging.MessagingException;
+import io.helidon.messaging.NackHandler;
import io.helidon.messaging.connectors.jms.ConnectionContext;
import io.helidon.messaging.connectors.jms.JmsConnector;
import io.helidon.messaging.connectors.jms.JmsMessage;
@@ -167,15 +168,19 @@ private AQjmsConnectionFactory createAqFactory(Config c) throws javax.jms.JMSExc
@Override
- protected JmsMessage> createMessage(jakarta.jms.Message message,
+ protected JmsMessage> createMessage(NackHandler nackHandler,
+ jakarta.jms.Message message,
Executor executor,
SessionMetadata sessionMetadata) {
- return new AqMessageImpl<>(super.createMessage(message, executor, sessionMetadata), sessionMetadata);
+ return new AqMessageImpl<>(
+ super.createMessage(nackHandler, message, executor, sessionMetadata),
+ sessionMetadata);
}
@Override
protected BiConsumer, JMSException> sendingErrorHandler(Config config) {
return (m, e) -> {
+ m.nack(e);
throw new MessagingException("Error during sending Oracle AQ JMS message.", e);
};
}
diff --git a/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqMessageImpl.java b/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqMessageImpl.java
index 1240e700762..aafe1b3971c 100644
--- a/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqMessageImpl.java
+++ b/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqMessageImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, 2021 Oracle and/or its affiliates.
+ * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ class AqMessageImpl implements AqMessage {
if (jakartaSession == null) {
this.session = null;
} else {
- this.session = ((JakartaSession) jakartaSession).unwrap(AQjmsSession.class);
+ this.session = ((JakartaSession) jakartaSession).unwrap();
}
}
diff --git a/messaging/connectors/aq/src/test/java/io/helidon/messaging/connectors/aq/AckTest.java b/messaging/connectors/aq/src/test/java/io/helidon/messaging/connectors/aq/AckTest.java
index 7533c8fddc4..00d9aa2f667 100644
--- a/messaging/connectors/aq/src/test/java/io/helidon/messaging/connectors/aq/AckTest.java
+++ b/messaging/connectors/aq/src/test/java/io/helidon/messaging/connectors/aq/AckTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, 2021 Oracle and/or its affiliates.
+ * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@ void ackPropagationTest() throws InterruptedException, JMSException {
}).when(mockedMessage).acknowledge();
AqConnectorImpl aqConnector = new AqConnectorImpl(Map.of(), null, null);
- JmsMessage> jmsMessage = aqConnector.createMessage(mockedMessage, null, sessionMetadata);
+ JmsMessage> jmsMessage = aqConnector.createMessage(null, mockedMessage, null, sessionMetadata);
AqMessage aqMessage = new AqMessageImpl<>(jmsMessage, sessionMetadata);
aqMessage.ack();
assertThat("Ack not propagated to JmsMessage",
diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaDestination.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaDestination.java
index c41dd204b7b..6dd6d1550e6 100644
--- a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaDestination.java
+++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaDestination.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Oracle and/or its affiliates.
+ * Copyright (c) 2021, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,14 +20,20 @@
/**
* Exposes Jakarta API, delegates to javax API.
*/
-class JakartaDestination implements Destination {
+class JakartaDestination implements Destination, JakartaWrapper {
private final T delegate;
JakartaDestination(T delegate) {
this.delegate = delegate;
}
- T unwrap() {
+ @Override
+ public T unwrap() {
return delegate;
}
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
}
diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaJms.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaJms.java
index 082008514bc..96e4653fb6a 100644
--- a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaJms.java
+++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaJms.java
@@ -15,6 +15,10 @@
*/
package io.helidon.messaging.connectors.jms.shim;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
import jakarta.jms.BytesMessage;
import jakarta.jms.CompletionListener;
import jakarta.jms.Connection;
@@ -57,6 +61,7 @@ private JakartaJms() {
* @return shimmed jakarta namespace instance
*/
public static BytesMessage create(javax.jms.BytesMessage delegate) {
+ if (delegate == null) return null;
return new JakartaByteMessage(delegate);
}
/**
@@ -65,6 +70,7 @@ public static BytesMessage create(javax.jms.BytesMessage delegate) {
* @return shimmed jakarta namespace instance
*/
public static CompletionListener create(javax.jms.CompletionListener delegate) {
+ if (delegate == null) return null;
return new JakartaCompletionListener(delegate);
}
/**
@@ -73,6 +79,7 @@ public static CompletionListener create(javax.jms.CompletionListener delegate) {
* @return shimmed jakarta namespace instance
*/
public static Connection create(javax.jms.Connection delegate) {
+ if (delegate == null) return null;
return new JakartaConnection(delegate);
}
/**
@@ -81,6 +88,7 @@ public static Connection create(javax.jms.Connection delegate) {
* @return shimmed jakarta namespace instance
*/
public static ConnectionConsumer create(javax.jms.ConnectionConsumer delegate) {
+ if (delegate == null) return null;
return new JakartaConnectionConsumer(delegate);
}
/**
@@ -89,6 +97,7 @@ public static ConnectionConsumer create(javax.jms.ConnectionConsumer delegate) {
* @return shimmed jakarta namespace instance
*/
public static ConnectionFactory create(javax.jms.ConnectionFactory delegate) {
+ if (delegate == null) return null;
return new JakartaConnectionFactory(delegate);
}
/**
@@ -97,6 +106,7 @@ public static ConnectionFactory create(javax.jms.ConnectionFactory delegate) {
* @return shimmed jakarta namespace instance
*/
public static ConnectionMetaData create(javax.jms.ConnectionMetaData delegate) {
+ if (delegate == null) return null;
return new JakartaConnectionMetaData(delegate);
}
/**
@@ -105,6 +115,7 @@ public static ConnectionMetaData create(javax.jms.ConnectionMetaData delegate) {
* @return shimmed jakarta namespace instance
*/
public static JMSConsumer create(javax.jms.JMSConsumer delegate) {
+ if (delegate == null) return null;
return new JakartaConsumer(delegate);
}
/**
@@ -113,6 +124,7 @@ public static JMSConsumer create(javax.jms.JMSConsumer delegate) {
* @return shimmed jakarta namespace instance
*/
public static JMSContext create(javax.jms.JMSContext delegate) {
+ if (delegate == null) return null;
return new JakartaContext(delegate);
}
/**
@@ -121,6 +133,7 @@ public static JMSContext create(javax.jms.JMSContext delegate) {
* @return shimmed jakarta namespace instance
*/
public static Destination create(javax.jms.Destination delegate) {
+ if (delegate == null) return null;
return new JakartaDestination<>(delegate);
}
/**
@@ -129,6 +142,7 @@ public static Destination create(javax.jms.Destination delegate) {
* @return shimmed jakarta namespace instance
*/
public static ExceptionListener create(javax.jms.ExceptionListener delegate) {
+ if (delegate == null) return null;
return new JakartaExceptionListener(delegate);
}
/**
@@ -137,8 +151,32 @@ public static ExceptionListener create(javax.jms.ExceptionListener delegate) {
* @return shimmed jakarta namespace instance
*/
public static MapMessage create(javax.jms.MapMessage delegate) {
+ if (delegate == null) return null;
return new JakartaMapMessage(delegate);
}
+
+ /**
+ * Convenience method for shimming various javax JMS classes.
+ *
+ * @param obj to be shimmed or just typed
+ * @param expectedType expected type to shim to
+ * @return typed or shimmed object
+ * @param expected type to shim to
+ */
+ public static T resolve(Object obj, Class expectedType) {
+ if (expectedType.isAssignableFrom(obj.getClass())) {
+ return (T) obj;
+ }
+ Map, Function
+
+ io.helidon.messaging
+ helidon-messaging-jms-shim
+ jakarta.jmsjakarta.jms-api
diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/AbstractJmsMessage.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/AbstractJmsMessage.java
index 1d25e1bddb7..30081660b78 100644
--- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/AbstractJmsMessage.java
+++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/AbstractJmsMessage.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, 2021 Oracle and/or its affiliates.
+ * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,10 +23,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
+import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.helidon.messaging.MessagingException;
+import io.helidon.messaging.NackHandler;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
@@ -40,11 +42,12 @@ abstract class AbstractJmsMessage implements JmsMessage {
private Executor executor;
private SessionMetadata sharedSessionEntry;
private volatile boolean acked = false;
+ private final NackHandler nackHandler;
- protected AbstractJmsMessage() {
- }
-
- protected AbstractJmsMessage(Executor executor, SessionMetadata sharedSessionEntry) {
+ protected AbstractJmsMessage(NackHandler nackHandler,
+ Executor executor,
+ SessionMetadata sharedSessionEntry) {
+ this.nackHandler = nackHandler;
this.sharedSessionEntry = sharedSessionEntry;
this.executor = executor;
}
@@ -116,4 +119,8 @@ public CompletionStage ack() {
});
}
+ @Override
+ public Function> getNack() {
+ return this.nackHandler != null ? this.nackHandler.getNack(this) : reason -> CompletableFuture.completedFuture(null);
+ }
}
diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConnectionContext.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConnectionContext.java
index 328322b65dc..cb62c9e1349 100644
--- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConnectionContext.java
+++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConnectionContext.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, 2021 Oracle and/or its affiliates.
+ * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
import io.helidon.config.Config;
import io.helidon.messaging.MessagingException;
+import io.helidon.messaging.connectors.jms.shim.JakartaJms;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
@@ -87,11 +88,13 @@ Optional extends Destination> lookupDestination() {
}
Optional extends ConnectionFactory> lookupFactory(String jndi) {
- return Optional.ofNullable((ConnectionFactory) lookup(jndi));
+ return Optional.ofNullable(lookup(jndi))
+ .map(o -> JakartaJms.resolve(o, ConnectionFactory.class));
}
Optional extends Destination> lookupDestination(String jndi) {
- return Optional.ofNullable((Destination) lookup(jndi));
+ return Optional.ofNullable((Destination) lookup(jndi))
+ .map(o -> JakartaJms.resolve(o, Destination.class));
}
diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsBytesMessage.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsBytesMessage.java
index df2c4a54a24..2b73c492ab2 100644
--- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsBytesMessage.java
+++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsBytesMessage.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, 2021 Oracle and/or its affiliates.
+ * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
import java.util.concurrent.Executor;
import io.helidon.messaging.MessagingException;
+import io.helidon.messaging.NackHandler;
import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
@@ -33,8 +34,11 @@ public class JmsBytesMessage extends AbstractJmsMessage {
private final jakarta.jms.BytesMessage msg;
- JmsBytesMessage(jakarta.jms.BytesMessage msg, Executor executor, SessionMetadata sharedSessionEntry) {
- super(executor, sharedSessionEntry);
+ JmsBytesMessage(NackHandler nackHandler,
+ jakarta.jms.BytesMessage msg,
+ Executor executor,
+ SessionMetadata sharedSessionEntry) {
+ super(nackHandler, executor, sharedSessionEntry);
this.msg = msg;
}
diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java
index d8e40459cd2..04f2069af60 100644
--- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java
+++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java
@@ -43,7 +43,9 @@
import io.helidon.config.ConfigValue;
import io.helidon.config.mp.MpConfig;
import io.helidon.messaging.MessagingException;
+import io.helidon.messaging.NackHandler;
import io.helidon.messaging.Stoppable;
+import io.helidon.messaging.connectors.jms.shim.JakartaWrapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
@@ -64,6 +66,7 @@
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
+import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttribute;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
@@ -76,6 +79,102 @@
*/
@ApplicationScoped
@Connector(JmsConnector.CONNECTOR_NAME)
+@ConnectorAttribute(name = JmsConnector.USERNAME_ATTRIBUTE,
+ description = "User name used to connect JMS session",
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.PASSWORD_ATTRIBUTE,
+ description = "Password to connect JMS session",
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.TYPE_ATTRIBUTE,
+ description = "Possible values are: queue, topic",
+ defaultValue = "queue",
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.DESTINATION_ATTRIBUTE,
+ description = "Queue or topic name",
+ mandatory = true,
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.ACK_MODE_ATTRIBUTE,
+ description = "Possible values are: "
+ + "AUTO_ACKNOWLEDGE- session automatically acknowledges a client’s receipt of a message, "
+ + "CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually, "
+ + "DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages.",
+ defaultValue = "AUTO_ACKNOWLEDGE",
+ direction = ConnectorAttribute.Direction.INCOMING,
+ type = "io.helidon.messaging.connectors.jms.AcknowledgeMode")
+@ConnectorAttribute(name = JmsConnector.TRANSACTED_ATTRIBUTE,
+ description = "Indicates whether the session will use a local transaction.",
+ mandatory = false,
+ defaultValue = "false",
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "boolean")
+@ConnectorAttribute(name = JmsConnector.MESSAGE_SELECTOR_ATTRIBUTE,
+ description = "JMS API message selector expression based on a subset of the SQL92. "
+ + "Expression can only access headers and properties, not the payload.",
+ mandatory = false,
+ direction = ConnectorAttribute.Direction.INCOMING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.CLIENT_ID_ATTRIBUTE,
+ description = "Client identifier for JMS connection.",
+ mandatory = false,
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.DURABLE_ATTRIBUTE,
+ description = "True for creating durable consumer (only for topic).",
+ mandatory = false,
+ defaultValue = "false",
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "boolean")
+@ConnectorAttribute(name = JmsConnector.SUBSCRIBER_NAME_ATTRIBUTE,
+ description = "Subscriber name for durable consumer used to identify subscription.",
+ mandatory = false,
+ direction = ConnectorAttribute.Direction.INCOMING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.NON_LOCAL_ATTRIBUTE,
+ description = "If true then any messages published to the topic using this session’s connection, "
+ + "or any other connection with the same client identifier, "
+ + "will not be added to the durable subscription.",
+ mandatory = false,
+ defaultValue = "false",
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "boolean")
+@ConnectorAttribute(name = JmsConnector.NAMED_FACTORY_ATTRIBUTE,
+ description = "Select in case factory is injected as a named bean or configured with name.",
+ mandatory = false,
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.POLL_TIMEOUT_ATTRIBUTE,
+ description = "Timeout for polling for next message in every poll cycle in millis. Default value: 50",
+ mandatory = false,
+ defaultValue = "50",
+ direction = ConnectorAttribute.Direction.INCOMING,
+ type = "long")
+@ConnectorAttribute(name = JmsConnector.PERIOD_EXECUTIONS_ATTRIBUTE,
+ description = "Period for executing poll cycles in millis.",
+ mandatory = false,
+ defaultValue = "100",
+ direction = ConnectorAttribute.Direction.INCOMING,
+ type = "long")
+@ConnectorAttribute(name = JmsConnector.SESSION_GROUP_ID_ATTRIBUTE,
+ description = "When multiple channels share same session-group-id, "
+ + "they share same JMS session and same JDBC connection as well.",
+ mandatory = false,
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.JNDI_ATTRIBUTE + "." + JmsConnector.JNDI_JMS_FACTORY_ATTRIBUTE,
+ description = "JNDI name of JMS factory.",
+ mandatory = false,
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "string")
+@ConnectorAttribute(name = JmsConnector.JNDI_ATTRIBUTE + "." + JmsConnector.JNDI_PROPS_ATTRIBUTE,
+ description = "Environment properties used for creating initial context java.naming.factory.initial, "
+ + "java.naming.provider.url …",
+ mandatory = false,
+ direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
+ type = "properties")
public class JmsConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, Stoppable {
private static final Logger LOGGER = Logger.getLogger(JmsConnector.class.getName());
@@ -254,18 +353,22 @@ void terminate(@Observes @BeforeDestroyed(ApplicationScoped.class) Object event)
/**
* Create reactive messaging message from JMS message.
*
+ * @param nackHandler Not acknowledged handler
* @param message JMS message
* @param executor executor used for async execution of ack
* @param sessionMetadata JMS session metadata
* @return reactive messaging message extended with custom JMS features
*/
- protected JmsMessage> createMessage(jakarta.jms.Message message, Executor executor, SessionMetadata sessionMetadata) {
+ protected JmsMessage> createMessage(NackHandler nackHandler,
+ jakarta.jms.Message message,
+ Executor executor,
+ SessionMetadata sessionMetadata) {
if (message instanceof TextMessage) {
- return new JmsTextMessage((TextMessage) message, executor, sessionMetadata);
+ return new JmsTextMessage(nackHandler, (TextMessage) message, executor, sessionMetadata);
} else if (message instanceof BytesMessage) {
- return new JmsBytesMessage((BytesMessage) message, executor, sessionMetadata);
+ return new JmsBytesMessage(nackHandler, (BytesMessage) message, executor, sessionMetadata);
} else {
- return new AbstractJmsMessage(executor, sessionMetadata) {
+ return new AbstractJmsMessage(nackHandler, executor, sessionMetadata) {
@Override
public jakarta.jms.Message getJmsMessage() {
@@ -331,39 +434,38 @@ public PublisherBuilder extends Message>> getPublisherBuilder(Config mpConfi
SessionMetadata sessionEntry = prepareSession(config, factory);
Destination destination = createDestination(sessionEntry.session(), ctx);
- String messageSelector = config.get(MESSAGE_SELECTOR_ATTRIBUTE).asString().orElse(null);
- String subscriberName = config.get(SUBSCRIBER_NAME_ATTRIBUTE).asString().orElse(null);
- MessageConsumer consumer;
- if (config.get(DURABLE_ATTRIBUTE).asBoolean().orElse(false)) {
- if (!(destination instanceof Topic)) {
- throw new MessagingException("Can't create durable consumer. Only topic can be durable!");
- }
- consumer = sessionEntry.session().createDurableSubscriber(
- (Topic) destination,
- subscriberName,
- messageSelector,
- config.get(NON_LOCAL_ATTRIBUTE).asBoolean().orElse(false));
- } else {
- consumer = sessionEntry.session().createConsumer(destination, messageSelector);
- }
+ MessageConsumer consumer = createConsumer(config, destination, sessionEntry);
BufferedEmittingPublisher> emitter = BufferedEmittingPublisher.create();
+ JmsNackHandler nackHandler = JmsNackHandler.create(emitter, config, this);
Long pollTimeout = config.get(POLL_TIMEOUT_ATTRIBUTE)
.asLong()
.orElse(POLL_TIMEOUT_DEFAULT);
+ Long periodExecutions = config.get(PERIOD_EXECUTIONS_ATTRIBUTE)
+ .asLong()
+ .orElse(PERIOD_EXECUTIONS_DEFAULT);
+
AtomicReference> lastMessage = new AtomicReference<>();
scheduler.scheduleAtFixedRate(
- () -> produce(emitter, sessionEntry, consumer, ackMode, awaitAck, pollTimeout, lastMessage),
- 0,
- config.get(PERIOD_EXECUTIONS_ATTRIBUTE)
- .asLong()
- .orElse(PERIOD_EXECUTIONS_DEFAULT),
- TimeUnit.MILLISECONDS);
+ () -> {
+ if (!emitter.hasRequests()) {
+ return;
+ }
+ // When await-ack is true, no message is received until previous one is acked
+ if (ackMode != AcknowledgeMode.AUTO_ACKNOWLEDGE
+ && awaitAck
+ && lastMessage.get() != null
+ && !lastMessage.get().isAck()) {
+ return;
+ }
+ produce(emitter, sessionEntry, consumer, nackHandler, pollTimeout)
+ .ifPresent(lastMessage::set);
+ }, 0, periodExecutions, TimeUnit.MILLISECONDS);
sessionEntry.connection().start();
return ReactiveStreams.fromPublisher(FlowAdapters.toPublisher(Multi.create(emitter)));
} catch (JMSException e) {
@@ -384,9 +486,8 @@ public SubscriberBuilder extends Message>, Void> getSubscriberBuilder(Config
SessionMetadata sessionEntry = prepareSession(config, factory);
Session session = sessionEntry.session();
Destination destination = createDestination(session, ctx);
- MessageProducer producer = session.createProducer(destination);
- configureProducer(producer, ctx);
- AtomicReference mapper = new AtomicReference<>();
+ MessageProducer producer = createProducer(destination, ctx, sessionEntry);
+ AtomicReference mapper = new AtomicReference<>();
return ReactiveStreams.>builder()
.flatMapCompletionStage(m -> consume(m, session, mapper, producer, config))
.onError(t -> LOGGER.log(Level.SEVERE, t, () -> "Error intercepted from channel "
@@ -401,7 +502,15 @@ private void configureProducer(MessageProducer producer, ConnectionContext ctx)
io.helidon.config.Config config = ctx.config().get("producer");
if (!config.exists()) return;
- Class extends MessageProducer> clazz = producer.getClass();
+ final Object instance;
+ // Shim producer?
+ if (producer instanceof JakartaWrapper>) {
+ instance = ((JakartaWrapper>) producer).unwrap();
+ } else {
+ instance = producer;
+ }
+
+ Class> clazz = instance.getClass();
Map setterMethods = Arrays.stream(clazz.getDeclaredMethods())
.filter(m -> m.getParameterCount() == 1)
.collect(Collectors.toMap(m -> ConfigHelper.stripSet(m.getName()), Function.identity()));
@@ -417,7 +526,7 @@ private void configureProducer(MessageProducer producer, ConnectionContext ctx)
return;
}
try {
- m.invoke(producer, c.as(m.getParameterTypes()[0]).get());
+ m.invoke(instance, c.as(m.getParameterTypes()[0]).get());
} catch (Throwable e) {
LOGGER.log(Level.WARNING,
"Error when setting JMS producer property " + key
@@ -428,43 +537,31 @@ private void configureProducer(MessageProducer producer, ConnectionContext ctx)
});
}
- private void produce(
+ private Optional> produce(
BufferedEmittingPublisher> emitter,
SessionMetadata sessionEntry,
MessageConsumer consumer,
- AcknowledgeMode ackMode,
- Boolean awaitAck,
- Long pollTimeout,
- AtomicReference> lastMessage) {
-
- if (!emitter.hasRequests()) {
- return;
- }
- // When await-ack is true, no message is received until previous one is acked
- if (ackMode != AcknowledgeMode.AUTO_ACKNOWLEDGE
- && awaitAck
- && lastMessage.get() != null
- && !lastMessage.get().isAck()) {
- return;
- }
+ JmsNackHandler nackHandler,
+ Long pollTimeout) {
try {
jakarta.jms.Message message = consumer.receive(pollTimeout);
if (message == null) {
- return;
+ return Optional.empty();
}
LOGGER.fine(() -> "Received message: " + message);
- JmsMessage> preparedMessage = createMessage(message, executor, sessionEntry);
- lastMessage.set(preparedMessage);
+ JmsMessage> preparedMessage = createMessage(nackHandler, message, executor, sessionEntry);
emitter.emit(preparedMessage);
+ return Optional.of(preparedMessage);
} catch (Throwable e) {
emitter.fail(e);
+ return Optional.empty();
}
}
- private CompletionStage> consume(
+ CompletionStage> consume(
Message> m,
Session session,
- AtomicReference mapper,
+ AtomicReference mapper,
MessageProducer producer,
io.helidon.config.Config config) {
@@ -474,28 +571,34 @@ private CompletionStage> consume(
}
return CompletableFuture
- .supplyAsync(() -> {
- try {
- jakarta.jms.Message jmsMessage;
-
- if (m instanceof OutgoingJmsMessage) {
- // custom mapping, properties etc.
- jmsMessage = ((OutgoingJmsMessage>) m).toJmsMessage(session, mapper.get());
- } else {
- // default mappers
- jmsMessage = mapper.get().apply(session, m);
- }
- // actual send
- producer.send(jmsMessage);
- return m.ack();
- } catch (JMSException e) {
- sendingErrorHandler(config).accept(m, e);
- }
- return CompletableFuture.completedFuture(null);
- }, executor)
+ .supplyAsync(() -> consumeAsync(m, session, mapper, producer, config), executor)
.thenApply(aVoid -> m);
}
+ protected CompletionStage> consumeAsync(Message> m,
+ Session session,
+ AtomicReference mapper,
+ MessageProducer producer,
+ io.helidon.config.Config config) {
+ try {
+ jakarta.jms.Message jmsMessage;
+
+ if (m instanceof OutgoingJmsMessage) {
+ // custom mapping, properties etc.
+ jmsMessage = ((OutgoingJmsMessage>) m).toJmsMessage(session, mapper.get());
+ } else {
+ // default mappers
+ jmsMessage = mapper.get().apply(session, m);
+ }
+ // actual send
+ producer.send(jmsMessage);
+ return m.ack();
+ } catch (JMSException e) {
+ sendingErrorHandler(config).accept(m, e);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
/**
* Customizable handler for errors during sending.
*
@@ -504,12 +607,13 @@ private CompletionStage> consume(
*/
protected BiConsumer, JMSException> sendingErrorHandler(io.helidon.config.Config config) {
return (m, e) -> {
+ m.nack(e);
throw new MessagingException("Error during sending JMS message.", e);
};
}
- private SessionMetadata prepareSession(io.helidon.config.Config config,
- ConnectionFactory factory) throws JMSException {
+ protected SessionMetadata prepareSession(io.helidon.config.Config config,
+ ConnectionFactory factory) throws JMSException {
Optional sessionGroupId = config.get(SESSION_GROUP_ID_ATTRIBUTE).asString().asOptional();
if (sessionGroupId.isPresent() && sessionRegister.containsKey(sessionGroupId.get())) {
return sessionRegister.get(sessionGroupId.get());
@@ -543,9 +647,10 @@ private SessionMetadata prepareSession(io.helidon.config.Config config,
sessionRegister.put(sessionGroupId.orElseGet(() -> UUID.randomUUID().toString()), sharedSessionEntry);
return sharedSessionEntry;
}
+
}
- Destination createDestination(Session session, ConnectionContext ctx) {
+ protected Destination createDestination(Session session, ConnectionContext ctx) {
io.helidon.config.Config config = ctx.config();
if (ctx.isJndi()) {
@@ -582,6 +687,34 @@ Destination createDestination(Session session, ConnectionContext ctx) {
}
+ protected MessageConsumer createConsumer(io.helidon.config.Config config,
+ Destination destination,
+ SessionMetadata sessionEntry) throws JMSException {
+ String messageSelector = config.get(MESSAGE_SELECTOR_ATTRIBUTE).asString().orElse(null);
+ String subscriberName = config.get(SUBSCRIBER_NAME_ATTRIBUTE).asString().orElse(null);
+
+ if (config.get(DURABLE_ATTRIBUTE).asBoolean().orElse(false)) {
+ if (!(destination instanceof Topic)) {
+ throw new MessagingException("Can't create durable consumer. Only topic can be durable!");
+ }
+ return sessionEntry.session().createDurableSubscriber(
+ (Topic) destination,
+ subscriberName,
+ messageSelector,
+ config.get(NON_LOCAL_ATTRIBUTE).asBoolean().orElse(false));
+ } else {
+ return sessionEntry.session().createConsumer(destination, messageSelector);
+ }
+ }
+
+ protected MessageProducer createProducer(Destination destination,
+ ConnectionContext ctx,
+ SessionMetadata sessionEntry) throws JMSException {
+ MessageProducer producer = sessionEntry.session().createProducer(destination);
+ configureProducer(producer, ctx);
+ return producer;
+ }
+
/**
* Builder for {@link io.helidon.messaging.connectors.jms.JmsConnector}.
*/
diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsNackHandler.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsNackHandler.java
new file mode 100644
index 00000000000..2c6214649c7
--- /dev/null
+++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsNackHandler.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2022 Oracle and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.messaging.connectors.jms;
+
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import io.helidon.common.reactive.BufferedEmittingPublisher;
+import io.helidon.config.Config;
+import io.helidon.config.ConfigSources;
+import io.helidon.messaging.MessagingException;
+import io.helidon.messaging.NackHandler;
+
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.Destination;
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageProducer;
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import static java.lang.System.Logger.Level.ERROR;
+import static java.lang.System.Logger.Level.WARNING;
+
+abstract class JmsNackHandler implements NackHandler> {
+
+ static JmsNackHandler create(BufferedEmittingPublisher> emitter,
+ Config config,
+ JmsConnector jmsConnector) {
+ Config dlq = config.get("nack-dlq");
+ Config logOnly = config.get("nack-log-only");
+ if (dlq.exists()) {
+ dlq = dlq.detach();
+ return new JmsDLQ(config, dlq, jmsConnector);
+ } else if (logOnly.exists() && logOnly.asBoolean().orElse(true)) {
+ logOnly = logOnly.detach();
+ return new JmsNackHandler.Log(config, logOnly);
+ }
+ // Default nack handling strategy
+ return new JmsNackHandler.KillChannel(emitter, config);
+ }
+
+ static class Log extends JmsNackHandler {
+
+ private final System.Logger logger;
+ private final String channelName;
+ private final System.Logger.Level level;
+
+ Log(Config config, Config logOnlyConfig) {
+ this.channelName = config.get(JmsConnector.CHANNEL_NAME_ATTRIBUTE)
+ .asString()
+ .orElseThrow(() -> new MessagingException("Missing channel name!"));
+
+ this.level = logOnlyConfig.get("level")
+ .as(System.Logger.Level.class)
+ .orElse(WARNING);
+
+ this.logger = System.getLogger(logOnlyConfig.get("logger")
+ .asString()
+ .orElse(JmsNackHandler.class.getName()));
+ }
+
+
+ @Override
+ public Function> getNack(JmsMessage> message) {
+ return t -> nack(t, message);
+ }
+
+ private CompletionStage nack(Throwable t, JmsMessage> message) {
+ logger.log(level, messageToString("NACKED Message ignored", channelName, message));
+ message.ack();
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ static class KillChannel extends JmsNackHandler {
+
+ private static final System.Logger LOGGER = System.getLogger(JmsNackHandler.KillChannel.class.getName());
+ private final BufferedEmittingPublisher> emitter;
+ private final String channelName;
+
+ KillChannel(BufferedEmittingPublisher> emitter, Config config) {
+ this.emitter = emitter;
+ this.channelName = config.get(JmsConnector.CHANNEL_NAME_ATTRIBUTE)
+ .asString()
+ .orElseThrow(() -> new MessagingException("Missing channel name!"));
+ }
+
+ @Override
+ public Function> getNack(JmsMessage> message) {
+ return throwable -> nack(throwable, message);
+ }
+
+ private CompletionStage nack(Throwable t, JmsMessage> message) {
+ LOGGER.log(ERROR, messageToString("NACKED message, killing the channel", channelName, message), t);
+ emitter.fail(t);
+ return CompletableFuture.failedStage(t);
+ }
+ }
+
+ static String messageToString(String prefix, String channel, JmsMessage message) {
+ StringBuilder msg = new StringBuilder(prefix);
+ msg.append("\n");
+ appendNonNull(msg, "channel", channel);
+ appendNonNull(msg, "correlationId", message.getCorrelationId());
+ appendNonNull(msg, "replyTo", message.getReplyTo());
+ for (String prop : message.getPropertyNames()) {
+ appendNonNull(msg, prop, message.getProperty(prop));
+ }
+ return msg.toString();
+ }
+
+ static StringBuilder appendNonNull(StringBuilder sb, String name, Object value) {
+ if (Objects.isNull(value)) return sb;
+ return sb.append(name + ": ").append(value).append("\n");
+ }
+
+ static class JmsDLQ extends JmsNackHandler {
+ private static final System.Logger LOGGER = System.getLogger(JmsNackHandler.JmsDLQ.class.getName());
+ private final MessageProducer producer;
+ private final SessionMetadata sessionMetadata;
+ private final AtomicReference mapper = new AtomicReference<>();
+ private final String channelName;
+ private Config config;
+ private JmsConnector jmsConnector;
+ private Config dlq;
+
+ JmsDLQ(Config config, Config dlq, JmsConnector jmsConnector) {
+ this.config = config;
+ this.jmsConnector = jmsConnector;
+ this.channelName = config.get(JmsConnector.CHANNEL_NAME_ATTRIBUTE)
+ .asString()
+ .orElseThrow(() -> new MessagingException("Missing channel name!"));
+
+ Config.Builder dlqCfgBuilder = Config.builder();
+ HashMap dlqCfgMap = new HashMap<>();
+ if (dlq.isLeaf()) {
+ // nack-dlq=destination_name - Uses actual connection config, just set dlq destination
+ String destination = dlq.asString().orElseThrow(() -> new MessagingException("nack-dlq with no value!"));
+ dlqCfgMap.put(JmsConnector.DESTINATION_ATTRIBUTE, destination);
+ dlqCfgMap.put("type", "queue"); // default is queue
+ this.dlq = dlqCfgBuilder
+ .sources(
+ ConfigSources.create(dlqCfgMap),
+ ConfigSources.create(config.detach())
+ )
+ .disableEnvironmentVariablesSource()
+ .disableSystemPropertiesSource()
+ .build();
+ } else {
+ // Custom dlq connection config
+ this.dlq = dlq.detach();
+ }
+
+ try {
+ ConnectionContext ctx = new ConnectionContext(this.dlq);
+ ConnectionFactory factory = jmsConnector.getFactory(ctx)
+ .orElseThrow(() -> new MessagingException("No ConnectionFactory found."));
+ sessionMetadata = jmsConnector.prepareSession(dlq, factory);
+ Destination destination = jmsConnector.createDestination(sessionMetadata.session(), ctx);
+ producer = jmsConnector.createProducer(destination, ctx, sessionMetadata);
+ } catch (JMSException e) {
+ throw new MessagingException("Error when setting up DLQ nack handler for channel " + channelName, e);
+ }
+ }
+
+ @Override
+ public Function> getNack(JmsMessage> message) {
+ return throwable -> nack(throwable, message);
+ }
+
+ private CompletionStage nack(Throwable t, JmsMessage> message) {
+ try {
+
+ Throwable cause = t;
+ while (cause.getCause() != null && cause != cause.getCause()) {
+ cause = cause.getCause();
+ }
+
+ // It has to be incoming JMS message as this nack handler cannot be used outside of connector
+ JmsMessage.OutgoingJmsMessageBuilder