diff --git a/community-server/build.gradle b/community-server/build.gradle index 52812fc9..af72ec9c 100644 --- a/community-server/build.gradle +++ b/community-server/build.gradle @@ -36,6 +36,7 @@ dependencies { implementation project(':task:conductor-kafka') implementation project(':conductor-workflow-event-listener') + implementation project(':message-publisher:conductor-rabbitmq-message-publisher') implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-validation' diff --git a/community-server/src/main/resources/application.properties b/community-server/src/main/resources/application.properties index ee0f00f0..87c08172 100644 --- a/community-server/src/main/resources/application.properties +++ b/community-server/src/main/resources/application.properties @@ -127,3 +127,21 @@ management.metrics.export.datadog.enabled=false #optional - default Datadog instance is https://app.datadoghq.com/ # management.metrics.export.datadog.uri= #management.metrics.export.datadog.step=10s + +# Following properties set for using for publishing workflow/task status change event to RabbitMQ: +# (To enable support of RabbitMQ publishers) + +#conductor.message-publisher.type=rabbitmq + +#conductor.workflow-status-listener.type=rabbitmq +#conductor.task-status-listener.type=rabbitmq +#conductor.message-publisher.rabbitmq.hosts= +#conductor.message-publisher.rabbitmq.username= +#conductor.message-publisher.rabbitmq.password= +#conductor.message-publisher.rabbitmq.port=5672 +#conductor.message-publisher.rabbitmq.workflowStatusExchange=workflow.status.exchange +#conductor.message-publisher.rabbitmq.taskStatusExchange=task.status.exchange +#conductor.message-publisher.rabbitmq.workflowStatusListenerEnabled=true +#conductor.message-publisher.rabbitmq.allowedTaskStatuses=COMPLETED,IN_PROGRESS,FAILED +# Task statuses +# IN_PROGRESS,CANCELED,FAILED,FAILED_WITH_TERMINAL_ERROR,COMPLETED,COMPLETED_WITH_ERRORS,SCHEDULED,TIMED_OUT,SKIPPED diff --git a/message-publisher/rabbitmq-message-publisher/build.gradle b/message-publisher/rabbitmq-message-publisher/build.gradle new file mode 100644 index 00000000..48442cc8 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/build.gradle @@ -0,0 +1,23 @@ +plugins { + id 'groovy' +} +dependencies { + + implementation "com.netflix.conductor:conductor-common:${revConductor}" + implementation "com.netflix.conductor:conductor-core:${revConductor}" + implementation "com.netflix.conductor:conductor-amqp:${revConductor}" + + implementation "com.rabbitmq:amqp-client:${revAmqpClient}" + + compileOnly 'org.springframework.boot:spring-boot-starter' + compileOnly 'org.springframework.boot:spring-boot-starter-web' + + testImplementation "org.codehaus.groovy:groovy-all:${revGroovy}" + testImplementation "org.spockframework:spock-core:${revSpock}" + testImplementation "org.spockframework:spock-spring:${revSpock}" + + + testImplementation "com.netflix.conductor:conductor-server:${revConductor}" + testImplementation 'org.springframework.boot:spring-boot-starter-web' + testImplementation project(':conductor-test-util').sourceSets.test.output +} \ No newline at end of file diff --git a/message-publisher/rabbitmq-message-publisher/dependencies.lock b/message-publisher/rabbitmq-message-publisher/dependencies.lock new file mode 100644 index 00000000..df064988 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/dependencies.lock @@ -0,0 +1,261 @@ +{ + "annotationProcessor": { + "org.springframework.boot:spring-boot-configuration-processor": { + "locked": "2.7.16" + } + }, + "compileClasspath": { + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-core": { + "locked": "3.15.0" + }, + "org.apache.logging.log4j:log4j-api": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-core": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-jul": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-web": { + "locked": "2.17.2" + }, + "org.springframework.boot:spring-boot-starter": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-web": { + "locked": "2.7.16" + } + }, + "runtimeClasspath": { + "com.google.guava:guava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "32.1.2-jre" + }, + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.15.0" + }, + "com.rabbitmq:amqp-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "5.14.3" + }, + "io.reactivex:rxjava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "1.3.8" + }, + "org.apache.commons:commons-lang3": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.12.0" + }, + "org.apache.logging.log4j:log4j-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-jul": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-web": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + } + }, + "testCompileClasspath": { + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-core": { + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-server": { + "locked": "3.15.0" + }, + "junit:junit": { + "locked": "4.13.2" + }, + "org.apache.logging.log4j:log4j-api": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-core": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-jul": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-web": { + "locked": "2.17.2" + }, + "org.codehaus.groovy:groovy-all": { + "locked": "3.0.19" + }, + "org.junit.vintage:junit-vintage-engine": { + "locked": "5.8.2" + }, + "org.spockframework:spock-core": { + "locked": "2.3-groovy-3.0" + }, + "org.spockframework:spock-spring": { + "locked": "2.3-groovy-3.0" + }, + "org.springframework.boot:spring-boot-starter-log4j2": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-test": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-web": { + "locked": "2.7.16" + } + }, + "testRuntimeClasspath": { + "com.google.guava:guava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "32.1.2-jre" + }, + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.15.0" + }, + "com.netflix.conductor:conductor-server": { + "locked": "3.15.0" + }, + "com.rabbitmq:amqp-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "5.14.3" + }, + "io.reactivex:rxjava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "1.3.8" + }, + "junit:junit": { + "locked": "4.13.2" + }, + "org.apache.commons:commons-lang3": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "3.12.0" + }, + "org.apache.logging.log4j:log4j-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-jul": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.apache.logging.log4j:log4j-web": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "2.17.2" + }, + "org.codehaus.groovy:groovy-all": { + "locked": "3.0.19" + }, + "org.junit.vintage:junit-vintage-engine": { + "locked": "5.8.2" + }, + "org.spockframework:spock-core": { + "locked": "2.3-groovy-3.0" + }, + "org.spockframework:spock-spring": { + "locked": "2.3-groovy-3.0" + }, + "org.springframework.boot:spring-boot-starter-log4j2": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-test": { + "locked": "2.7.16" + }, + "org.springframework.boot:spring-boot-starter-web": { + "locked": "2.7.16" + } + } +} \ No newline at end of file diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java new file mode 100644 index 00000000..b2bd4899 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -0,0 +1,81 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.config; + +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; +import com.netflix.conductor.rabbitmq.listener.TaskStatusPublisherRabbitMQ; +import com.netflix.conductor.rabbitmq.listener.WorkflowStatusPublisherRabbitMQ; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; +import com.netflix.conductor.rabbitmq.services.RabbitMQServiceImpl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.ConnectionFactory; + +@Configuration +@EnableConfigurationProperties(RabbitMQProperties.class) +@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "rabbitmq") +public class RabbitMQConfiguration { + + @Bean + public AMQPConnection amqpConnection(RabbitMQProperties rabbitMQProperties) { + ConnectionFactory connectionFactory = new ConnectionFactory(); + + connectionFactory.setHost(rabbitMQProperties.getHosts()); + connectionFactory.setPort(rabbitMQProperties.getPort()); + connectionFactory.setUsername(rabbitMQProperties.getUsername()); + connectionFactory.setPassword(rabbitMQProperties.getPassword()); + + Address[] addresses = + new Address[] { + new Address(rabbitMQProperties.getHosts(), rabbitMQProperties.getPort()) + }; + + AMQPRetryPattern retryPattern = new AMQPRetryPattern(); + + return AMQPConnection.getInstance(connectionFactory, addresses, retryPattern); + } + + @Bean + public RabbitMQService rabbitMQService( + AMQPConnection amqpConnection, ObjectMapper objectMapper) { + return new RabbitMQServiceImpl(amqpConnection, objectMapper); + } + + @ConditionalOnProperty( + name = "conductor.workflow-status-listener.type", + havingValue = "rabbitmq", + matchIfMissing = false) + @Bean + public WorkflowStatusListener workflowStatusListenerRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + return new WorkflowStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); + } + + @ConditionalOnProperty( + name = "conductor.task-status-listener.type", + havingValue = "rabbitmq", + matchIfMissing = false) + @Bean + public TaskStatusListener taskStatusPublisherRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + return new TaskStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); + } +} diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java new file mode 100644 index 00000000..ea88b812 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java @@ -0,0 +1,137 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.config; + +import java.util.Arrays; +import java.util.List; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import com.rabbitmq.client.ConnectionFactory; + +@ConfigurationProperties("conductor.message-publisher.rabbitmq") +public class RabbitMQProperties { + + private String hosts = ConnectionFactory.DEFAULT_HOST; + private String username = ConnectionFactory.DEFAULT_USER; + private String password = ConnectionFactory.DEFAULT_PASS; + + private int port = ConnectionFactory.DEFAULT_AMQP_PORT; + private int maxChannelCount = 5000; + private int limit = 50; + private int duration = 1000; + + private String virtualHost = ConnectionFactory.DEFAULT_VHOST; + + private String allowedTaskStatuses; + + private String workflowStatusExchange; + private String taskStatusExchange; + + private boolean alwaysPublishWorkflowStatusEnabled = true; + + public String getHosts() { + return hosts; + } + + public void setHosts(String hosts) { + this.hosts = hosts; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public int getMaxChannelCount() { + return maxChannelCount; + } + + public void setMaxChannelCount(int maxChannelCount) { + this.maxChannelCount = maxChannelCount; + } + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public int getDuration() { + return duration; + } + + public void setDuration(int duration) { + this.duration = duration; + } + + public String getVirtualHost() { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + public String getWorkflowStatusExchange() { + return workflowStatusExchange; + } + + public void setWorkflowStatusExchange(String workflowStatusExchange) { + this.workflowStatusExchange = workflowStatusExchange; + } + + public String getTaskStatusExchange() { + return taskStatusExchange; + } + + public void setTaskStatusExchange(String taskStatusExchange) { + this.taskStatusExchange = taskStatusExchange; + } + + public List getAllowedTaskStatuses() { + return Arrays.asList(this.allowedTaskStatuses.split(",")); + } + + public void setAllowedTaskStatuses(String allowedTaskStatuses) { + this.allowedTaskStatuses = allowedTaskStatuses; + } + + public boolean isAlwaysPublishWorkflowStatusEnabled() { + return alwaysPublishWorkflowStatusEnabled; + } + + public void setAlwaysPublishWorkflowStatusEnabled(boolean alwaysPublishWorkflowStatusEnabled) { + this.alwaysPublishWorkflowStatusEnabled = alwaysPublishWorkflowStatusEnabled; + } +} diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java new file mode 100644 index 00000000..942ce582 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java @@ -0,0 +1,96 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.rabbitmq.config.RabbitMQProperties; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; + +public class TaskStatusPublisherRabbitMQ implements TaskStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisherRabbitMQ.class); + + private final RabbitMQService rabbitMQService; + private final RabbitMQProperties rabbitMQProperties; + + public TaskStatusPublisherRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + this.rabbitMQService = rabbitMQService; + this.rabbitMQProperties = rabbitMQProperties; + } + + @Override + public void onTaskScheduled(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskInProgress(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCanceled(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskFailed(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskFailedWithTerminalError(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCompleted(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCompletedWithErrors(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskTimedOut(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskSkipped(TaskModel task) { + publishMessage(task); + } + + private boolean IsStatusEnabled(TaskModel task) { + return rabbitMQProperties.getAllowedTaskStatuses().contains(task.getStatus().name()); + } + + private void publishMessage(TaskModel task) { + try { + if (IsStatusEnabled(task)) + rabbitMQService.publishMessage(rabbitMQProperties.getTaskStatusExchange(), task); + } catch (Exception e) { + LOGGER.error( + "Failed to publish message to exchange: {}. Exception: {}", + rabbitMQProperties.getTaskStatusExchange(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java new file mode 100644 index 00000000..f3210bf0 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java @@ -0,0 +1,85 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.model.WorkflowModel; +import com.netflix.conductor.rabbitmq.config.RabbitMQProperties; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; + +public class WorkflowStatusPublisherRabbitMQ implements WorkflowStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(WorkflowStatusPublisherRabbitMQ.class); + private final RabbitMQService rabbitMQService; + private final RabbitMQProperties rabbitMQProperties; + + public WorkflowStatusPublisherRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + this.rabbitMQService = rabbitMQService; + this.rabbitMQProperties = rabbitMQProperties; + } + + @Override + public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowCompleted(workflow); + } + } + + @Override + public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowTerminated(workflow); + } + } + + @Override + public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowFinalized(workflow); + } + } + + @Override + public void onWorkflowCompleted(WorkflowModel workflow) { + publishMessage(workflow); + } + + @Override + public void onWorkflowTerminated(WorkflowModel workflow) { + publishMessage(workflow); + } + + @Override + public void onWorkflowFinalized(WorkflowModel workflow) { + publishMessage(workflow); + } + + private void publishMessage(WorkflowModel workflow) { + try { + rabbitMQService.publishMessage( + rabbitMQProperties.getWorkflowStatusExchange(), workflow); + } catch (Exception e) { + LOGGER.error( + "Failed to publish message to exchange: {}. Exception: {}", + rabbitMQProperties.getWorkflowStatusExchange(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java new file mode 100644 index 00000000..ba76fabd --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java @@ -0,0 +1,24 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.services; + +import com.rabbitmq.client.BuiltinExchangeType; + +public interface RabbitMQService { + void publishMessage(String exchangeName, T content) throws Exception; + + void publishMessage( + String exchangeName, BuiltinExchangeType exchangeType, String routingKey, T content) + throws Exception; + + void close(); +} diff --git a/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java new file mode 100644 index 00000000..27a4bc42 --- /dev/null +++ b/message-publisher/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java @@ -0,0 +1,83 @@ +/* + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; +import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; + +public class RabbitMQServiceImpl implements RabbitMQService { + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQServiceImpl.class); + + private final AMQPConnection amqpConnection; + + @Autowired private final ObjectMapper objectMapper; + + public RabbitMQServiceImpl(AMQPConnection amqpConnection, ObjectMapper objectMapper) { + this.amqpConnection = amqpConnection; + this.objectMapper = objectMapper; + } + + @Override + public void publishMessage(String exchangeName, T content) throws Exception { + Channel channel = amqpConnection.getOrCreateChannel(ConnectionType.PUBLISHER, exchangeName); + + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, false); + + String jsonMessage = serializeContent(content); + + channel.basicPublish(exchangeName, "", null, jsonMessage.getBytes()); + + amqpConnection.returnChannel(ConnectionType.PUBLISHER, channel); + } + + @Override + public void publishMessage( + String exchangeName, BuiltinExchangeType exchangeType, String routingKey, T content) + throws Exception { + Channel channel = amqpConnection.getOrCreateChannel(ConnectionType.PUBLISHER, exchangeName); + + channel.exchangeDeclare(exchangeName, exchangeType, false); + + String jsonMessage = serializeContent(content); + + channel.basicPublish(exchangeName, routingKey, null, jsonMessage.getBytes()); + + amqpConnection.returnChannel(ConnectionType.PUBLISHER, channel); + } + + @Override + public void close() { + amqpConnection.close(); + } + + private String serializeContent(T content) { + try { + return objectMapper.writeValueAsString(content); + } catch (JsonProcessingException e) { + LOGGER.error( + "Failed to serialize message of type: {} to String. Exception: {}", + content.getClass(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/settings.gradle b/settings.gradle index 6ff0f24b..85d3d42d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,6 +25,7 @@ include 'index' include 'test-util' include 'lock' include 'workflow-event-listener' +include 'message-publisher' include 'persistence:common-persistence' include 'persistence:mysql-persistence' @@ -35,6 +36,7 @@ include 'external-payload-storage:postgres-external-storage' include 'event-queue:amqp' include 'event-queue:nats' include 'event-queue:nats-streaming' +include 'message-publisher:rabbitmq-message-publisher' include 'lock:zookeeper-lock' include 'task:kafka' include 'community-server' @@ -42,13 +44,13 @@ include 'community-server' rootProject.children.stream() .filter(p -> p.name.equals("persistence") || p.name.equals("index") || p.name.equals("external-payload-storage") || p.name.equals("event-queue") || - p.name.equals("lock") || p.name.equals("task")) + p.name.equals("lock") || p.name.equals("task") || p.name.equals("message-publisher")) .forEach(it -> it.children .forEach(c -> c.name = "conductor-" + c.name)) rootProject.children.stream() .filter(p -> !p.name.equals("persistence") && !p.name.equals("index") && !p.name.equals("external-payload-storage") && !p.name.equals("event-queue") && - !p.name.equals("lock") && !p.name.equals("task")) + !p.name.equals("lock") && !p.name.equals("task") && !p.name.equals("message-publisher")) .forEach(it -> it.name = "conductor-" + it.name)