airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [01/12] airavata git commit: Fixing rabbitmq reconnect issue
Date Wed, 08 Apr 2015 02:32:00 GMT
Repository: airavata
Updated Branches:
  refs/heads/emailMonitoring 9f1fa2d35 -> 95fae20b1


Fixing rabbitmq reconnect issue


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a51a7ca2
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a51a7ca2
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a51a7ca2

Branch: refs/heads/emailMonitoring
Commit: a51a7ca2265bcc2e56de8868bd0291d4437e941a
Parents: 764a239
Author: Lahiru Gunathilake <glahiru@gmail.com>
Authored: Thu Apr 2 11:02:44 2015 -0400
Committer: Lahiru Gunathilake <glahiru@gmail.com>
Committed: Thu Apr 2 11:02:44 2015 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        | 16 +++++++++--
 .../client/samples/CreateLaunchExperiment.java  |  2 +-
 .../airavata/common/utils/ServerSettings.java   | 13 ++++++++-
 .../main/resources/airavata-server.properties   |  3 +-
 .../test/resources/airavata-server.properties   | 30 ++++++++++++--------
 .../airavata/gfac/server/GfacServerHandler.java |  9 +++---
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  6 ++--
 .../core/impl/RabbitMQTaskLaunchConsumer.java   | 21 ++++++++++++++
 .../core/impl/RabbitMQTaskLaunchPublisher.java  | 14 ++++-----
 .../cpi/impl/SimpleOrchestratorImpl.java        |  1 -
 10 files changed, 81 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index aabba55..e7503e6 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -27,6 +27,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.*;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
@@ -128,9 +129,20 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 		}
     }
 
