Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7F63F200B6F for ; Wed, 10 Aug 2016 00:19:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7E4B2160AA5; Tue, 9 Aug 2016 22:19:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7DE57160AB0 for ; Wed, 10 Aug 2016 00:19:46 +0200 (CEST) Received: (qmail 89303 invoked by uid 500); 9 Aug 2016 22:19:39 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 89192 invoked by uid 99); 9 Aug 2016 22:19:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Aug 2016 22:19:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52B97E058E; Tue, 9 Aug 2016 22:19:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shameera@apache.org To: commits@airavata.apache.org Date: Tue, 09 Aug 2016 22:19:40 -0000 Message-Id: In-Reply-To: <21bda166c42445438a853fabd3a00d10@git.apache.org> References: <21bda166c42445438a853fabd3a00d10@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] airavata git commit: Refactored messaging module to remove duplicate code and support multiple subscribers archived-at: Tue, 09 Aug 2016 22:19:48 -0000 http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java deleted file mode 100644 index 561cde2..0000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.messaging.core.impl; - - -import com.rabbitmq.client.*; -import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.messaging.core.Consumer; -import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessageHandler; -import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.model.messaging.event.*; -import org.apache.thrift.TBase; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class RabbitMQStatusConsumer implements Consumer { - public static final String EXCHANGE_TYPE = "topic"; - private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class); - - private String exchangeName; - private String url; - private Connection connection; - private Channel channel; - private int prefetchCount; - private Map queueDetailsMap = new HashMap(); - - public RabbitMQStatusConsumer() throws AiravataException { - try { - url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); - prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64))); - createConnection(); - } catch (ApplicationSettingsException e) { - String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; - log.error(message, e); - throw new AiravataException(message, e); - } - } - - public RabbitMQStatusConsumer(String brokerUrl, String exchangeName) throws AiravataException { - this.exchangeName = exchangeName; - this.url = brokerUrl; - - createConnection(); - } - - private void createConnection() throws AiravataException { - try { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setUri(url); - connectionFactory.setAutomaticRecoveryEnabled(true); - connection = connectionFactory.newConnection(); - connection.addShutdownListener(new ShutdownListener() { - public void shutdownCompleted(ShutdownSignalException cause) { - } - }); - log.info("connected to rabbitmq: " + connection + " for " + exchangeName); - - channel = connection.createChannel(); - channel.basicQos(prefetchCount); - channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false); - - } catch (Exception e) { - String msg = "could not open channel for exchange " + exchangeName; - log.error(msg); - throw new AiravataException(msg, e); - } - } - - public String listen(final MessageHandler handler) throws AiravataException { - try { - Map props = handler.getProperties(); - final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY); - if (routing == null) { - throw new IllegalArgumentException("The routing key must be present"); - } - - List keys = new ArrayList(); - if (routing instanceof List) { - for (Object o : (List)routing) { - keys.add(o.toString()); - } - } else if (routing instanceof String) { - keys.add((String) routing); - } - - String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE); - String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG); - if (queueName == null) { - if (!channel.isOpen()) { - channel = connection.createChannel(); - channel.exchangeDeclare(exchangeName, "topic", false); - } - queueName = channel.queueDeclare().getQueue(); - } else { - channel.queueDeclare(queueName, true, false, false, null); - } - - final String id = getId(keys, queueName); - if (queueDetailsMap.containsKey(id)) { - throw new IllegalStateException("This subscriber is already defined for this Consumer, " + - "cannot define the same subscriber twice"); - } - - if (consumerTag == null) { - consumerTag = "default"; - } - - // bind all the routing keys - for (String routingKey : keys) { - channel.queueBind(queueName, exchangeName, routingKey); - } - - channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) { - @Override - public void handleDelivery(String consumerTag, - Envelope envelope, - AMQP.BasicProperties properties, - byte[] body) { - Message message = new Message(); - - try { - ThriftUtils.createThriftFromBytes(body, message); - TBase event = null; - String gatewayId = null; - - if (message.getMessageType().equals(MessageType.EXPERIMENT)) { - ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + - experimentStatusChangeEvent.getState()); - event = experimentStatusChangeEvent; - gatewayId = experimentStatusChangeEvent.getGatewayId(); - } else if (message.getMessageType().equals(MessageType.PROCESS)) { - ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent); - log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " + - "message type " + message.getMessageType() + " with status " + - processStatusChangeEvent.getState()); - event = processStatusChangeEvent; - gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId(); - } else if (message.getMessageType().equals(MessageType.TASK)) { - TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + - taskStatusChangeEvent.getState()); - event = taskStatusChangeEvent; - gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId(); - }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) { - TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent); - log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType()); - event = taskOutputChangeEvent; - gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId(); - } else if (message.getMessageType().equals(MessageType.JOB)) { - JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + - jobStatusChangeEvent.getState()); - event = jobStatusChangeEvent; - gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId(); - } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) { - TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' for experimentId: " + - taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId()); - event = taskSubmitEvent; - gatewayId = taskSubmitEvent.getGatewayId(); - } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) { - TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' for experimentId: " + - taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId()); - event = taskTerminateEvent; - gatewayId = null; - } - MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); - messageContext.setIsRedeliver(envelope.isRedeliver()); - handler.onMessage(messageContext); - } catch (TException e) { - String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; - log.warn(msg, e); - } - } - }); - // save the name for deleting the queue - queueDetailsMap.put(id, new QueueDetails(queueName, keys)); - return id; - } catch (Exception e) { - String msg = "could not open channel for exchange " + exchangeName; - log.error(msg); - throw new AiravataException(msg, e); - } - } - - public void stopListen(final String id) throws AiravataException { - QueueDetails details = queueDetailsMap.get(id); - if (details != null) { - try { - for (String key : details.getRoutingKeys()) { - channel.queueUnbind(details.getQueueName(), exchangeName, key); - } - channel.queueDelete(details.getQueueName(), true, true); - } catch (IOException e) { - String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName; - log.debug(msg); - } - } - } - - /** - * Private class for holding some information about the consumers registered - */ - private class QueueDetails { - String queueName; - - List routingKeys; - - private QueueDetails(String queueName, List routingKeys) { - this.queueName = queueName; - this.routingKeys = routingKeys; - } - - public String getQueueName() { - return queueName; - } - - public List getRoutingKeys() { - return routingKeys; - } - } - - private String getId(List routingKeys, String queueName) { - String id = ""; - for (String key : routingKeys) { - id = id + "_" + key; - } - return id + "_" + queueName; - } - - public void close() { - if (connection != null) { - try { - connection.close(); - } catch (IOException ignore) { - } - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 60cb7a0..f5c4d2a 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -29,8 +29,12 @@ import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.common.utils.ZkConstants; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.scheduler.HostScheduler; -import org.apache.airavata.messaging.core.*; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingFactory; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.PublisherFactory; +import org.apache.airavata.messaging.core.Subscriber; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; @@ -59,7 +63,14 @@ import org.apache.airavata.orchestrator.util.OrchestratorUtils; import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractResource; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource; -import org.apache.airavata.registry.cpi.*; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.registry.cpi.ComputeResource; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.ReplicaCatalog; +import org.apache.airavata.registry.cpi.ReplicaCatalogException; import org.apache.commons.lang.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -72,7 +83,12 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; public class OrchestratorServerHandler implements OrchestratorService.Iface { private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class); @@ -83,7 +99,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { private String airavataUserName; private String gatewayName; private Publisher publisher; - private RabbitMQStatusConsumer statusConsumer; + private Subscriber statusSubscribe; private CuratorFramework curatorClient; /** @@ -110,10 +126,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { appCatalog = RegistryFactory.getAppCatalog(); orchestrator.initialize(); orchestrator.getOrchestratorContext().setPublisher(this.publisher); - String brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - String exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); - statusConsumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName); - statusConsumer.listen(new ProcessStatusHandler()); + List routingKeys = new ArrayList<>(); +// routingKeys.add("*"); // listen for gateway level messages +// routingKeys.add("*.*"); // listen for gateway/experiment level messages + routingKeys.add("*.*.*"); // listen for gateway/experiment/process level messages + statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Subscriber.Type.STATUS); startCurator(); } catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) { log.error(e.getMessage(), e); @@ -481,18 +498,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { } private class ProcessStatusHandler implements MessageHandler { - - @Override - public Map getProperties() { - Map props = new HashMap<>(); - List routingKeys = new ArrayList<>(); -// routingKeys.add("*"); // listen for gateway level messages -// routingKeys.add("*.*"); // listen for gateway/experiment level messages - routingKeys.add("*.*.*"); // listern for gateway/experiment/process level messages - props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); - return props; - } - /** * This method only handle MessageType.PROCESS type messages. * @param message http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java ---------------------------------------------------------------------- diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java index 49af1ce..fa4c3de 100644 --- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java +++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java @@ -24,9 +24,8 @@ package org.apache.airavata.testsuite.multitenantedairavata; import org.apache.airavata.api.Airavata; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessageHandler; -import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.messaging.core.MessagingFactory; +import org.apache.airavata.messaging.core.Subscriber; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.commons.ErrorModel; @@ -56,7 +55,11 @@ import java.io.File; import java.io.PrintWriter; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class ExperimentExecution { private Airavata.Client airavata; @@ -92,7 +95,7 @@ public class ExperimentExecution { String resultFileName = resultFileLocation + getResultFileName(); File resultFolder = new File(resultFileLocation); - if (!resultFolder.exists()){ + if (!resultFolder.exists()) { resultFolder.mkdir(); } File resultFile = new File(resultFileName); @@ -109,11 +112,11 @@ public class ExperimentExecution { this.resultWriter = resultWriter; } - protected Map> getApplicationMap (Map tokenMap) throws Exception{ + protected Map> getApplicationMap(Map tokenMap) throws Exception { appInterfaceMap = new HashMap>(); try { - if (tokenMap != null && !tokenMap.isEmpty()){ - for (String gatewayId : tokenMap.keySet()){ + if (tokenMap != null && !tokenMap.isEmpty()) { + for (String gatewayId : tokenMap.keySet()) { Map allApplicationInterfaceNames = airavata.getAllApplicationInterfaceNames(authzToken, gatewayId); appInterfaceMap.put(gatewayId, allApplicationInterfaceNames); } @@ -134,19 +137,19 @@ public class ExperimentExecution { return appInterfaceMap; } - protected Map> getProjects (Map tokenMap) throws Exception{ + protected Map> getProjects(Map tokenMap) throws Exception { projectsMap = new HashMap>(); try { - if (tokenMap != null && !tokenMap.isEmpty()){ - for (String gatewayId : tokenMap.keySet()){ + if (tokenMap != null && !tokenMap.isEmpty()) { + for (String gatewayId : tokenMap.keySet()) { boolean isgatewayValid = true; - for (String ovoidGateway : gatewaysToAvoid){ - if (gatewayId.equals(ovoidGateway)){ + for (String ovoidGateway : gatewaysToAvoid) { + if (gatewayId.equals(ovoidGateway)) { isgatewayValid = false; break; } } - if (isgatewayValid){ + if (isgatewayValid) { List allUserProjects = airavata.getUserProjects(authzToken, gatewayId, testUser, 5, 0); projectsMap.put(gatewayId, allUserProjects); } @@ -168,127 +171,119 @@ public class ExperimentExecution { return projectsMap; } - public void launchExperiments () throws Exception { + public void launchExperiments() throws Exception { try { - for (String expId : experimentsWithTokens.keySet()){ + for (String expId : experimentsWithTokens.keySet()) { airavata.launchExperiment(authzToken, expId, experimentsWithTokens.get(expId)); } - }catch (Exception e){ + } catch (Exception e) { logger.error("Error while launching experiment", e); throw new Exception("Error while launching experiment", e); } } - public void monitorExperiments () throws Exception { + public void monitorExperiments() throws Exception { String brokerUrl = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_BROKER_URL, PropertyFileType.AIRAVATA_CLIENT); System.out.println("broker url " + brokerUrl); final String exchangeName = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_EXCHANGE_NAME, PropertyFileType.AIRAVATA_CLIENT); - RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName); - - consumer.listen(new MessageHandler() { - @Override - public Map getProperties() { - Map props = new HashMap(); - List routingKeys = new ArrayList(); - for (String expId : experimentsWithGateway.keySet()) { - String gatewayId = experimentsWithGateway.get(expId); - System.out.println("experiment Id : " + expId + " gateway Id : " + gatewayId); - - routingKeys.add(gatewayId); - routingKeys.add(gatewayId + "." + expId); - routingKeys.add(gatewayId + "." + expId + ".*"); - routingKeys.add(gatewayId + "." + expId + ".*.*"); - routingKeys.add(gatewayId + "." + expId + ".*.*.*"); - } - props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); - return props; - } + Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Subscriber.Type.STATUS); + } - @Override - public void onMessage(MessageContext message) { - - if (message.getType().equals(MessageType.EXPERIMENT)) { - try { - ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(); - TBase messageEvent = message.getEvent(); - byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); - ThriftUtils.createThriftFromBytes(bytes, event); - ExperimentState expState = event.getState(); - String expId = event.getExperimentId(); - String gatewayId = event.getGatewayId(); - - if (expState.equals(ExperimentState.COMPLETED)) { - resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId); - resultWriter.println("====================================================================="); - resultWriter.println("Status : " + ExperimentState.COMPLETED.toString()); - // check file transfers - List experimentOutputs = airavata.getExperimentOutputs(authzToken, expId); - int i = 1; - for (OutputDataObjectType output : experimentOutputs) { - System.out.println("################ Experiment : " + expId + " COMPLETES ###################"); - System.out.println("Output " + i + " : " + output.getValue()); - resultWriter.println("Output " + i + " : " + output.getValue()); - i++; - } - resultWriter.println("End of Results for Experiment : " + expId ); - resultWriter.println("====================================================================="); - } else if (expState.equals(ExperimentState.FAILED)) { - resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId); - resultWriter.println("====================================================================="); - int j = 1; - resultWriter.println("Status : " + ExperimentState.FAILED.toString()); - System.out.println("################ Experiment : " + expId + " FAILED ###################"); - ExperimentModel experiment = airavata.getExperiment(authzToken, expId); - List errors = experiment.getErrors(); - if (errors != null && !errors.isEmpty()){ - for (ErrorModel errorDetails : errors) { - System.out.println(errorDetails.getActualErrorMessage()); - resultWriter.println("Actual Error : " + j + " : " + errorDetails.getActualErrorMessage()); - resultWriter.println("User Friendly Message : " + j + " : " + errorDetails.getUserFriendlyMessage()); - } - } + private List getRoutingKeys() { + List routingKeys = new ArrayList(); + for (String expId : experimentsWithGateway.keySet()) { + String gatewayId = experimentsWithGateway.get(expId); + System.out.println("experiment Id : " + expId + " gateway Id : " + gatewayId); + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + expId); + routingKeys.add(gatewayId + "." + expId + ".*"); + routingKeys.add(gatewayId + "." + expId + ".*.*"); + routingKeys.add(gatewayId + "." + expId + ".*.*.*"); + } + return routingKeys; + } - resultWriter.println("End of Results for Experiment : " + expId ); - resultWriter.println("====================================================================="); + private void processMessage(MessageContext message) { + if (message.getType().equals(MessageType.EXPERIMENT)) { + try { + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + ExperimentState expState = event.getState(); + String expId = event.getExperimentId(); + String gatewayId = event.getGatewayId(); + + if (expState.equals(ExperimentState.COMPLETED)) { + resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId); + resultWriter.println("====================================================================="); + resultWriter.println("Status : " + ExperimentState.COMPLETED.toString()); + // check file transfers + List experimentOutputs = airavata.getExperimentOutputs(authzToken, expId); + int i = 1; + for (OutputDataObjectType output : experimentOutputs) { + System.out.println("################ Experiment : " + expId + " COMPLETES ###################"); + System.out.println("Output " + i + " : " + output.getValue()); + resultWriter.println("Output " + i + " : " + output.getValue()); + i++; + } + resultWriter.println("End of Results for Experiment : " + expId); + resultWriter.println("====================================================================="); + } else if (expState.equals(ExperimentState.FAILED)) { + resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId); + resultWriter.println("====================================================================="); + int j = 1; + resultWriter.println("Status : " + ExperimentState.FAILED.toString()); + System.out.println("################ Experiment : " + expId + " FAILED ###################"); + ExperimentModel experiment = airavata.getExperiment(authzToken, expId); + List errors = experiment.getErrors(); + if (errors != null && !errors.isEmpty()) { + for (ErrorModel errorDetails : errors) { + System.out.println(errorDetails.getActualErrorMessage()); + resultWriter.println("Actual Error : " + j + " : " + errorDetails.getActualErrorMessage()); + resultWriter.println("User Friendly Message : " + j + " : " + errorDetails.getUserFriendlyMessage()); } + } + + resultWriter.println("End of Results for Experiment : " + expId); + resultWriter.println("====================================================================="); + } // System.out.println(" Experiment Id : '" + expId // + "' with state : '" + event.getState().toString() + // " for Gateway " + event.getGatewayId()); - } catch (TException e) { - logger.error(e.getMessage(), e); - } - } else if (message.getType().equals(MessageType.JOB)) { - try { - JobStatusChangeEvent event = new JobStatusChangeEvent(); - TBase messageEvent = message.getEvent(); - byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); - ThriftUtils.createThriftFromBytes(bytes, event); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + } else if (message.getType().equals(MessageType.JOB)) { + try { + JobStatusChangeEvent event = new JobStatusChangeEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); // System.out.println(" Job ID : '" + event.getJobIdentity().getJobId() // + "' with state : '" + event.getState().toString() + // " for Gateway " + event.getJobIdentity().getGatewayId()); // resultWriter.println("Job Status : " + event.getState().toString()); - } catch (TException e) { - logger.error(e.getMessage(), e); - } - } - resultWriter.flush(); + } catch (TException e) { + logger.error(e.getMessage(), e); } - }); + } + resultWriter.flush(); } - private String getResultFileName (){ + private String getResultFileName() { DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HHmmss"); Calendar cal = Calendar.getInstance(); return dateFormat.format(cal.getTime()); } - public void createAmberWithErrorInputs (String gatewayId, - String token, - String projectId, - String hostId, - String appId) throws Exception { + public void createAmberWithErrorInputs(String gatewayId, + String token, + String projectId, + String hostId, + String appId) throws Exception { try { List applicationInputs = airavata.getApplicationInputs(authzToken, appId); List appOutputs = airavata.getApplicationOutputs(authzToken, appId); @@ -352,11 +347,11 @@ public class ExperimentExecution { } } - public void createAmberWithErrorUserConfig (String gatewayId, - String token, - String projectId, - String hostId, - String appId) throws Exception { + public void createAmberWithErrorUserConfig(String gatewayId, + String token, + String projectId, + String hostId, + String appId) throws Exception { try { TestFrameworkProps.Error[] errors = properties.getErrors(); @@ -422,25 +417,25 @@ public class ExperimentExecution { } } - public void createAmberExperiment () throws Exception{ + public void createAmberExperiment() throws Exception { try { TestFrameworkProps.Application[] applications = properties.getApplications(); Map userGivenAmberInputs = new HashMap<>(); - for (TestFrameworkProps.Application application : applications){ - if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){ + for (TestFrameworkProps.Application application : applications) { + if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)) { userGivenAmberInputs = application.getInputs(); } } - for (String gatewayId : csTokens.keySet()){ + for (String gatewayId : csTokens.keySet()) { String token = csTokens.get(gatewayId); Map appsWithNames = appInterfaceMap.get(gatewayId); - for (String appId : appsWithNames.keySet()){ + for (String appId : appsWithNames.keySet()) { String appName = appsWithNames.get(appId); - if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){ + if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)) { List applicationInputs = airavata.getApplicationInputs(authzToken, appId); List appOutputs = airavata.getApplicationOutputs(authzToken, appId); - for (String inputName : userGivenAmberInputs.keySet()){ + for (String inputName : userGivenAmberInputs.keySet()) { for (InputDataObjectType inputDataObjectType : applicationInputs) { if (inputDataObjectType.getName().equalsIgnoreCase(inputName)) { inputDataObjectType.setValue(userGivenAmberInputs.get(inputName)); @@ -449,7 +444,7 @@ public class ExperimentExecution { } List projectsPerGateway = projectsMap.get(gatewayId); String projectID = null; - if (projectsPerGateway != null && !projectsPerGateway.isEmpty()){ + if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) { projectID = projectsPerGateway.get(0).getProjectID(); } ExperimentModel simpleExperiment = @@ -470,7 +465,7 @@ public class ExperimentExecution { experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); experimentsWithTokens.put(experimentId, token); experimentsWithGateway.put(experimentId, gatewayId); - }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) { + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) { ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0); UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); userConfigurationData.setAiravataAutoSchedule(false); @@ -498,33 +493,33 @@ public class ExperimentExecution { } } } - }catch (Exception e){ + } catch (Exception e) { logger.error("Error while creating AMBEr experiment", e); throw new Exception("Error while creating AMBER experiment", e); } } - public void createUltrascanExperiment () throws Exception{ + public void createUltrascanExperiment() throws Exception { try { TestFrameworkProps.Application[] applications = properties.getApplications(); int numberOfIterations = properties.getNumberOfIterations(); Map userGivenAmberInputs = new HashMap<>(); - for (TestFrameworkProps.Application application : applications){ - if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)){ + for (TestFrameworkProps.Application application : applications) { + if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)) { userGivenAmberInputs = application.getInputs(); } } - for (int i=0; i < numberOfIterations; i++){ - for (String gatewayId : csTokens.keySet()){ + for (int i = 0; i < numberOfIterations; i++) { + for (String gatewayId : csTokens.keySet()) { String token = csTokens.get(gatewayId); Map appsWithNames = appInterfaceMap.get(gatewayId); - for (String appId : appsWithNames.keySet()){ + for (String appId : appsWithNames.keySet()) { String appName = appsWithNames.get(appId); - if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)){ + if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)) { List applicationInputs = airavata.getApplicationInputs(authzToken, appId); List appOutputs = airavata.getApplicationOutputs(authzToken, appId); - for (String inputName : userGivenAmberInputs.keySet()){ + for (String inputName : userGivenAmberInputs.keySet()) { for (InputDataObjectType inputDataObjectType : applicationInputs) { if (inputDataObjectType.getName().equalsIgnoreCase(inputName)) { inputDataObjectType.setValue(userGivenAmberInputs.get(inputName)); @@ -533,7 +528,7 @@ public class ExperimentExecution { } List projectsPerGateway = projectsMap.get(gatewayId); String projectID = null; - if (projectsPerGateway != null && !projectsPerGateway.isEmpty()){ + if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) { projectID = projectsPerGateway.get(0).getProjectID(); } ExperimentModel simpleExperiment = @@ -554,7 +549,7 @@ public class ExperimentExecution { experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); experimentsWithTokens.put(experimentId, token); experimentsWithGateway.put(experimentId, gatewayId); - }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.ALAMO_RESOURCE_NAME)) { + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.ALAMO_RESOURCE_NAME)) { ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "batch", 30, 0); UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); userConfigurationData.setAiravataAutoSchedule(false); @@ -564,7 +559,7 @@ public class ExperimentExecution { experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); experimentsWithTokens.put(experimentId, token); experimentsWithGateway.put(experimentId, gatewayId); - }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.GORDEN_RESOURCE_NAME)) { + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.GORDEN_RESOURCE_NAME)) { ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 30, 0); UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); userConfigurationData.setAiravataAutoSchedule(false); @@ -574,17 +569,17 @@ public class ExperimentExecution { experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); experimentsWithTokens.put(experimentId, token); experimentsWithGateway.put(experimentId, gatewayId); - }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.COMET_RESOURCE_NAME)) { + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.COMET_RESOURCE_NAME)) { ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "compute", 30, 0); UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); userConfigurationData.setAiravataAutoSchedule(false); userConfigurationData.setOverrideManualScheduledParams(false); userConfigurationData.setComputationalResourceScheduling(scheduling); simpleExperiment.setUserConfigurationData(userConfigurationData); - experimentId = airavata.createExperiment(authzToken,gatewayId, simpleExperiment); + experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); experimentsWithTokens.put(experimentId, token); experimentsWithGateway.put(experimentId, gatewayId); - }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.LONESTAR_RESOURCE_NAME)) { + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.LONESTAR_RESOURCE_NAME)) { ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 30, 0); UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); userConfigurationData.setAiravataAutoSchedule(false); @@ -602,89 +597,89 @@ public class ExperimentExecution { } } - }catch (Exception e){ + } catch (Exception e) { logger.error("Error while creating Ultrascan experiment", e); throw new Exception("Error while creating Ultrascan experiment", e); } } - public void createEchoExperiment () throws Exception{ + public void createEchoExperiment() throws Exception { try { for (String gatewayId : csTokens.keySet()) { - boolean isgatewayValid = true; - for (String ovoidGateway : gatewaysToAvoid){ - if (gatewayId.equals(ovoidGateway)){ - isgatewayValid = false; - break; - } + boolean isgatewayValid = true; + for (String ovoidGateway : gatewaysToAvoid) { + if (gatewayId.equals(ovoidGateway)) { + isgatewayValid = false; + break; } - if (isgatewayValid) { - String token = csTokens.get(gatewayId); - Map appsWithNames = appInterfaceMap.get(gatewayId); - for (String appId : appsWithNames.keySet()) { - String appName = appsWithNames.get(appId); - if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) { - List applicationInputs = airavata.getApplicationInputs(authzToken, appId); - List appOutputs = airavata.getApplicationOutputs(authzToken, appId); - for (InputDataObjectType inputDataObjectType : applicationInputs) { - if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) { - inputDataObjectType.setValue("Hello World !!!"); - } + } + if (isgatewayValid) { + String token = csTokens.get(gatewayId); + Map appsWithNames = appInterfaceMap.get(gatewayId); + for (String appId : appsWithNames.keySet()) { + String appName = appsWithNames.get(appId); + if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) { + List applicationInputs = airavata.getApplicationInputs(authzToken, appId); + List appOutputs = airavata.getApplicationOutputs(authzToken, appId); + for (InputDataObjectType inputDataObjectType : applicationInputs) { + if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) { + inputDataObjectType.setValue("Hello World !!!"); } + } - List projectsPerGateway = projectsMap.get(gatewayId); - String projectID = null; - if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) { - projectID = projectsPerGateway.get(0).getProjectID(); - } - ExperimentModel simpleExperiment = - ExperimentModelUtil.createSimpleExperiment(gatewayId, projectID, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs); - simpleExperiment.setExperimentOutputs(appOutputs); - String experimentId; - Map computeResources = airavata.getAvailableAppInterfaceComputeResources(authzToken, appId); - if (computeResources != null && computeResources.size() != 0) { - for (String id : computeResources.keySet()) { - String resourceName = computeResources.get(id); - if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) { - ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0); - UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); - userConfigurationData.setAiravataAutoSchedule(false); - userConfigurationData.setOverrideManualScheduledParams(false); - userConfigurationData.setComputationalResourceScheduling(scheduling); - simpleExperiment.setUserConfigurationData(userConfigurationData); - experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); - experimentsWithTokens.put(experimentId, token); - experimentsWithGateway.put(experimentId, gatewayId); - } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) { - ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0); - UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); - userConfigurationData.setAiravataAutoSchedule(false); - userConfigurationData.setOverrideManualScheduledParams(false); - userConfigurationData.setComputationalResourceScheduling(scheduling); - simpleExperiment.setUserConfigurationData(userConfigurationData); - experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); - experimentsWithTokens.put(experimentId, token); - experimentsWithGateway.put(experimentId, gatewayId); - } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) { - ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0); - UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); - userConfigurationData.setAiravataAutoSchedule(false); - userConfigurationData.setOverrideManualScheduledParams(false); - userConfigurationData.setComputationalResourceScheduling(scheduling); - simpleExperiment.setUserConfigurationData(userConfigurationData); - experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); - experimentsWithTokens.put(experimentId, token); - experimentsWithGateway.put(experimentId, gatewayId); - } + List projectsPerGateway = projectsMap.get(gatewayId); + String projectID = null; + if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) { + projectID = projectsPerGateway.get(0).getProjectID(); + } + ExperimentModel simpleExperiment = + ExperimentModelUtil.createSimpleExperiment(gatewayId, projectID, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs); + simpleExperiment.setExperimentOutputs(appOutputs); + String experimentId; + Map computeResources = airavata.getAvailableAppInterfaceComputeResources(authzToken, appId); + if (computeResources != null && computeResources.size() != 0) { + for (String id : computeResources.keySet()) { + String resourceName = computeResources.get(id); + if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) { + ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0); + UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) { + ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0); + UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); + } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) { + ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0); + UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment); + experimentsWithTokens.put(experimentId, token); + experimentsWithGateway.put(experimentId, gatewayId); } } } } } + } } - }catch (Exception e){ + } catch (Exception e) { logger.error("Error while creating Echo experiment", e); throw new Exception("Error while creating Echo experiment", e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java index 8339aea..a492ef2 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java @@ -24,16 +24,18 @@ package org.apache.airavata.workflow.core; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessageHandler; -import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.MessagingFactory; +import org.apache.airavata.messaging.core.Subscriber; import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; -import org.apache.airavata.model.messaging.event.*; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessIdentifier; +import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -43,16 +45,17 @@ import java.util.concurrent.Executors; public class WorkflowEnactmentService { private static WorkflowEnactmentService workflowEnactmentService; - private final RabbitMQStatusConsumer statusConsumer; + private final Subscriber statusSubscriber; private String consumerId; private ExecutorService executor; private Map workflowMap; private WorkflowEnactmentService () throws AiravataException { executor = Executors.newFixedThreadPool(getThreadPoolSize()); - workflowMap = new ConcurrentHashMap(); - statusConsumer = new RabbitMQStatusConsumer(); - consumerId = statusConsumer.listen(new TaskMessageHandler()); + workflowMap = new ConcurrentHashMap<>(); + statusSubscriber = MessagingFactory.getSubscriber((message -> executor.execute(new StatusHandler(message))), + getRoutingKeys(), + Subscriber.Type.STATUS); // register the shutdown hook to un-bind status consumer. Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook()); } @@ -80,33 +83,20 @@ public class WorkflowEnactmentService { } - private int getThreadPoolSize() { - return ServerSettings.getEnactmentThreadPoolSize(); - } - - private class TaskMessageHandler implements MessageHandler { - - @Override - public Map getProperties() { - Map props = new HashMap(); - String gatewayId = "*"; - String experimentId = "*"; - List routingKeys = new ArrayList(); - routingKeys.add(gatewayId); - routingKeys.add(gatewayId + "." + experimentId); - routingKeys.add(gatewayId + "." + experimentId+ ".*"); - routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); - props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); - return props; - } - - @Override - public void onMessage(MessageContext msgCtx) { - StatusHandler statusHandler = new StatusHandler(msgCtx); - executor.execute(statusHandler); - } + public List getRoutingKeys() { + String gatewayId = "*"; + String experimentId = "*"; + List routingKeys = new ArrayList(); + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + experimentId); + routingKeys.add(gatewayId + "." + experimentId+ ".*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); + return routingKeys; + } + private int getThreadPoolSize() { + return ServerSettings.getEnactmentThreadPoolSize(); } private class StatusHandler implements Runnable{ @@ -169,7 +159,7 @@ public class WorkflowEnactmentService { public void run() { super.run(); try { - statusConsumer.stopListen(consumerId); + statusSubscriber.stopListen(consumerId); log.info("Successfully un-binded task status consumer"); } catch (AiravataException e) { log.error("Error while un-bind enactment status consumer", e); http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java index b42e7ac..ecfdeea 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java @@ -23,7 +23,6 @@ package org.apache.airavata.workflow.core; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; import org.apache.airavata.model.ComponentState; import org.apache.airavata.model.ComponentStatus; import org.apache.airavata.model.application.io.OutputDataObjectType; @@ -32,14 +31,27 @@ import org.apache.airavata.model.messaging.event.ProcessIdentifier; import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.*; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.WorkflowCatalog; +import org.apache.airavata.registry.cpi.WorkflowCatalogException; import org.apache.airavata.workflow.core.dag.edge.Edge; -import org.apache.airavata.workflow.core.dag.nodes.*; +import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; +import org.apache.airavata.workflow.core.dag.nodes.InputNode; +import org.apache.airavata.workflow.core.dag.nodes.OutputNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; import org.apache.airavata.workflow.core.parser.WorkflowParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -64,7 +76,6 @@ class WorkflowInterpreter { private Registry registry; private List completeWorkflowOutputs = new ArrayList<>(); private RabbitMQProcessLaunchPublisher publisher; - private RabbitMQStatusConsumer statusConsumer; private String consumerId; private boolean continueWorkflow = true;