From 2066923320de9fc8144597eccf45f6daeba19b78 Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 08:16:14 +0100 Subject: [PATCH 01/13] add new message publisher moduler --- .../rabbitmq-message-publisher/build.gradle | 21 ++ .../dependencies.lock | 261 ++++++++++++++++++ settings.gradle | 6 +- workflow-event-listener/build.gradle | 1 + 4 files changed, 287 insertions(+), 2 deletions(-) create mode 100644 message-publisher/rabbitmq-message-publisher/build.gradle create mode 100644 message-publisher/rabbitmq-message-publisher/dependencies.lock diff --git a/message-publisher/rabbitmq-message-publisher/build.gradle b/message-publisher/rabbitmq-message-publisher/build.gradle new file mode 100644 index 00000000..c9824507 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/build.gradle @@ -0,0 +1,21 @@ +plugins { + id 'groovy' +} +dependencies { + + implementation "com.netflix.conductor:conductor-common:${revConductor}" + implementation "com.netflix.conductor:conductor-core:${revConductor}" + implementation "com.netflix.conductor:conductor-amqp:${revConductor}" + + compileOnly 'org.springframework.boot:spring-boot-starter' + compileOnly 'org.springframework.boot:spring-boot-starter-web' + + testImplementation "org.codehaus.groovy:groovy-all:${revGroovy}" + testImplementation "org.spockframework:spock-core:${revSpock}" + testImplementation "org.spockframework:spock-spring:${revSpock}" + + + testImplementation "com.netflix.conductor:conductor-server:${revConductor}" + testImplementation 'org.springframework.boot:spring-boot-starter-web' + testImplementation project(':conductor-test-util').sourceSets.test.output +} \ No newline at end of file diff --git a/message-publisher/rabbitmq-message-publisher/dependencies.lock b/message-publisher/rabbitmq-message-publisher/dependencies.lock new file mode 100644 index 00000000..df064988 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/dependencies.lock @@ -0,0 +1,261 @@ +{ + "annotationProcessor": { + "org.springframework.boot:spring-boot-configuration-processor": { + "locked": "2.7.16" + } + }, + "compileClasspath": { + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-core": { + "locked": "3.15.0" + }, + "org.apache.logging.log4j:log4j-api": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-core": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-jul": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-web": { + "locked": "2.17.2" + }, + "org.springframework.boot:spring-boot-starter": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-web": { + "locked": "2.7.16" + } + }, + "runtimeClasspath": { + "com.google.guava:guava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "32.1.2-jre" + }, + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.15.0" + }, + "com.rabbitmq:amqp-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "5.14.3" + }, + "io.reactivex:rxjava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "1.3.8" + }, + "org.apache.commons:commons-lang3": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.12.0" + }, + "org.apache.logging.log4j:log4j-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-jul": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-web": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + } + }, + "testCompileClasspath": { + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-core": { + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-server": { + "locked": "3.15.0" + }, + "junit:junit": { + "locked": "4.13.2" + }, + "org.apache.logging.log4j:log4j-api": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-core": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-jul": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-web": { + "locked": "2.17.2" + }, + "org.codehaus.groovy:groovy-all": { + "locked": "3.0.19" + }, + "org.junit.vintage:junit-vintage-engine": { + "locked": "5.8.2" + }, + "org.spockframework:spock-core": { + "locked": "2.3-groovy-3.0" + }, + "org.spockframework:spock-spring": { + "locked": "2.3-groovy-3.0" + }, + "org.springframework.boot:spring-boot-starter-log4j2": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-test": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-web": { + "locked": "2.7.16" + } + }, + "testRuntimeClasspath": { + "com.google.guava:guava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "32.1.2-jre" + }, + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-server": { + "locked": "3.15.0" + }, + "com.rabbitmq:amqp-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "5.14.3" + }, + "io.reactivex:rxjava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "1.3.8" + }, + "junit:junit": { + "locked": "4.13.2" + }, + "org.apache.commons:commons-lang3": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.12.0" + }, + "org.apache.logging.log4j:log4j-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-jul": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-web": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.codehaus.groovy:groovy-all": { + "locked": "3.0.19" + }, + "org.junit.vintage:junit-vintage-engine": { + "locked": "5.8.2" + }, + "org.spockframework:spock-core": { + "locked": "2.3-groovy-3.0" + }, + "org.spockframework:spock-spring": { + "locked": "2.3-groovy-3.0" + }, + "org.springframework.boot:spring-boot-starter-log4j2": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-test": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-web": { + "locked": "2.7.16" + } + } +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 6ff0f24b..85d3d42d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,6 +25,7 @@ include 'index' include 'test-util' include 'lock' include 'workflow-event-listener' +include 'message-publisher' include 'persistence:common-persistence' include 'persistence:mysql-persistence' @@ -35,6 +36,7 @@ include 'external-payload-storage:postgres-external-storage' include 'event-queue:amqp' include 'event-queue:nats' include 'event-queue:nats-streaming' +include 'message-publisher:rabbitmq-message-publisher' include 'lock:zookeeper-lock' include 'task:kafka' include 'community-server' @@ -42,13 +44,13 @@ include 'community-server' rootProject.children.stream() .filter(p -> p.name.equals("persistence") || p.name.equals("index") || p.name.equals("external-payload-storage") || p.name.equals("event-queue") || - p.name.equals("lock") || p.name.equals("task")) + p.name.equals("lock") || p.name.equals("task") || p.name.equals("message-publisher")) .forEach(it -> it.children .forEach(c -> c.name = "conductor-" + c.name)) rootProject.children.stream() .filter(p -> !p.name.equals("persistence") && !p.name.equals("index") && !p.name.equals("external-payload-storage") && !p.name.equals("event-queue") && - !p.name.equals("lock") && !p.name.equals("task")) + !p.name.equals("lock") && !p.name.equals("task") && !p.name.equals("message-publisher")) .forEach(it -> it.name = "conductor-" + it.name) diff --git a/workflow-event-listener/build.gradle b/workflow-event-listener/build.gradle index 3b9b802b..49e0e29c 100644 --- a/workflow-event-listener/build.gradle +++ b/workflow-event-listener/build.gradle @@ -6,6 +6,7 @@ dependencies { implementation "com.netflix.conductor:conductor-common:${revConductor}" implementation "com.netflix.conductor:conductor-core:${revConductor}" + compileOnly 'org.springframework.boot:spring-boot-starter' compileOnly 'org.springframework.boot:spring-boot-starter-web' From dfd9043b7dba515929e9337b37a8172f77ba704c Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 08:27:46 +0100 Subject: [PATCH 02/13] add rabbit mq properties class --- .../rabbitmq-message-publisher/build.gradle | 2 + .../rabbitmq/config/RabbitMQProperties.java | 127 ++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java diff --git a/message-publisher/rabbitmq-message-publisher/build.gradle b/message-publisher/rabbitmq-message-publisher/build.gradle index c9824507..48442cc8 100644 --- a/message-publisher/rabbitmq-message-publisher/build.gradle +++ b/message-publisher/rabbitmq-message-publisher/build.gradle @@ -7,6 +7,8 @@ dependencies { implementation "com.netflix.conductor:conductor-core:${revConductor}" implementation "com.netflix.conductor:conductor-amqp:${revConductor}" + implementation "com.rabbitmq:amqp-client:${revAmqpClient}" + compileOnly 'org.springframework.boot:spring-boot-starter' compileOnly 'org.springframework.boot:spring-boot-starter-web' diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java new file mode 100644 index 00000000..4f1c3ceb --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java @@ -0,0 +1,127 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.config; + +import java.util.Arrays; +import java.util.List; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import com.rabbitmq.client.ConnectionFactory; + +@ConfigurationProperties("conductor.message-publisher.rabbitmq") +public class RabbitMQProperties { + + private String hosts = ConnectionFactory.DEFAULT_HOST; + private String username = ConnectionFactory.DEFAULT_HOST; + private String password = ConnectionFactory.DEFAULT_PASS; + + private int port = ConnectionFactory.DEFAULT_AMQP_PORT; + private int maxChannelCount = 5000; + private int limit = 50; + private int duration = 1000; + + private String virtualHost = ConnectionFactory.DEFAULT_VHOST; + + private String allowedTaskStatuses; + + private String workflowStatusExchange; + private String taskStatusExchange; + + public String getHosts() { + return hosts; + } + + public void setHosts(String hosts) { + this.hosts = hosts; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public int getMaxChannelCount() { + return maxChannelCount; + } + + public void setMaxChannelCount(int maxChannelCount) { + this.maxChannelCount = maxChannelCount; + } + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public int getDuration() { + return duration; + } + + public void setDuration(int duration) { + this.duration = duration; + } + + public String getVirtualHost() { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + public String getWorkflowStatusExchange() { + return workflowStatusExchange; + } + + public void setWorkflowStatusExchange(String workflowStatusExchange) { + this.workflowStatusExchange = workflowStatusExchange; + } + + public String getTaskStatusExchange() { + return taskStatusExchange; + } + + public void setTaskStatusExchange(String taskStatusExchange) { + this.taskStatusExchange = taskStatusExchange; + } + + public List getAllowedTaskStatuses() { + return Arrays.asList(this.allowedTaskStatuses.split(",")); + } + + public void setAllowedTaskStatuses(String allowedTaskStatuses) { + this.allowedTaskStatuses = allowedTaskStatuses; + } +} From 6e2b4a465b1bfb330ce767aaf4e08b4d8ba5aa7b Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 09:10:24 +0100 Subject: [PATCH 03/13] add configuration class and register amqp connection class as bean --- .../config/RabbitMQConfiguration.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java new file mode 100644 index 00000000..ab41ac9c --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -0,0 +1,48 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; + +import com.rabbitmq.client.Address; +import com.rabbitmq.client.ConnectionFactory; + +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties(RabbitMQProperties.class) +@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "rabbitmq") +public class RabbitMQConfiguration { + + @Bean + public AMQPConnection amqpConnection(RabbitMQProperties rabbitMQProperties) { + ConnectionFactory connectionFactory = new ConnectionFactory(); + + connectionFactory.setHost(rabbitMQProperties.getHosts()); + connectionFactory.setPort(rabbitMQProperties.getPort()); + connectionFactory.setUsername(rabbitMQProperties.getUsername()); + connectionFactory.setPassword(rabbitMQProperties.getPassword()); + + Address[] addresses = + new Address[] { + new Address(rabbitMQProperties.getHosts(), rabbitMQProperties.getPort()) + }; + + AMQPRetryPattern retryPattern = new AMQPRetryPattern(); + + return AMQPConnection.getInstance(connectionFactory, addresses, retryPattern); + } +} From 8a5ceaa25dee6b77210b3b7ea255c386b6467f92 Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 09:46:12 +0100 Subject: [PATCH 04/13] rabbitmq service with implementation --- .../rabbitmq/services/RabbitMQService.java | 24 ++++++ .../services/RabbitMQServiceImpl.java | 83 +++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java create mode 100644 message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java new file mode 100644 index 00000000..ba76fabd --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java @@ -0,0 +1,24 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.services; + +import com.rabbitmq.client.BuiltinExchangeType; + +public interface RabbitMQService { + void publishMessage(String exchangeName, T content) throws Exception; + + void publishMessage( + String exchangeName, BuiltinExchangeType exchangeType, String routingKey, T content) + throws Exception; + + void close(); +} diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java new file mode 100644 index 00000000..27a4bc42 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java @@ -0,0 +1,83 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; +import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; + +public class RabbitMQServiceImpl implements RabbitMQService { + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQServiceImpl.class); + + private final AMQPConnection amqpConnection; + + @Autowired private final ObjectMapper objectMapper; + + public RabbitMQServiceImpl(AMQPConnection amqpConnection, ObjectMapper objectMapper) { + this.amqpConnection = amqpConnection; + this.objectMapper = objectMapper; + } + + @Override + public void publishMessage(String exchangeName, T content) throws Exception { + Channel channel = amqpConnection.getOrCreateChannel(ConnectionType.PUBLISHER, exchangeName); + + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, false); + + String jsonMessage = serializeContent(content); + + channel.basicPublish(exchangeName, "", null, jsonMessage.getBytes()); + + amqpConnection.returnChannel(ConnectionType.PUBLISHER, channel); + } + + @Override + public void publishMessage( + String exchangeName, BuiltinExchangeType exchangeType, String routingKey, T content) + throws Exception { + Channel channel = amqpConnection.getOrCreateChannel(ConnectionType.PUBLISHER, exchangeName); + + channel.exchangeDeclare(exchangeName, exchangeType, false); + + String jsonMessage = serializeContent(content); + + channel.basicPublish(exchangeName, routingKey, null, jsonMessage.getBytes()); + + amqpConnection.returnChannel(ConnectionType.PUBLISHER, channel); + } + + @Override + public void close() { + amqpConnection.close(); + } + + private String serializeContent(T content) { + try { + return objectMapper.writeValueAsString(content); + } catch (JsonProcessingException e) { + LOGGER.error( + "Failed to serialize message of type: {} to String. Exception: {}", + content.getClass(), + e); + throw new RuntimeException(e); + } + } +} From 21303db61a2b5f1d19dcc4effa4c7c9e952e38fe Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 09:48:15 +0100 Subject: [PATCH 05/13] register rabbitmq service --- .../conductor/rabbitmq/config/RabbitMQConfiguration.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java index ab41ac9c..5fff6260 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -18,7 +18,10 @@ import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; +import com.netflix.conductor.rabbitmq.services.RabbitMQServiceImpl; +import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Address; import com.rabbitmq.client.ConnectionFactory; @@ -45,4 +48,10 @@ public AMQPConnection amqpConnection(RabbitMQProperties rabbitMQProperties) { return AMQPConnection.getInstance(connectionFactory, addresses, retryPattern); } + + @Bean + public RabbitMQService rabbitMQService( + AMQPConnection amqpConnection, ObjectMapper objectMapper) { + return new RabbitMQServiceImpl(amqpConnection, objectMapper); + } } From 9e03dfc4e4163ab980bd8e2f8e722f5ca793ea6e Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 09:52:10 +0100 Subject: [PATCH 06/13] wf status listener implementation --- .../WorkflowStatusListenerRabbitMQ.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusListenerRabbitMQ.java diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusListenerRabbitMQ.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusListenerRabbitMQ.java new file mode 100644 index 00000000..518732c8 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusListenerRabbitMQ.java @@ -0,0 +1,58 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.model.WorkflowModel; +import com.netflix.conductor.rabbitmq.config.RabbitMQProperties; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; + +public class WorkflowStatusListenerRabbitMQ implements WorkflowStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(WorkflowStatusListenerRabbitMQ.class); + private final RabbitMQService rabbitMQService; + private final String EXCHANGE_NAME; + + public WorkflowStatusListenerRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + this.rabbitMQService = rabbitMQService; + this.EXCHANGE_NAME = rabbitMQProperties.getWorkflowStatusExchange(); + } + + @Override + public void onWorkflowCompleted(WorkflowModel workflow) { + publishMessage(workflow); + } + + @Override + public void onWorkflowTerminated(WorkflowModel workflow) { + publishMessage(workflow); + } + + @Override + public void onWorkflowFinalized(WorkflowModel workflow) { + publishMessage(workflow); + } + + private void publishMessage(WorkflowModel workflow) { + try { + rabbitMQService.publishMessage(EXCHANGE_NAME, workflow); + } catch (Exception e) { + LOGGER.error( + "Failed to publish message to exchange: {}. Exception: {}", EXCHANGE_NAME, e); + throw new RuntimeException(e); + } + } +} From e957ed4493ec481d66ded6b4f547d5602b8cf6a4 Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 09:54:50 +0100 Subject: [PATCH 07/13] register wf status listener conditionally --- .../rabbitmq/config/RabbitMQConfiguration.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java index 5fff6260..37427e7b 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -18,6 +18,7 @@ import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; +import com.netflix.conductor.rabbitmq.listener.WorkflowStatusListenerRabbitMQ; import com.netflix.conductor.rabbitmq.services.RabbitMQService; import com.netflix.conductor.rabbitmq.services.RabbitMQServiceImpl; @@ -54,4 +55,15 @@ public RabbitMQService rabbitMQService( AMQPConnection amqpConnection, ObjectMapper objectMapper) { return new RabbitMQServiceImpl(amqpConnection, objectMapper); } + + @ConditionalOnProperty( + prefix = "conductor.message-publisher.workflow-status", + name = "enabled", + havingValue = "true", + matchIfMissing = false) + @Bean + public WorkflowStatusListenerRabbitMQ workflowStatusListenerRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + return new WorkflowStatusListenerRabbitMQ(rabbitMQService, rabbitMQProperties); + } } From 4590f39035d6e8cb56706c0434c92323b56f766e Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 10:03:24 +0100 Subject: [PATCH 08/13] add task status publihser as task status listener implementation --- .../config/RabbitMQConfiguration.java | 6 +- .../listener/TaskStatusPublisherRabbitMQ.java | 96 +++++++++++++++++++ ...a => WorkflowStatusPublisherRabbitMQ.java} | 6 +- 3 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java rename message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/{WorkflowStatusListenerRabbitMQ.java => WorkflowStatusPublisherRabbitMQ.java} (92%) diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java index 37427e7b..ec5bf30f 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -18,7 +18,7 @@ import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; -import com.netflix.conductor.rabbitmq.listener.WorkflowStatusListenerRabbitMQ; +import com.netflix.conductor.rabbitmq.listener.WorkflowStatusPublisherRabbitMQ; import com.netflix.conductor.rabbitmq.services.RabbitMQService; import com.netflix.conductor.rabbitmq.services.RabbitMQServiceImpl; @@ -62,8 +62,8 @@ public RabbitMQService rabbitMQService( havingValue = "true", matchIfMissing = false) @Bean - public WorkflowStatusListenerRabbitMQ workflowStatusListenerRabbitMQ( + public WorkflowStatusPublisherRabbitMQ workflowStatusListenerRabbitMQ( RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { - return new WorkflowStatusListenerRabbitMQ(rabbitMQService, rabbitMQProperties); + return new WorkflowStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); } } diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java new file mode 100644 index 00000000..5ab4c7f5 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java @@ -0,0 +1,96 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.rabbitmq.config.RabbitMQProperties; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; + +public class TaskStatusPublisherRabbitMQ implements TaskStatusListener { + + private Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisherRabbitMQ.class); + + private final RabbitMQService rabbitMQService; + private final RabbitMQProperties rabbitMQProperties; + + public TaskStatusPublisherRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + this.rabbitMQService = rabbitMQService; + this.rabbitMQProperties = rabbitMQProperties; + } + + @Override + public void onTaskScheduled(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskInProgress(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCanceled(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskFailed(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskFailedWithTerminalError(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCompleted(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCompletedWithErrors(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskTimedOut(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskSkipped(TaskModel task) { + publishMessage(task); + } + + private boolean IsStatusEnabled(TaskModel task) { + return rabbitMQProperties.getAllowedTaskStatuses().contains(task.getStatus().name()); + } + + private void publishMessage(TaskModel task) { + try { + if (IsStatusEnabled(task)) + rabbitMQService.publishMessage(rabbitMQProperties.getTaskStatusExchange(), task); + } catch (Exception e) { + LOGGER.error( + "Failed to publish message to exchange: {}. Exception: {}", + rabbitMQProperties.getTaskStatusExchange(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusListenerRabbitMQ.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java similarity index 92% rename from message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusListenerRabbitMQ.java rename to message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java index 518732c8..80b6554a 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusListenerRabbitMQ.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java @@ -19,13 +19,13 @@ import com.netflix.conductor.rabbitmq.config.RabbitMQProperties; import com.netflix.conductor.rabbitmq.services.RabbitMQService; -public class WorkflowStatusListenerRabbitMQ implements WorkflowStatusListener { +public class WorkflowStatusPublisherRabbitMQ implements WorkflowStatusListener { - private final Logger LOGGER = LoggerFactory.getLogger(WorkflowStatusListenerRabbitMQ.class); + private final Logger LOGGER = LoggerFactory.getLogger(WorkflowStatusPublisherRabbitMQ.class); private final RabbitMQService rabbitMQService; private final String EXCHANGE_NAME; - public WorkflowStatusListenerRabbitMQ( + public WorkflowStatusPublisherRabbitMQ( RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { this.rabbitMQService = rabbitMQService; this.EXCHANGE_NAME = rabbitMQProperties.getWorkflowStatusExchange(); From c9e002cde00634d671223c567d3c358c6dee5957 Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 10:07:16 +0100 Subject: [PATCH 09/13] register task publisher conditionally --- .../rabbitmq/config/RabbitMQConfiguration.java | 12 ++++++++++++ .../listener/TaskStatusPublisherRabbitMQ.java | 2 +- workflow-event-listener/build.gradle | 1 - 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java index ec5bf30f..d237b543 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -18,6 +18,7 @@ import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; +import com.netflix.conductor.rabbitmq.listener.TaskStatusPublisherRabbitMQ; import com.netflix.conductor.rabbitmq.listener.WorkflowStatusPublisherRabbitMQ; import com.netflix.conductor.rabbitmq.services.RabbitMQService; import com.netflix.conductor.rabbitmq.services.RabbitMQServiceImpl; @@ -66,4 +67,15 @@ public WorkflowStatusPublisherRabbitMQ workflowStatusListenerRabbitMQ( RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { return new WorkflowStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); } + + @ConditionalOnProperty( + prefix = "conductor.message-publisher.task-status", + name = "enabled", + havingValue = "true", + matchIfMissing = false) + @Bean + public TaskStatusPublisherRabbitMQ taskStatusPublisherRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + return new TaskStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); + } } diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java index 5ab4c7f5..942ce582 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java @@ -21,7 +21,7 @@ public class TaskStatusPublisherRabbitMQ implements TaskStatusListener { - private Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisherRabbitMQ.class); + private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisherRabbitMQ.class); private final RabbitMQService rabbitMQService; private final RabbitMQProperties rabbitMQProperties; diff --git a/workflow-event-listener/build.gradle b/workflow-event-listener/build.gradle index 49e0e29c..3b9b802b 100644 --- a/workflow-event-listener/build.gradle +++ b/workflow-event-listener/build.gradle @@ -6,7 +6,6 @@ dependencies { implementation "com.netflix.conductor:conductor-common:${revConductor}" implementation "com.netflix.conductor:conductor-core:${revConductor}" - compileOnly 'org.springframework.boot:spring-boot-starter' compileOnly 'org.springframework.boot:spring-boot-starter-web' From 6892da50eb63334fb79e2d2d98dd82829c6a7876 Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 15:25:00 +0100 Subject: [PATCH 10/13] enable wf listener through configuration and change condition for registering bean --- .../config/RabbitMQConfiguration.java | 10 +++--- .../rabbitmq/config/RabbitMQProperties.java | 10 ++++++ .../WorkflowStatusPublisherRabbitMQ.java | 35 ++++++++++++++++--- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java index d237b543..9f83b0fb 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -58,9 +58,8 @@ public RabbitMQService rabbitMQService( } @ConditionalOnProperty( - prefix = "conductor.message-publisher.workflow-status", - name = "enabled", - havingValue = "true", + name = "conductor.workflow-status-listener.type", + havingValue = "rabbitmq", matchIfMissing = false) @Bean public WorkflowStatusPublisherRabbitMQ workflowStatusListenerRabbitMQ( @@ -69,9 +68,8 @@ public WorkflowStatusPublisherRabbitMQ workflowStatusListenerRabbitMQ( } @ConditionalOnProperty( - prefix = "conductor.message-publisher.task-status", - name = "enabled", - havingValue = "true", + name = "conductor.task-status-listener.type", + havingValue = "rabbitmq", matchIfMissing = false) @Bean public TaskStatusPublisherRabbitMQ taskStatusPublisherRabbitMQ( diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java index 4f1c3ceb..1844fc5e 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java @@ -37,6 +37,8 @@ public class RabbitMQProperties { private String workflowStatusExchange; private String taskStatusExchange; + private boolean workflowStatusListenerEnabled = true; + public String getHosts() { return hosts; } @@ -124,4 +126,12 @@ public List getAllowedTaskStatuses() { public void setAllowedTaskStatuses(String allowedTaskStatuses) { this.allowedTaskStatuses = allowedTaskStatuses; } + + public boolean isWorkflowStatusListenerEnabled() { + return workflowStatusListenerEnabled; + } + + public void setWorkflowStatusListenerEnabled(boolean workflowStatusListenerEnabled) { + this.workflowStatusListenerEnabled = workflowStatusListenerEnabled; + } } diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java index 80b6554a..d60f1a10 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java @@ -23,12 +23,36 @@ public class WorkflowStatusPublisherRabbitMQ implements WorkflowStatusListener { private final Logger LOGGER = LoggerFactory.getLogger(WorkflowStatusPublisherRabbitMQ.class); private final RabbitMQService rabbitMQService; - private final String EXCHANGE_NAME; + private final RabbitMQProperties rabbitMQProperties; public WorkflowStatusPublisherRabbitMQ( RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { this.rabbitMQService = rabbitMQService; - this.EXCHANGE_NAME = rabbitMQProperties.getWorkflowStatusExchange(); + this.rabbitMQProperties = rabbitMQProperties; + } + + @Override + public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isWorkflowStatusListenerEnabled()) { + onWorkflowCompleted(workflow); + } + } + + @Override + public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isWorkflowStatusListenerEnabled()) { + onWorkflowTerminated(workflow); + } + } + + @Override + public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isWorkflowStatusListenerEnabled()) { + onWorkflowFinalized(workflow); + } } @Override @@ -48,10 +72,13 @@ public void onWorkflowFinalized(WorkflowModel workflow) { private void publishMessage(WorkflowModel workflow) { try { - rabbitMQService.publishMessage(EXCHANGE_NAME, workflow); + rabbitMQService.publishMessage( + rabbitMQProperties.getWorkflowStatusExchange(), workflow); } catch (Exception e) { LOGGER.error( - "Failed to publish message to exchange: {}. Exception: {}", EXCHANGE_NAME, e); + "Failed to publish message to exchange: {}. Exception: {}", + rabbitMQProperties.getWorkflowStatusExchange(), + e); throw new RuntimeException(e); } } From c865e37dc7e2356ea0b0b9539c3cca0951b3a719 Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 15:34:13 +0100 Subject: [PATCH 11/13] include module rabbitmq publisher and add default appliction properties --- community-server/build.gradle | 1 + .../src/main/resources/application.properties | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/community-server/build.gradle b/community-server/build.gradle index 52812fc9..af72ec9c 100644 --- a/community-server/build.gradle +++ b/community-server/build.gradle @@ -36,6 +36,7 @@ dependencies { implementation project(':task:conductor-kafka') implementation project(':conductor-workflow-event-listener') + implementation project(':message-publisher:conductor-rabbitmq-message-publisher') implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-validation' diff --git a/community-server/src/main/resources/application.properties b/community-server/src/main/resources/application.properties index ee0f00f0..51721f0b 100644 --- a/community-server/src/main/resources/application.properties +++ b/community-server/src/main/resources/application.properties @@ -127,3 +127,20 @@ management.metrics.export.datadog.enabled=false #optional - default Datadog instance is https://app.datadoghq.com/ # management.metrics.export.datadog.uri= #management.metrics.export.datadog.step=10s + +#Following properties set for using for publishing workflow/task status change event to RabbitMQ: +#(To enable support of RabbitMQ publishers) +#conductor.message-publisher.type=rabbitmq + +#conductor.workflow-status-listener.type=rabbitmq +#conductor.task-status-listener.type=rabbitmq +#conductor.message-publisher.rabbitmq.hosts= +#conductor.message-publisher.rabbitmq.username= +#conductor.message-publisher.rabbitmq.password= +#conductor.message-publisher.rabbitmq.port=5672 +#conductor.message-publisher.rabbitmq.workflowStatusExchange=workflow.status.exchange +#conductor.message-publisher.rabbitmq.taskStatusExchange=task.status.exchange +#conductor.message-publisher.rabbitmq.workflowStatusListenerEnabled=true +#conductor.message-publisher.rabbitmq.allowedTaskStatuses=COMPLETED,IN_PROGRESS,FAILED +# Task statuses +# IN_PROGRESS,CANCELED,FAILED,FAILED_WITH_TERMINAL_ERROR,COMPLETED,COMPLETED_WITH_ERRORS,SCHEDULED,TIMED_OUT,SKIPPED From 1b537bbb01dd97b0f362d0f2c47e54cdf63300e2 Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Wed, 27 Dec 2023 15:36:11 +0100 Subject: [PATCH 12/13] remove proxy bean methods --- .../conductor/rabbitmq/config/RabbitMQConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java index 9f83b0fb..7b784196 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -27,7 +27,7 @@ import com.rabbitmq.client.Address; import com.rabbitmq.client.ConnectionFactory; -@Configuration(proxyBeanMethods = false) +@Configuration @EnableConfigurationProperties(RabbitMQProperties.class) @ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "rabbitmq") public class RabbitMQConfiguration { From fc53210a3e3872acfbd95891014647a6bbf66557 Mon Sep 17 00:00:00 2001 From: darkobuvac Date: Thu, 4 Jan 2024 09:15:10 +0100 Subject: [PATCH 13/13] rename property --- .../src/main/resources/application.properties | 5 +++-- .../rabbitmq/config/RabbitMQConfiguration.java | 6 ++++-- .../rabbitmq/config/RabbitMQProperties.java | 12 ++++++------ .../listener/WorkflowStatusPublisherRabbitMQ.java | 6 +++--- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/community-server/src/main/resources/application.properties b/community-server/src/main/resources/application.properties index 51721f0b..87c08172 100644 --- a/community-server/src/main/resources/application.properties +++ b/community-server/src/main/resources/application.properties @@ -128,8 +128,9 @@ management.metrics.export.datadog.enabled=false # management.metrics.export.datadog.uri= #management.metrics.export.datadog.step=10s -#Following properties set for using for publishing workflow/task status change event to RabbitMQ: -#(To enable support of RabbitMQ publishers) +# Following properties set for using for publishing workflow/task status change event to RabbitMQ: +# (To enable support of RabbitMQ publishers) + #conductor.message-publisher.type=rabbitmq #conductor.workflow-status-listener.type=rabbitmq diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java index 7b784196..b2bd4899 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -11,6 +11,8 @@ */ package com.netflix.conductor.rabbitmq.config; +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.core.listener.WorkflowStatusListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -62,7 +64,7 @@ public RabbitMQService rabbitMQService( havingValue = "rabbitmq", matchIfMissing = false) @Bean - public WorkflowStatusPublisherRabbitMQ workflowStatusListenerRabbitMQ( + public WorkflowStatusListener workflowStatusListenerRabbitMQ( RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { return new WorkflowStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); } @@ -72,7 +74,7 @@ public WorkflowStatusPublisherRabbitMQ workflowStatusListenerRabbitMQ( havingValue = "rabbitmq", matchIfMissing = false) @Bean - public TaskStatusPublisherRabbitMQ taskStatusPublisherRabbitMQ( + public TaskStatusListener taskStatusPublisherRabbitMQ( RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { return new TaskStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); } diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java index 1844fc5e..ea88b812 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java @@ -22,7 +22,7 @@ public class RabbitMQProperties { private String hosts = ConnectionFactory.DEFAULT_HOST; - private String username = ConnectionFactory.DEFAULT_HOST; + private String username = ConnectionFactory.DEFAULT_USER; private String password = ConnectionFactory.DEFAULT_PASS; private int port = ConnectionFactory.DEFAULT_AMQP_PORT; @@ -37,7 +37,7 @@ public class RabbitMQProperties { private String workflowStatusExchange; private String taskStatusExchange; - private boolean workflowStatusListenerEnabled = true; + private boolean alwaysPublishWorkflowStatusEnabled = true; public String getHosts() { return hosts; @@ -127,11 +127,11 @@ public void setAllowedTaskStatuses(String allowedTaskStatuses) { this.allowedTaskStatuses = allowedTaskStatuses; } - public boolean isWorkflowStatusListenerEnabled() { - return workflowStatusListenerEnabled; + public boolean isAlwaysPublishWorkflowStatusEnabled() { + return alwaysPublishWorkflowStatusEnabled; } - public void setWorkflowStatusListenerEnabled(boolean workflowStatusListenerEnabled) { - this.workflowStatusListenerEnabled = workflowStatusListenerEnabled; + public void setAlwaysPublishWorkflowStatusEnabled(boolean alwaysPublishWorkflowStatusEnabled) { + this.alwaysPublishWorkflowStatusEnabled = alwaysPublishWorkflowStatusEnabled; } } diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java index d60f1a10..f3210bf0 100644 --- a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java @@ -34,7 +34,7 @@ public WorkflowStatusPublisherRabbitMQ( @Override public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) { if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() - || rabbitMQProperties.isWorkflowStatusListenerEnabled()) { + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { onWorkflowCompleted(workflow); } } @@ -42,7 +42,7 @@ public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) { @Override public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) { if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() - || rabbitMQProperties.isWorkflowStatusListenerEnabled()) { + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { onWorkflowTerminated(workflow); } } @@ -50,7 +50,7 @@ public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) { @Override public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) { if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() - || rabbitMQProperties.isWorkflowStatusListenerEnabled()) { + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { onWorkflowFinalized(workflow); } }