-    private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode,
String experimentPath) throws ApplicationSettingsException, KeeperException, InterruptedException,
AiravataException {
+    private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode,
String experimentPath) throws KeeperException, InterruptedException, AiravataException {
+        int count =0;
+        long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
         if (ServerSettings.isGFacPassiveMode()) {
-            consumer.sendAck(AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
+            while(!consumer.isOpen() && count<3){
+                try {
+                    consumer.reconnect();
+                } catch (AiravataException e) {
+                    count++;
+                }
+            }
+            if(consumer.isOpen()){
+                consumer.sendAck(deliveryTag);
+            }
         }
         ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX);
         ZKUtil.deleteRecursive(zk, experimentPath);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 0652fbc..d173a0b 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -58,7 +58,7 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_GATEWAY = "php_reference_gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = "Echo_11afd5ec-e04e-45c9-843b-0c5b28a617f8";
+    private static String echoAppId = "Echo_fcac7076-e350-4dfb-a6eb-73e2d648fc60";
     private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
     private static String amberAppId = "Amber_aa083c86-4680-4002-b3ef-fad93c181926";

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 1c338d4..39da292 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -59,9 +59,11 @@ public class ServerSettings extends ApplicationSettings {
     public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids";
     public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags";
     public static final String GFAC_PASSIVE = "gfac.passive"; // by default this is desabled
+    public static final String LAUNCH_QUEUE_NAME = "launch.queue.name";
+    public static final String CANCEL_QUEUE_NAME = "cancel.queue.name";
 
 
-//    Workflow Enactment Service component configuration.
+    //    Workflow Enactment Service component configuration.
     private static final String ENACTMENT_THREAD_POOL_SIZE = "enactment.thread.pool.size";
     private static final int DEFAULT_ENACTMENT_THREAD_POOL_SIZE = 10;
     private static final String WORKFLOW_PARSER = "workflow.parser";
@@ -73,6 +75,15 @@ public class ServerSettings extends ApplicationSettings {
         return getSetting(DEFAULT_USER);
     }
 
+    public static String getLaunchQueueName() {
+        return getSetting(LAUNCH_QUEUE_NAME, "launch.queue");
+    }
+
+
+    public static String getCancelQueueName() {
+        return getSetting(CANCEL_QUEUE_NAME, "cancel.queue");
+    }
+
     public static String getDefaultUserPassword() throws ApplicationSettingsException {
         return getSetting(DEFAULT_USER_PASSWORD);
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 7410d7f..0d6f4e4 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -227,7 +227,8 @@ task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunch
 rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
 rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
 durable.queue=false
-
+launch.queue.name=launch.queue
+cancel.queue.name=cancel.queue
 activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
 rabbitmq.exchange.name=airavata_rabbitmq_exchange
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/modules/credential-store/credential-store-service/src/test/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store/credential-store-service/src/test/resources/airavata-server.properties
b/modules/credential-store/credential-store-service/src/test/resources/airavata-server.properties
index b466d8b..f6c98db 100644
--- a/modules/credential-store/credential-store-service/src/test/resources/airavata-server.properties
+++ b/modules/credential-store/credential-store-service/src/test/resources/airavata-server.properties
@@ -31,10 +31,10 @@
 
 #for derby [AiravataJPARegistry]
 registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
+registry.jdbc.url=jdbc:derby://localhost:1527/experiment_catalog;create=true;user=airavata;password=airavata
 # MySql database configuration
 #registry.jdbc.driver=com.mysql.jdbc.Driver
-#registry.jdbc.url=jdbc:mysql://localhost:3306/persistent_data
+#registry.jdbc.url=jdbc:mysql://localhost:3306/experiment_catalog
 registry.jdbc.user=airavata
 registry.jdbc.password=airavata
 start.derby.server.mode=true
@@ -82,9 +82,9 @@ gfac.server.port=8950
 orchestrator.server.min.threads=50
 
 ###########################################################################
-#  Job Scheduler can send informative email messages to you about the status of your job.
+#  Job Scheduler can send informative email messages to you about the status of your job.
 
 # Specify a string which consists of either the single character "n" (no mail), or one or
more
-#  of the characters "a" (send mail when job is aborted), "b" (send mail when job begins),
+#  of the characters "a" (send mail when job is aborted), "b" (send mail when job begins),

 # and "e" (send mail when job terminates).  The default is "a" if not specified.
 ###########################################################################
 
@@ -100,7 +100,7 @@ start.credential.store=false
 credential.store.keystore.url=/Users/chathuri/dev/airavata/credential-store/oa4mp/airavata_sym.jks
 credential.store.keystore.alias=airavata
 credential.store.keystore.password=airavata
-credential.store.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
+credential.store.jdbc.url=jdbc:derby://localhost:1527/experiment_catalog;create=true;user=airavata;password=airavata
 credential.store.jdbc.user=airavata
 credential.store.jdbc.password=airavata
 credential.store.jdbc.driver=org.apache.derby.jdbc.ClientDriver
@@ -137,7 +137,7 @@ myproxy.password=
 myproxy.life=3600
 # XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
 trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
-gfac.passive=true
+gfac.passive=false
 # SSH PKI key pair or ssh password can be used SSH based authentication is used.
 # if user specify both password authentication gets the higher preference
 
@@ -146,7 +146,7 @@ gfac.passive=true
 #private.ssh.key=/path to private key file for ssh
 #ssh.keypass=passphrase for the private key
 #ssh.username=username for ssh connection
-### Incase of password authentication.
+### Incase of password authentication. 
 #ssh.password=Password for ssh connection
 
 
@@ -216,20 +216,26 @@ connection.name=xsede
 #publisher
 activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
 publish.rabbitmq=false
+
+rabbitmq.broker.url=amqp://localhost:5672
+
 status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
 task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher
-rabbitmq.broker.url=amqp://localhost:5672
 rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
 rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
-
+durable.queue=false
+launch.queue.name=launch.queue
+cancel.queue.name=cancel.queue
+activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
+rabbitmq.exchange.name=airavata_rabbitmq_exchange
 
 ###########################################################################
 # Orchestrator module Configuration
 ###########################################################################
 
 #job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter
 job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
 submitter.interval=10000
 threadpool.size=10
@@ -251,4 +257,4 @@ gfac-server=/gfac-server
 gfac-experiments=/gfac-experiments
 gfac-server-name=gfac-node0
 orchestrator-server-name=orch-node0
-airavata-server-name=api-node0
+airavata-server-name=api-node0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index f2c5075..c8fc3fb 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.gfac.server;
 
 import com.google.common.eventbus.EventBus;
+import edu.uiuc.ncsa.security.delegation.services.Server;
 import org.airavata.appcatalog.cpi.AppCatalog;
 import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
@@ -306,8 +307,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
     }
 
     private class TaskLaunchMessageHandler implements MessageHandler {
-        public static final String LAUNCH_TASK = "launch.task";
-        public static final String TERMINATE_TASK = "teminate.task";
         private String experimentNode;
         private String nodeName;
 
@@ -319,10 +318,10 @@ public class GfacServerHandler implements GfacService.Iface, Watcher
{
         public Map<String, Object> getProperties() {
             Map<String, Object> props = new HashMap<String, Object>();
             ArrayList<String> keys = new ArrayList<String>();
-            keys.add(LAUNCH_TASK);
-            keys.add(TERMINATE_TASK);
+            keys.add(ServerSettings.getLaunchQueueName());
+            keys.add(ServerSettings.getCancelQueueName());
             props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
-            props.put(MessagingConstants.RABBIT_QUEUE, LAUNCH_TASK);
+            props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName());
             return props;
         }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 8d09a09..3fa7237 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -115,10 +115,8 @@ public class BetterGfacImpl implements GFac,Watcher {
     public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher
publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
         try {
             String[] listenerClassList = ServerSettings.getActivityListeners();
-            Publisher rabbitMQPublisher = null;
-            if (ServerSettings.isRabbitMqPublishEnabled()){
-                rabbitMQPublisher = PublisherFactory.createActivityPublisher();
-            }
+            Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
+
             for (String listenerClass : listenerClassList) {
                 Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
                 AbstractActivityListener abstractActivityListener = aClass.newInstance();

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 52cf7e0..0cd1042 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -52,6 +52,8 @@ public class RabbitMQTaskLaunchConsumer {
     private Channel channel;
     private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
     private boolean durableQueue;
+    private MessageHandler messageHandler;
+
 
     public RabbitMQTaskLaunchConsumer() throws AiravataException {
         try {
@@ -94,8 +96,21 @@ public class RabbitMQTaskLaunchConsumer {
         }
     }
 
+    public void reconnect() throws AiravataException{
+        if(messageHandler!=null) {
+            try {
+                listen(messageHandler);
+            } catch (AiravataException e) {
+                String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+                log.error(msg);
+                throw new AiravataException(msg, e);
+
+            }
+        }
+    }
     public String listen(final MessageHandler handler) throws AiravataException {
         try {
+            messageHandler = handler;
             Map<String, Object> props = handler.getProperties();
             final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
             if (routing == null) {
@@ -245,6 +260,12 @@ public class RabbitMQTaskLaunchConsumer {
             }
         }
     }
+    public boolean isOpen(){
+        if(connection!=null){
+            return connection.isOpen();
+        }
+        return false;
+    }
 
     public void sendAck(long deliveryTag){
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
index 3d8a377..919087e 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -20,7 +20,6 @@
 */
 package org.apache.airavata.messaging.core.impl;
 
-import com.rabbitmq.client.MessageProperties;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -29,23 +28,24 @@ import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.messaging.event.*;
+import org.apache.catalina.Server;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RabbitMQTaskLaunchPublisher implements Publisher{
     private final static Logger log = LoggerFactory.getLogger(RabbitMQTaskLaunchPublisher.class);
-    public static final String LAUNCH_TASK = "launch.task";
-    public static final String TERMINATE_TASK = "teminate.task";
+    private  String launchTask;
+    private  String cancelTask;
 
     private RabbitMQProducer rabbitMQProducer;
 
     public RabbitMQTaskLaunchPublisher() throws Exception {
         String brokerUrl;
-        String exchangeName;
         try {
             brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+            launchTask = ServerSettings.getLaunchQueueName();
+            cancelTask = ServerSettings.getCancelQueueName();
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to
initialize rabbitmq";
             log.error(message, e);
@@ -66,9 +66,9 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
             message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
             String routingKey = null;
             if (msgCtx.getType().equals(MessageType.LAUNCHTASK)){
-                routingKey = LAUNCH_TASK;
+                routingKey = launchTask;
             }else if(msgCtx.getType().equals(MessageType.TERMINATETASK)){
-                routingKey = TERMINATE_TASK;
+                routingKey = cancelTask;
             }
             byte[] messageBody = ThriftUtils.serializeThriftObject(message);
             rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a51a7ca2/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 0a2b5a3..3c7c294 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -76,7 +76,6 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         } catch (Exception e) {
             throw new OrchestratorException("Error launching the job", e);
         }
-
     }
 
     /**


Mime
View raw message