Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 54949200B5E for ; Wed, 10 Aug 2016 21:19:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5336E160AA4; Wed, 10 Aug 2016 19:19:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 300D1160A8F for ; Wed, 10 Aug 2016 21:19:11 +0200 (CEST) Received: (qmail 32075 invoked by uid 500); 10 Aug 2016 19:19:10 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 32066 invoked by uid 99); 10 Aug 2016 19:19:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Aug 2016 19:19:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 40E88E00A7; Wed, 10 Aug 2016 19:19:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shameera@apache.org To: commits@airavata.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: airavata git commit: Refactored messaging module to remove duplicate code and support multiple publishers Date: Wed, 10 Aug 2016 19:19:10 +0000 (UTC) archived-at: Wed, 10 Aug 2016 19:19:13 -0000 Repository: airavata Updated Branches: refs/heads/messaging-refactor [created] e4cc54d92 Refactored messaging module to remove duplicate code and support multiple publishers Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e4cc54d9 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e4cc54d9 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e4cc54d9 Branch: refs/heads/messaging-refactor Commit: e4cc54d923d406542ed5d007aa1fb48a8094d29f Parents: 2df4bb2 Author: Shameera Rathnayaka Authored: Wed Aug 10 15:18:53 2016 -0400 Committer: Shameera Rathnayaka Committed: Wed Aug 10 15:18:53 2016 -0400 ---------------------------------------------------------------------- .../org/apache/airavata/gfac/impl/Factory.java | 9 +- .../airavata/gfac/server/GfacServerHandler.java | 6 +- .../messaging/client/RabbitMQListener.java | 3 +- .../messaging/core/MessagingFactory.java | 86 +++- .../airavata/messaging/core/Publisher.java | 4 +- .../messaging/core/RabbitMQProperties.java | 125 ++++++ .../airavata/messaging/core/Subscriber.java | 5 - .../messaging/core/SubscriberProperties.java | 125 ------ .../airavata/messaging/core/TestClient.java | 2 +- .../apache/airavata/messaging/core/Type.java | 27 ++ .../impl/RabbitMQProcessLaunchPublisher.java | 156 +++---- .../messaging/core/impl/RabbitMQProducer.java | 440 +++++++++---------- .../messaging/core/impl/RabbitMQPublisher.java | 118 +++++ .../core/impl/RabbitMQStatusPublisher.java | 212 ++++----- .../messaging/core/impl/RabbitMQSubscriber.java | 7 +- .../server/OrchestratorServerHandler.java | 3 +- .../ExperimentExecution.java | 3 +- .../workflow/core/WorkflowEnactmentService.java | 7 +- .../workflow/core/WorkflowInterpreter.java | 8 +- 19 files changed, 777 insertions(+), 569 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java index d105c18..673f37b 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java @@ -66,7 +66,8 @@ import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; import org.apache.airavata.messaging.core.MessagingFactory; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.Subscriber; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher; +import org.apache.airavata.messaging.core.Type; +import org.apache.airavata.messaging.core.impl.RabbitMQPublisher; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; @@ -145,9 +146,9 @@ public abstract class Factory { public static Publisher getStatusPublisher() throws AiravataException { if (statusPublisher == null) { - synchronized (RabbitMQStatusPublisher.class) { + synchronized (RabbitMQPublisher.class) { if (statusPublisher == null) { - statusPublisher = new RabbitMQStatusPublisher(); + statusPublisher = MessagingFactory.getPublisher(Type.STATUS); } } } @@ -169,7 +170,7 @@ public abstract class Factory { public static synchronized Subscriber getProcessLaunchSubscriber() throws AiravataException { if (processLaunchSubscriber == null) { - processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Subscriber.Type.PROCESS_LAUNCH); + processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Type.PROCESS_LAUNCH); } return processLaunchSubscriber; } http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index a490d91..44073dc 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -39,7 +39,7 @@ import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.messaging.core.MessagingFactory; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.Subscriber; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher; +import org.apache.airavata.messaging.core.Type; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.messaging.event.ProcessIdentifier; import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; @@ -98,9 +98,9 @@ public class GfacServerHandler implements GfacService.Iface { // init process consumer List routingKeys = new ArrayList<>(); routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName()); - processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Subscriber.Type.PROCESS_LAUNCH); + processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Type.PROCESS_LAUNCH); // init status publisher - statusPublisher = new RabbitMQStatusPublisher(); + statusPublisher = Factory.getStatusPublisher(); } private void startCuratorClient() throws ApplicationSettingsException { http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java ---------------------------------------------------------------------- diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java index 984aa59..9003897 100644 --- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java +++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java @@ -27,6 +27,7 @@ import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.messaging.core.MessagingFactory; import org.apache.airavata.messaging.core.Subscriber; +import org.apache.airavata.messaging.core.Type; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; import org.apache.airavata.model.messaging.event.MessageType; @@ -69,7 +70,7 @@ public class RabbitMQListener { System.out.println("broker url " + brokerUrl); final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); List routingKeys = getRoutingKeys(level); - Subscriber subscriber = MessagingFactory.getSubscriber(null, routingKeys, Subscriber.Type.STATUS); + Subscriber subscriber = MessagingFactory.getSubscriber(message -> {}, routingKeys, Type.STATUS); } catch (ApplicationSettingsException e) { logger.error("Error reading airavata server properties", e); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java index ee68d0c..99c11b8 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java @@ -23,29 +23,37 @@ package org.apache.airavata.messaging.core; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.messaging.core.impl.ProcessConsumer; +import org.apache.airavata.messaging.core.impl.RabbitMQPublisher; import org.apache.airavata.messaging.core.impl.RabbitMQSubscriber; import org.apache.airavata.messaging.core.impl.StatusConsumer; +import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessIdentifier; +import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; -import java.util.ArrayList; import java.util.List; public class MessagingFactory { - public static Subscriber getSubscriber(final MessageHandler messageHandler,List routingKeys, Subscriber.Type type) throws AiravataException { + public static Subscriber getSubscriber(final MessageHandler messageHandler,List routingKeys, Type type) throws AiravataException { Subscriber subscriber = null; - SubscriberProperties sp = getSubscriberProperties(); + RabbitMQProperties rProperties = getProperties(); switch (type) { case EXPERIMENT_LAUNCH: break; case PROCESS_LAUNCH: - subscriber = getProcessSubscriber(sp); + subscriber = getProcessSubscriber(rProperties); subscriber.listen((connection ,channel) -> new ProcessConsumer(messageHandler, connection, channel), null, routingKeys); break; case STATUS: - subscriber = getStatusSubscriber(sp); + subscriber = getStatusSubscriber(rProperties); subscriber.listen((connection, channel) -> new StatusConsumer(messageHandler, connection, channel), null, routingKeys); @@ -57,24 +65,53 @@ public class MessagingFactory { return subscriber; } - private static SubscriberProperties getSubscriberProperties() { - return new SubscriberProperties() + public static Publisher getPublisher(Type type) throws AiravataException { + RabbitMQProperties rProperties = getProperties(); + Publisher publiser = null; + switch (type) { + case EXPERIMENT_LAUNCH: + break; + case PROCESS_LAUNCH: + publiser = gerProcessPublisher(rProperties); + break; + case STATUS: + publiser = getStatusPublisher(rProperties); + break; + default: + throw new IllegalArgumentException("Publisher " + type + " is not handled"); + } + + return publiser; + } + + private static Publisher getStatusPublisher(RabbitMQProperties rProperties) throws AiravataException { + rProperties.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName()); + return new RabbitMQPublisher(rProperties, MessagingFactory::statusRoutingkey); + } + + private static Publisher gerProcessPublisher(RabbitMQProperties rProperties) throws AiravataException { + rProperties.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName()); + return new RabbitMQPublisher(rProperties, messageContext -> rProperties.getExchangeName()); + } + + private static RabbitMQProperties getProperties() { + return new RabbitMQProperties() .setBrokerUrl(ServerSettings.RABBITMQ_BROKER_URL) .setDurable(ServerSettings.getRabbitmqDurableQueue()) .setPrefetchCount(ServerSettings.getRabbitmqPrefetchCount()) .setAutoRecoveryEnable(true) .setConsumerTag("default") - .setExchangeType(SubscriberProperties.EXCHANGE_TYPE.TOPIC); + .setExchangeType(RabbitMQProperties.EXCHANGE_TYPE.TOPIC); } - private static RabbitMQSubscriber getStatusSubscriber(SubscriberProperties sp) throws AiravataException { + private static RabbitMQSubscriber getStatusSubscriber(RabbitMQProperties sp) throws AiravataException { sp.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName()) .setAutoAck(true); return new RabbitMQSubscriber(sp); } - private static RabbitMQSubscriber getProcessSubscriber(SubscriberProperties sp) throws AiravataException { + private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException { sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName()) .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName()) .setAutoAck(false); @@ -82,7 +119,34 @@ public class MessagingFactory { } - + private static String statusRoutingkey(MessageContext msgCtx) { + String gatewayId = msgCtx.getGatewayId(); + String routingKey = null; + if (msgCtx.getType() == MessageType.EXPERIMENT) { + ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); + routingKey = gatewayId + "." + event.getExperimentId(); + } else if (msgCtx.getType() == MessageType.TASK) { + TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); + routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + + event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId(); + } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) { + TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); + routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + + event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId(); + } else if (msgCtx.getType() == MessageType.PROCESS) { + ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent(); + ProcessIdentifier processIdentifier = event.getProcessIdentity(); + routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId(); + } else if (msgCtx.getType() == MessageType.JOB) { + JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent(); + JobIdentifier identity = event.getJobIdentity(); + routingKey = gatewayId + "." + identity.getExperimentId() + "." + + identity.getProcessId() + "." + + identity.getTaskId() + "." + + identity.getJobId(); + } + return routingKey; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java index b8b586c..28a9a06 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java @@ -31,8 +31,8 @@ public interface Publisher { /** * - * @param message object of message context which will include actual event and other information + * @param messageContext object of message context which will include actual event and other information * @throws AiravataException */ - public void publish(MessageContext message) throws AiravataException; + public void publish(MessageContext messageContext) throws AiravataException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java new file mode 100644 index 0000000..5bb1929 --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.messaging.core; + +public class RabbitMQProperties { + private String brokerUrl; + private EXCHANGE_TYPE exchangeType; + private String exchangeName; + private int prefetchCount; + private boolean durable; + private String queueName; + private String consumerTag = "default"; + private boolean autoRecoveryEnable; + private boolean autoAck; + + public String getBrokerUrl() { + return brokerUrl; + } + + public RabbitMQProperties setBrokerUrl(String brokerUrl) { + this.brokerUrl = brokerUrl; + return this; + } + + public boolean isDurable() { + return durable; + } + + public RabbitMQProperties setDurable(boolean durable) { + this.durable = durable; + return this; + } + + public String getExchangeName() { + return exchangeName; + } + + public RabbitMQProperties setExchangeName(String exchangeName) { + this.exchangeName = exchangeName; + return this; + } + + public int getPrefetchCount() { + return prefetchCount; + } + + public RabbitMQProperties setPrefetchCount(int prefetchCount) { + this.prefetchCount = prefetchCount; + return this; + } + + public String getQueueName() { + return queueName; + } + + public RabbitMQProperties setQueueName(String queueName) { + this.queueName = queueName; + return this; + } + + public String getConsumerTag() { + return consumerTag; + } + + public RabbitMQProperties setConsumerTag(String consumerTag) { + this.consumerTag = consumerTag; + return this; + } + + public boolean isAutoRecoveryEnable() { + return autoRecoveryEnable; + } + + public RabbitMQProperties setAutoRecoveryEnable(boolean autoRecoveryEnable) { + this.autoRecoveryEnable = autoRecoveryEnable; + return this; + } + + public String getExchangeType() { + return exchangeType.type; + } + + public RabbitMQProperties setExchangeType(EXCHANGE_TYPE exchangeType) { + this.exchangeType = exchangeType; + return this; + } + + public boolean isAutoAck() { + return autoAck; + } + + public RabbitMQProperties setAutoAck(boolean autoAck) { + this.autoAck = autoAck; + return this; + } + + public enum EXCHANGE_TYPE{ + TOPIC("topic"), + FANOUT("fanout"); + + private String type; + + EXCHANGE_TYPE(String type) { + this.type = type; + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java index 7952cb3..cc357a0 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java @@ -50,9 +50,4 @@ public interface Subscriber { void sendAck(long deliveryTag); - enum Type { - EXPERIMENT_LAUNCH, - PROCESS_LAUNCH, - STATUS - } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java deleted file mode 100644 index 025e93b..0000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.airavata.messaging.core; - -public class SubscriberProperties { - private String brokerUrl; - private EXCHANGE_TYPE exchangeType; - private String exchangeName; - private int prefetchCount; - private boolean durable; - private String queueName; - private String consumerTag = "default"; - private boolean autoRecoveryEnable; - private boolean autoAck; - - public String getBrokerUrl() { - return brokerUrl; - } - - public SubscriberProperties setBrokerUrl(String brokerUrl) { - this.brokerUrl = brokerUrl; - return this; - } - - public boolean isDurable() { - return durable; - } - - public SubscriberProperties setDurable(boolean durable) { - this.durable = durable; - return this; - } - - public String getExchangeName() { - return exchangeName; - } - - public SubscriberProperties setExchangeName(String exchangeName) { - this.exchangeName = exchangeName; - return this; - } - - public int getPrefetchCount() { - return prefetchCount; - } - - public SubscriberProperties setPrefetchCount(int prefetchCount) { - this.prefetchCount = prefetchCount; - return this; - } - - public String getQueueName() { - return queueName; - } - - public SubscriberProperties setQueueName(String queueName) { - this.queueName = queueName; - return this; - } - - public String getConsumerTag() { - return consumerTag; - } - - public SubscriberProperties setConsumerTag(String consumerTag) { - this.consumerTag = consumerTag; - return this; - } - - public boolean isAutoRecoveryEnable() { - return autoRecoveryEnable; - } - - public SubscriberProperties setAutoRecoveryEnable(boolean autoRecoveryEnable) { - this.autoRecoveryEnable = autoRecoveryEnable; - return this; - } - - public String getExchangeType() { - return exchangeType.type; - } - - public SubscriberProperties setExchangeType(EXCHANGE_TYPE exchangeType) { - this.exchangeType = exchangeType; - return this; - } - - public boolean isAutoAck() { - return autoAck; - } - - public SubscriberProperties setAutoAck(boolean autoAck) { - this.autoAck = autoAck; - return this; - } - - public enum EXCHANGE_TYPE{ - TOPIC("topic"), - FANOUT("fanout"); - - private String type; - - EXCHANGE_TYPE(String type) { - this.type = type; - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java index daa9886..65781fe 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java @@ -46,7 +46,7 @@ public class TestClient { List routingKeys = new ArrayList<>(); routingKeys.add(experimentId); routingKeys.add(experimentId + ".*"); - MessagingFactory.getSubscriber(getMessageHandler(),routingKeys, Subscriber.Type.STATUS); + MessagingFactory.getSubscriber(getMessageHandler(),routingKeys, Type.STATUS); } catch (ApplicationSettingsException e) { logger.error("Error reading airavata server properties", e); }catch (Exception e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java new file mode 100644 index 0000000..6980e1c --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.messaging.core; + +public enum Type { + EXPERIMENT_LAUNCH, + PROCESS_LAUNCH, + STATUS +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java index 5cf960e..ba4b6f2 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java @@ -1,78 +1,78 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.messaging.core.impl; - -import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.*; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RabbitMQProcessLaunchPublisher implements Publisher{ - private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class); - private String launchTask; - - private RabbitMQProducer rabbitMQProducer; - - public RabbitMQProcessLaunchPublisher() throws Exception { - String brokerUrl; - try { - brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName(); - } catch (ApplicationSettingsException e) { - String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; - log.error(message, e); - throw new AiravataException(message, e); - } - rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null); - rabbitMQProducer.open(); - } - - public void publish(MessageContext msgCtx) throws AiravataException { - try { - log.info("Publishing to launch queue ..."); - byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); - Message message = new Message(); - message.setEvent(body); - message.setMessageId(msgCtx.getMessageId()); - message.setMessageType(msgCtx.getType()); - message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); - String routingKey = launchTask; - byte[] messageBody = ThriftUtils.serializeThriftObject(message); - rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey); - log.info("Successfully published to launch queue ..."); - } catch (TException e) { - String msg = "Error while deserializing the object"; - log.error(msg, e); - throw new AiravataException(msg, e); - } catch (Exception e) { - String msg = "Error while sending to rabbitmq"; - log.error(msg, e); - throw new AiravataException(msg, e); - } - } -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +//*/ +//package org.apache.airavata.messaging.core.impl; +// +//import org.apache.airavata.common.exception.AiravataException; +//import org.apache.airavata.common.exception.ApplicationSettingsException; +//import org.apache.airavata.common.utils.ServerSettings; +//import org.apache.airavata.common.utils.ThriftUtils; +//import org.apache.airavata.messaging.core.MessageContext; +//import org.apache.airavata.messaging.core.MessagingConstants; +//import org.apache.airavata.messaging.core.Publisher; +//import org.apache.airavata.model.messaging.event.*; +//import org.apache.thrift.TException; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//public class RabbitMQProcessLaunchPublisher implements Publisher{ +// private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class); +// private String launchTask; +// +// private RabbitMQProducer rabbitMQProducer; +// +// public RabbitMQProcessLaunchPublisher() throws Exception { +// String brokerUrl; +// try { +// brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); +// launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName(); +// } catch (ApplicationSettingsException e) { +// String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; +// log.error(message, e); +// throw new AiravataException(message, e); +// } +// rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null); +// rabbitMQProducer.open(); +// } +// +// public void publish(MessageContext msgCtx) throws AiravataException { +// try { +// log.info("Publishing to launch queue ..."); +// byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); +// Message message = new Message(); +// message.setEvent(body); +// message.setMessageId(msgCtx.getMessageId()); +// message.setMessageType(msgCtx.getType()); +// message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); +// String routingKey = launchTask; +// byte[] messageBody = ThriftUtils.serializeThriftObject(message); +// rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey); +// log.info("Successfully published to launch queue ..."); +// } catch (TException e) { +// String msg = "Error while deserializing the object"; +// log.error(msg, e); +// throw new AiravataException(msg, e); +// } catch (Exception e) { +// String msg = "Error while sending to rabbitmq"; +// log.error(msg, e); +// throw new AiravataException(msg, e); +// } +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java index fc494d4..73138c7 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java @@ -1,220 +1,220 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.messaging.core.impl; - -import com.rabbitmq.client.*; -import org.apache.airavata.common.exception.AiravataException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class RabbitMQProducer { - public static final int DEFAULT_PRE_FETCH = 64; - - private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class); - - private Connection connection; - - private Channel channel; - - private QueueingConsumer consumer; - - private String consumerTag; - - private String exchangeName; - - private int prefetchCount = DEFAULT_PRE_FETCH; - - private boolean isReQueueOnFail = false; - - private String url; - - private String getExchangeType = "topic"; - - - public RabbitMQProducer(String url, String exchangeName,String getExchangeType) { - this.exchangeName = exchangeName; - this.url = url; - this.getExchangeType = getExchangeType; - } - - public RabbitMQProducer(String url, String exchangeName) { - this.exchangeName = exchangeName; - this.url = url; - } - - public void setPrefetchCount(int prefetchCount) { - this.prefetchCount = prefetchCount; - } - - public void setReQueueOnFail(boolean isReQueueOnFail) { - this.isReQueueOnFail = isReQueueOnFail; - } - - private void reset() { - consumerTag = null; - } - - private void reInitIfNecessary() throws Exception { - if (consumerTag == null || consumer == null) { - close(); - open(); - } - } - - public void close() { - log.info("Closing channel to exchange {}", exchangeName); - try { - if (channel != null && channel.isOpen()) { - if (consumerTag != null) { - channel.basicCancel(consumerTag); - } - channel.close(); - } - } catch (Exception e) { - log.debug("error closing channel and/or cancelling consumer", e); - } - try { - log.info("closing connection to rabbitmq: " + connection); - connection.close(); - } catch (Exception e) { - log.debug("error closing connection", e); - } - consumer = null; - consumerTag = null; - channel = null; - connection = null; - } - - public void open() throws AiravataException { - try { - connection = createConnection(); - channel = connection.createChannel(); - if (prefetchCount > 0) { - log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName); - channel.basicQos(prefetchCount); - } - if(exchangeName!=null) { - channel.exchangeDeclare(exchangeName, getExchangeType, false); - } - } catch (Exception e) { - reset(); - String msg = "could not open channel for exchange " + exchangeName; - log.error(msg); - throw new AiravataException(msg, e); - } - } - - public void send(byte []message, String routingKey) throws Exception { - try { - channel.basicPublish(exchangeName, routingKey, null, message); - } catch (IOException e) { - String msg = "Failed to publish message to exchange: " + exchangeName; - log.error(msg, e); - throw new Exception(msg, e); - } - } - - public void sendToWorkerQueue(byte []message, String routingKey) throws Exception { - try { - channel.basicPublish( "", routingKey, - MessageProperties.PERSISTENT_TEXT_PLAIN, - message); - } catch (IOException e) { - String msg = "Failed to publish message to exchange: " + exchangeName; - log.error(msg, e); - throw new Exception(msg, e); - } - } - - private Connection createConnection() throws IOException { - try { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setUri(url); - connectionFactory.setAutomaticRecoveryEnabled(true); - Connection connection = connectionFactory.newConnection(); - connection.addShutdownListener(new ShutdownListener() { - public void shutdownCompleted(ShutdownSignalException cause) { - } - }); - log.info("connected to rabbitmq: " + connection + " for " + exchangeName); - return connection; - } catch (Exception e) { - log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName); - return null; - } - } - - public void ackMessage(Long msgId) throws Exception { - try { - channel.basicAck(msgId, false); - } catch (ShutdownSignalException sse) { - reset(); - String msg = "shutdown signal received while attempting to ack message"; - log.error(msg, sse); - throw new Exception(msg, sse); - } catch (Exception e) { - String s = "could not ack for msgId: " + msgId; - log.error(s, e); - throw new Exception(s, e); - } - } - - public void failMessage(Long msgId) throws Exception { - if (isReQueueOnFail) { - failWithRedelivery(msgId); - } else { - deadLetter(msgId); - } - } - - public void failWithRedelivery(Long msgId) throws Exception { - try { - channel.basicReject(msgId, true); - } catch (ShutdownSignalException sse) { - reset(); - String msg = "shutdown signal received while attempting to fail with redelivery"; - log.error(msg, sse); - throw new Exception(msg, sse); - } catch (Exception e) { - String msg = "could not fail with redelivery for msgId: " + msgId; - log.error(msg, e); - throw new Exception(msg, e); - } - } - - public void deadLetter(Long msgId) throws Exception { - try { - channel.basicReject(msgId, false); - } catch (ShutdownSignalException sse) { - reset(); - String msg = "shutdown signal received while attempting to fail with no redelivery"; - log.error(msg, sse); - throw new Exception(msg, sse); - } catch (Exception e) { - String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId; - log.error(msg, e); - throw new Exception(msg, e); - } - } -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +// */ +// +//package org.apache.airavata.messaging.core.impl; +// +//import com.rabbitmq.client.*; +//import org.apache.airavata.common.exception.AiravataException; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.io.IOException; +// +//public class RabbitMQProducer { +// public static final int DEFAULT_PRE_FETCH = 64; +// +// private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class); +// +// private Connection connection; +// +// private Channel channel; +// +// private QueueingConsumer consumer; +// +// private String consumerTag; +// +// private String exchangeName; +// +// private int prefetchCount = DEFAULT_PRE_FETCH; +// +// private boolean isReQueueOnFail = false; +// +// private String url; +// +// private String getExchangeType = "topic"; +// +// +// public RabbitMQProducer(String url, String exchangeName,String getExchangeType) { +// this.exchangeName = exchangeName; +// this.url = url; +// this.getExchangeType = getExchangeType; +// } +// +// public RabbitMQProducer(String url, String exchangeName) { +// this.exchangeName = exchangeName; +// this.url = url; +// } +// +// public void setPrefetchCount(int prefetchCount) { +// this.prefetchCount = prefetchCount; +// } +// +// public void setReQueueOnFail(boolean isReQueueOnFail) { +// this.isReQueueOnFail = isReQueueOnFail; +// } +// +// private void reset() { +// consumerTag = null; +// } +// +// private void reInitIfNecessary() throws Exception { +// if (consumerTag == null || consumer == null) { +// close(); +// open(); +// } +// } +// +// public void close() { +// log.info("Closing channel to exchange {}", exchangeName); +// try { +// if (channel != null && channel.isOpen()) { +// if (consumerTag != null) { +// channel.basicCancel(consumerTag); +// } +// channel.close(); +// } +// } catch (Exception e) { +// log.debug("error closing channel and/or cancelling consumer", e); +// } +// try { +// log.info("closing connection to rabbitmq: " + connection); +// connection.close(); +// } catch (Exception e) { +// log.debug("error closing connection", e); +// } +// consumer = null; +// consumerTag = null; +// channel = null; +// connection = null; +// } +// +// public void open() throws AiravataException { +// try { +// connection = createConnection(); +// channel = connection.createChannel(); +// if (prefetchCount > 0) { +// log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName); +// channel.basicQos(prefetchCount); +// } +// if(exchangeName!=null) { +// channel.exchangeDeclare(exchangeName, getExchangeType, false); +// } +// } catch (Exception e) { +// reset(); +// String msg = "could not open channel for exchange " + exchangeName; +// log.error(msg); +// throw new AiravataException(msg, e); +// } +// } +// +// public void send(byte []message, String routingKey) throws Exception { +// try { +// channel.basicPublish(exchangeName, routingKey, null, message); +// } catch (IOException e) { +// String msg = "Failed to publish message to exchange: " + exchangeName; +// log.error(msg, e); +// throw new Exception(msg, e); +// } +// } +// +// public void sendToWorkerQueue(byte []message, String routingKey) throws Exception { +// try { +// channel.basicPublish( "", routingKey, +// MessageProperties.PERSISTENT_TEXT_PLAIN, +// message); +// } catch (IOException e) { +// String msg = "Failed to publish message to exchange: " + exchangeName; +// log.error(msg, e); +// throw new Exception(msg, e); +// } +// } +// +// private Connection createConnection() throws IOException { +// try { +// ConnectionFactory connectionFactory = new ConnectionFactory(); +// connectionFactory.setUri(url); +// connectionFactory.setAutomaticRecoveryEnabled(true); +// Connection connection = connectionFactory.newConnection(); +// connection.addShutdownListener(new ShutdownListener() { +// public void shutdownCompleted(ShutdownSignalException cause) { +// } +// }); +// log.info("connected to rabbitmq: " + connection + " for " + exchangeName); +// return connection; +// } catch (Exception e) { +// log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName); +// return null; +// } +// } +// +// public void ackMessage(Long msgId) throws Exception { +// try { +// channel.basicAck(msgId, false); +// } catch (ShutdownSignalException sse) { +// reset(); +// String msg = "shutdown signal received while attempting to ack message"; +// log.error(msg, sse); +// throw new Exception(msg, sse); +// } catch (Exception e) { +// String s = "could not ack for msgId: " + msgId; +// log.error(s, e); +// throw new Exception(s, e); +// } +// } +// +// public void failMessage(Long msgId) throws Exception { +// if (isReQueueOnFail) { +// failWithRedelivery(msgId); +// } else { +// deadLetter(msgId); +// } +// } +// +// public void failWithRedelivery(Long msgId) throws Exception { +// try { +// channel.basicReject(msgId, true); +// } catch (ShutdownSignalException sse) { +// reset(); +// String msg = "shutdown signal received while attempting to fail with redelivery"; +// log.error(msg, sse); +// throw new Exception(msg, sse); +// } catch (Exception e) { +// String msg = "could not fail with redelivery for msgId: " + msgId; +// log.error(msg, e); +// throw new Exception(msg, e); +// } +// } +// +// public void deadLetter(Long msgId) throws Exception { +// try { +// channel.basicReject(msgId, false); +// } catch (ShutdownSignalException sse) { +// reset(); +// String msg = "shutdown signal received while attempting to fail with no redelivery"; +// log.error(msg, sse); +// throw new Exception(msg, sse); +// } catch (Exception e) { +// String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId; +// log.error(msg, e); +// throw new Exception(msg, e); +// } +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java new file mode 100644 index 0000000..3fdb3a1 --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.messaging.core.impl; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.MessageProperties; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.RabbitMQProperties; +import org.apache.airavata.model.messaging.event.Message; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.function.Function; + +public class RabbitMQPublisher implements Publisher { + private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class); + private final RabbitMQProperties properties; + private final Function routingKeySupplier; + private Connection connection; + private Channel channel; + + public RabbitMQPublisher(RabbitMQProperties properties, Function routingKeySupplier) throws AiravataException { + this.properties = properties; + this.routingKeySupplier = routingKeySupplier; + connect(); + } + + private void connect() throws AiravataException { + try { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setUri(properties.getBrokerUrl()); + connectionFactory.setAutomaticRecoveryEnabled(properties.isAutoRecoveryEnable()); + connection = connectionFactory.newConnection(); + connection.addShutdownListener(new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + } + }); + log.info("connected to rabbitmq: " + connection + " for " + properties.getExchangeName()); + channel = connection.createChannel(); + if (properties.getPrefetchCount() > 0) { + channel.basicQos(properties.getPrefetchCount()); + } + + if (properties.getExchangeName() != null) { + channel.exchangeDeclare(properties.getExchangeName(), + properties.getExchangeType(), + false); + } + } catch (Exception e) { + String msg = "RabbitMQ connection issue for exchange : " + properties.getExchangeName(); + log.error(msg); + throw new AiravataException(msg, e); + } + + + } + + + @Override + public void publish(MessageContext messageContext) throws AiravataException { + try { + byte[] body = ThriftUtils.serializeThriftObject(messageContext.getEvent()); + Message message = new Message(); + message.setEvent(body); + message.setMessageId(messageContext.getMessageId()); + message.setMessageType(messageContext.getType()); + message.setUpdatedTime(messageContext.getUpdatedTime().getTime()); + String routingKey = routingKeySupplier.apply(messageContext); + byte[] messageBody = ThriftUtils.serializeThriftObject(message); + send(messageBody, routingKey); + } catch (TException e) { + String msg = "Error while deserializing the object"; + log.error(msg, e); + throw new AiravataException(msg, e); + } catch (Exception e) { + String msg = "Error while sending to rabbitmq"; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } + + public void send(byte []message, String routingKey) throws Exception { + try { + channel.basicPublish(properties.getExchangeName(), routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message); + } catch (IOException e) { + String msg = "Failed to publish message to exchange: " + properties.getExchangeName(); + log.error(msg, e); + throw new Exception(msg, e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java index 75077e9..6f728fe 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java @@ -1,106 +1,106 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.messaging.core.impl; - -import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.*; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RabbitMQStatusPublisher implements Publisher { - - private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class); - - private RabbitMQProducer rabbitMQProducer; - -// StatCounter statCounter = StatCounter.getInstance(); - - public RabbitMQStatusPublisher() throws AiravataException { - String brokerUrl; - String exchangeName; - try { - brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); - } catch (ApplicationSettingsException e) { - String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; - log.error(message, e); - throw new AiravataException(message, e); - } - rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); - rabbitMQProducer.open(); - } - - public void publish(MessageContext msgCtx) throws AiravataException { - try { - log.info("Publishing status to rabbitmq..."); - byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); - Message message = new Message(); - message.setEvent(body); - message.setMessageId(msgCtx.getMessageId()); - message.setMessageType(msgCtx.getType()); - message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); - String gatewayId = msgCtx.getGatewayId(); - String routingKey = null; - if (msgCtx.getType() == MessageType.EXPERIMENT) { - ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); - routingKey = gatewayId + "." + event.getExperimentId(); - } else if (msgCtx.getType() == MessageType.TASK) { - TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); - routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + - event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId(); - } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) { - TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); - routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + - event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId(); - } else if (msgCtx.getType() == MessageType.PROCESS) { - ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent(); - ProcessIdentifier processIdentifier = event.getProcessIdentity(); - routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId(); - } else if (msgCtx.getType() == MessageType.JOB) { - JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent(); - JobIdentifier identity = event.getJobIdentity(); - routingKey = gatewayId + "." + identity.getExperimentId() + "." + - identity.getProcessId() + "." + - identity.getTaskId() + "." + - identity.getJobId(); - } - byte[] messageBody = ThriftUtils.serializeThriftObject(message); - rabbitMQProducer.send(messageBody, routingKey); -// statCounter.add(message); - } catch (TException e) { - String msg = "Error while deserializing the object"; - log.error(msg, e); - throw new AiravataException(msg, e); - } catch (Exception e) { - String msg = "Error while sending to rabbitmq"; - log.error(msg, e); - throw new AiravataException(msg, e); - } - } -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +// */ +// +//package org.apache.airavata.messaging.core.impl; +// +//import org.apache.airavata.common.exception.AiravataException; +//import org.apache.airavata.common.exception.ApplicationSettingsException; +//import org.apache.airavata.common.utils.ServerSettings; +//import org.apache.airavata.common.utils.ThriftUtils; +//import org.apache.airavata.messaging.core.MessageContext; +//import org.apache.airavata.messaging.core.MessagingConstants; +//import org.apache.airavata.messaging.core.Publisher; +//import org.apache.airavata.model.messaging.event.*; +//import org.apache.thrift.TException; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//public class RabbitMQStatusPublisher implements Publisher { +// +// private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class); +// +// private RabbitMQProducer rabbitMQProducer; +// +//// StatCounter statCounter = StatCounter.getInstance(); +// +// public RabbitMQStatusPublisher() throws AiravataException { +// String brokerUrl; +// String exchangeName; +// try { +// brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); +// exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); +// } catch (ApplicationSettingsException e) { +// String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; +// log.error(message, e); +// throw new AiravataException(message, e); +// } +// rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); +// rabbitMQProducer.open(); +// } +// +// public void publish(MessageContext msgCtx) throws AiravataException { +// try { +// log.info("Publishing status to rabbitmq..."); +// byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); +// Message message = new Message(); +// message.setEvent(body); +// message.setMessageId(msgCtx.getMessageId()); +// message.setMessageType(msgCtx.getType()); +// message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); +// String gatewayId = msgCtx.getGatewayId(); +// String routingKey = null; +// if (msgCtx.getType() == MessageType.EXPERIMENT) { +// ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); +// routingKey = gatewayId + "." + event.getExperimentId(); +// } else if (msgCtx.getType() == MessageType.TASK) { +// TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); +// routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + +// event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId(); +// } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) { +// TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); +// routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + +// event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId(); +// } else if (msgCtx.getType() == MessageType.PROCESS) { +// ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent(); +// ProcessIdentifier processIdentifier = event.getProcessIdentity(); +// routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId(); +// } else if (msgCtx.getType() == MessageType.JOB) { +// JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent(); +// JobIdentifier identity = event.getJobIdentity(); +// routingKey = gatewayId + "." + identity.getExperimentId() + "." + +// identity.getProcessId() + "." + +// identity.getTaskId() + "." + +// identity.getJobId(); +// } +// byte[] messageBody = ThriftUtils.serializeThriftObject(message); +// rabbitMQProducer.send(messageBody, routingKey); +//// statCounter.add(message); +// } catch (TException e) { +// String msg = "Error while deserializing the object"; +// log.error(msg, e); +// throw new AiravataException(msg, e); +// } catch (Exception e) { +// String msg = "Error while sending to rabbitmq"; +// log.error(msg, e); +// throw new AiravataException(msg, e); +// } +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java index 188847f..441281d 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java @@ -27,9 +27,8 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.messaging.core.Subscriber; -import org.apache.airavata.messaging.core.SubscriberProperties; +import org.apache.airavata.messaging.core.RabbitMQProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,9 +45,9 @@ public class RabbitMQSubscriber implements Subscriber { private Connection connection; private Channel channel; private Map queueDetailMap = new HashMap<>(); - private SubscriberProperties properties; + private RabbitMQProperties properties; - public RabbitMQSubscriber(SubscriberProperties properties) throws AiravataException { + public RabbitMQSubscriber(RabbitMQProperties properties) throws AiravataException { this.properties = properties; createConnection(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index f5c4d2a..5d02100 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -35,6 +35,7 @@ import org.apache.airavata.messaging.core.MessagingFactory; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.PublisherFactory; import org.apache.airavata.messaging.core.Subscriber; +import org.apache.airavata.messaging.core.Type; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; @@ -130,7 +131,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { // routingKeys.add("*"); // listen for gateway level messages // routingKeys.add("*.*"); // listen for gateway/experiment level messages routingKeys.add("*.*.*"); // listen for gateway/experiment/process level messages - statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Subscriber.Type.STATUS); + statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Type.STATUS); startCurator(); } catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) { log.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java index fa4c3de..1e6edd8 100644 --- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java +++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java @@ -26,6 +26,7 @@ import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessagingFactory; import org.apache.airavata.messaging.core.Subscriber; +import org.apache.airavata.messaging.core.Type; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.commons.ErrorModel; @@ -187,7 +188,7 @@ public class ExperimentExecution { String brokerUrl = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_BROKER_URL, PropertyFileType.AIRAVATA_CLIENT); System.out.println("broker url " + brokerUrl); final String exchangeName = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_EXCHANGE_NAME, PropertyFileType.AIRAVATA_CLIENT); - Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Subscriber.Type.STATUS); + Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Type.STATUS); } private List getRoutingKeys() { http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java index a492ef2..33f11c8 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java @@ -25,8 +25,9 @@ import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessagingFactory; +import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.Subscriber; -import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; +import org.apache.airavata.messaging.core.Type; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.messaging.event.ProcessIdentifier; import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; @@ -55,7 +56,7 @@ public class WorkflowEnactmentService { workflowMap = new ConcurrentHashMap<>(); statusSubscriber = MessagingFactory.getSubscriber((message -> executor.execute(new StatusHandler(message))), getRoutingKeys(), - Subscriber.Type.STATUS); + Type.STATUS); // register the shutdown hook to un-bind status consumer. Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook()); } @@ -74,7 +75,7 @@ public class WorkflowEnactmentService { public void submitWorkflow(String experimentId, String credentialToken, String gatewayName, - RabbitMQProcessLaunchPublisher publisher) throws Exception { + Publisher publisher) throws Exception { WorkflowInterpreter workflowInterpreter = new WorkflowInterpreter( experimentId, credentialToken,gatewayName, publisher); http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java index ecfdeea..e637c29 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java @@ -22,7 +22,7 @@ package org.apache.airavata.workflow.core; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; +import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.ComponentState; import org.apache.airavata.model.ComponentStatus; import org.apache.airavata.model.application.io.OutputDataObjectType; @@ -75,18 +75,18 @@ class WorkflowInterpreter { private Map completeList = new HashMap<>(); private Registry registry; private List completeWorkflowOutputs = new ArrayList<>(); - private RabbitMQProcessLaunchPublisher publisher; + private Publisher publisher; private String consumerId; private boolean continueWorkflow = true; - public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) throws RegistryException { + public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, Publisher publisher) throws RegistryException { this.gatewayName = gatewayName; setExperiment(experimentId); this.credentialToken = credentialToken; this.publisher = publisher; } - public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) { + public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, Publisher publisher) { this.gatewayName = gatewayName; this.experiment = experiment; this.credentialToken = credentialStoreToken;