airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [2/5] airavata git commit: orchestrator cpi and gfac cpi changes to adapt with process
Date Thu, 25 Jun 2015 17:32:04 GMT
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 8583a67..a00734a 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -23,7 +23,6 @@ package org.apache.airavata.gfac.server;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.AiravataStartupException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
@@ -38,14 +37,10 @@ import org.apache.airavata.gfac.impl.GFacWorker;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.PublisherFactory;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
-import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
-import org.apache.airavata.model.status.ExperimentState;
-import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
@@ -70,7 +65,7 @@ import java.util.concurrent.Executors;
 
 public class GfacServerHandler implements GfacService.Iface {
     private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
-    private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+    private RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer;
     private static int requestCount=0;
     private ExperimentCatalog experimentCatalog;
     private AppCatalog appCatalog;
@@ -88,15 +83,15 @@ public class GfacServerHandler implements GfacService.Iface {
             initZkDataStructure();
             initAMQPClient();
 	        executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
-            startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer);
+            startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQProcessLaunchConsumer);
         } catch (Exception e) {
             throw new AiravataStartupException("Gfac Server Initialization error ", e);
         }
     }
 
     private void initAMQPClient() throws AiravataException {
-        rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
-        rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+        rabbitMQProcessLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+        rabbitMQProcessLaunchConsumer.listen(new TaskLaunchMessageHandler());
     }
 
     private void startCuratorClient() throws ApplicationSettingsException {
@@ -144,19 +139,18 @@ public class GfacServerHandler implements GfacService.Iface {
      * *
      * *
      *
-     * @param experimentId - ExperimentModel id in registry
      * @param processId - processModel id in registry
      * @param gatewayId - gateway Identification
      */
-    public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws
+    public boolean submitProcess(String processId, String gatewayId, String tokenId) throws
             TException {
         requestCount++;
         log.info("-----------------------------------" + requestCount + "-----------------------------------------");
-        log.info(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId,
+        log.info("GFac Received submit job request for the Gateway: {} process: {}", gatewayId,
                 processId);
 
         try {
-	        executorService.execute(new GFacWorker(experimentId, processId, gatewayId, tokenId));
+	        executorService.execute(new GFacWorker(processId, gatewayId, tokenId));
         } catch (GFacException e) {
             log.error("Failed to submit process", e);
             return false;
@@ -166,29 +160,14 @@ public class GfacServerHandler implements GfacService.Iface {
 	    return true;
     }
 
-    public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
-    /*    log.info(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId);
-        try {
-            if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, gatewayId, tokenId)) {
-                log.debug(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId);
-                return true;
-            } else {
-                log.error(experimentId, "Job cancellation failed, experiment {} , task {}", experimentId, taskId);
-                return false;
-            }
-        } catch (Exception e) {
-            log.error(experimentId, "Error cancelling the experiment {}.", experimentId);
-            throw new TException("Error cancelling the experiment : " + e.getMessage(), e);
-        }*/
-	    return false;
+    @Override
+    public boolean cancelProcess(String processId, String gatewayId, String tokenId) throws TException {
+        return false;
     }
 
-
-
-
     public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher,
 
-                                           RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
+                                           RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer) {
        /* try {
             String[] listenerClassList = ServerSettings.getActivityListeners();
             Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
@@ -227,51 +206,51 @@ public class GfacServerHandler implements GfacService.Iface {
         public void onMessage(MessageContext message) {
             System.out.println(" Message Received with message id '" + message.getMessageId()
                     + "' and with message type '" + message.getType());
-            if (message.getType().equals(MessageType.LAUNCHTASK)) {
+            if (message.getType().equals(MessageType.LAUNCHPROCESS)) {
                 try {
-                    TaskSubmitEvent event = new TaskSubmitEvent();
+                    ProcessSubmitEvent event = new ProcessSubmitEvent();
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    // update experiment status to executing
-                    ExperimentStatus status = new ExperimentStatus();
-                    status.setState(ExperimentState.EXECUTING);
+                    // update process status to executing
+                    ProcessStatus status = new ProcessStatus();
+                    status.setState(ProcessState.EXECUTING);
                     status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-                    Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
+                    Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
                     try {
-	                    GFacUtils.createExperimentNode(curatorClient, gfacServerName, event.getExperimentId(), message.getDeliveryTag(),
+	                    GFacUtils.createExperimentNode(curatorClient, gfacServerName, event.getProcessId(), message.getDeliveryTag(),
 			                    event.getTokenId());
-                        submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
+                        submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
-                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                        rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
                     }
                 } catch (TException e) {
                     log.error(e.getMessage(), e); //nobody is listening so nothing to throw
                 } catch (RegistryException e) {
                     log.error("Error while updating experiment status", e);
                 }
-            } else if (message.getType().equals(MessageType.TERMINATETASK)) {
-                TaskTerminateEvent event = new TaskTerminateEvent();
+            } else if (message.getType().equals(MessageType.TERMINATEPROCESS)) {
+                ProcessTerminateEvent event = new ProcessTerminateEvent();
                 TBase messageEvent = message.getEvent();
                 try {
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-	                boolean success = GFacUtils.setExperimentCancelRequest(event.getExperimentId(), curatorClient,
+	                boolean success = GFacUtils.setExperimentCancelRequest(event.getProcessId(), curatorClient,
 			                message.getDeliveryTag());
 	                if (success) {
-		                log.info("expId:{} - Experiment cancel request save successfully", event.getExperimentId());
+		                log.info("processId:{} - Process cancel request save successfully", event.getProcessId());
 	                }
                 } catch (Exception e) {
-	                log.error("expId:" + event.getExperimentId() + " - Experiment cancel reqeust failed", e);
+	                log.error("processId:" + event.getProcessId() + " - Process cancel reqeust failed", e);
                 }finally {
 	                try {
-		                if (!rabbitMQTaskLaunchConsumer.isOpen()) {
-			                rabbitMQTaskLaunchConsumer.reconnect();
+		                if (!rabbitMQProcessLaunchConsumer.isOpen()) {
+			                rabbitMQProcessLaunchConsumer.reconnect();
 		                }
-		                rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+		                rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
 	                } catch (AiravataException e) {
-		                log.error("expId: " + event.getExperimentId() + " - Failed to send acknowledgement back to cancel request.", e);
+		                log.error("processId: " + event.getProcessId() + " - Failed to send acknowledgement back to cancel request.", e);
 	                }
                 }
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
deleted file mode 100644
index 39b3df9..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.ShutdownListener;
-import com.rabbitmq.client.ShutdownSignalException;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.Message;
-import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class RabbitMQProcessConsumer {
-
-    private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessConsumer.class);
-
-    private String url;
-    private Connection connection;
-    private Channel channel;
-    private int prefetchCount;
-
-    public RabbitMQProcessConsumer() throws AiravataException {
-        try {
-            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
-            createConnection();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-    }
-
-    private void createConnection() throws AiravataException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            connectionFactory.setAutomaticRecoveryEnabled(true);
-            connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for default");
-
-            channel = connection.createChannel();
-            channel.basicQos(prefetchCount);
-//            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange default";
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-
-    public String listen(final MessageHandler handler) throws AiravataException {
-        try {
-            Map<String, Object> props = handler.getProperties();
-            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
-            if (routing == null) {
-                throw new IllegalArgumentException("The routing key must be present");
-            }
-            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
-            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
-            if (queueName == null) {
-                if (!channel.isOpen()) {
-                    channel = connection.createChannel();
-                    channel.basicQos(prefetchCount);
-//                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-                }
-                queueName = channel.queueDeclare().getQueue();
-            } else {
-                channel.queueDeclare(queueName, true, false, false, null);
-            }
-
-            if (consumerTag == null) {
-                consumerTag = "default";
-            }
-            // autoAck=false, we will ack after task is done
-            final String finalQueueName = queueName;
-            channel.basicConsume(queueName, true, new QueueingConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag,
-                                           Envelope envelope,
-                                           AMQP.BasicProperties properties,
-                                           byte[] body) {
-                    Message message = new Message();
-
-                    try {
-                        ThriftUtils.createThriftFromBytes(body, message);
-                        TBase event = null;
-                        String gatewayId = null;
-                        ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
-                        ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
-                        log.debug("Message received with message id : " + message.getMessageId()
-                                + " with task id : " + processSubmitEvent.getTaskId());
-                        event = processSubmitEvent;
-                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), null);
-                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
-                        handler.onMessage(messageContext);
-                    } catch (TException e) {
-                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + finalQueueName;
-                        log.warn(msg, e);
-                    }
-                }
-            });
-            return "";
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange default";
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void stopListen(final String queueName , final String exchangeName) throws AiravataException {
-        try {
-            channel.queueUnbind(queueName, exchangeName, null);
-        } catch (IOException e) {
-            String msg = "could not un-bind queue: " + queueName + " for exchange " + exchangeName;
-            log.debug(msg);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
new file mode 100644
index 0000000..850128e
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
@@ -0,0 +1,353 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RabbitMQ consumer for process launch and process terminate messages.
+ *
+ * <p>Deliveries are consumed without auto-acknowledgement; a delivery is acknowledged by
+ * calling {@link #sendAck(long)} with its delivery tag.
+ */
+public class RabbitMQProcessLaunchConsumer {
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class);
+
+    private String taskLaunchExchangeName;
+    private String url;
+    private Connection connection;
+    private Channel channel;
+    // subscriptions registered through listen(), keyed by the id that listen() returns
+    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+    private boolean durableQueue;
+    // handler passed to the most recent listen() call; reconnect() registers it again
+    private MessageHandler messageHandler;
+    private int prefetchCount;
+
+    /**
+     * Creates a consumer from the Airavata server settings (broker URL, durable-queue flag,
+     * task launch exchange name and prefetch count, which defaults to 64) and opens the
+     * connection.
+     *
+     * @throws AiravataException if a setting cannot be read or the connection cannot be opened
+     */
+    public RabbitMQProcessLaunchConsumer() throws AiravataException {
+        try {
+            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
+            taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+            prefetchCount = Integer.parseInt(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
+            createConnection();
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+    }
+
+    /**
+     * Creates a consumer for an explicit broker URL and exchange name and opens the connection.
+     * The durable-queue flag and the prefetch count are not set here and keep their Java
+     * defaults ({@code false} and {@code 0}).
+     *
+     * @param brokerUrl    URI of the RabbitMQ broker
+     * @param exchangeName name of the task launch exchange
+     * @throws AiravataException if the connection cannot be opened
+     */
+    public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+        this.taskLaunchExchangeName = exchangeName;
+        this.url = brokerUrl;
+        createConnection();
+    }
+
+    /**
+     * Opens the connection to the broker with automatic recovery enabled and creates the
+     * channel, applying the prefetch count as its QoS limit.
+     *
+     * @throws AiravataException if the connection or the channel cannot be created
+     */
+    private void createConnection() throws AiravataException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(url);
+            connectionFactory.setAutomaticRecoveryEnabled(true);
+            connection = connectionFactory.newConnection();
+            connection.addShutdownListener(new ShutdownListener() {
+                public void shutdownCompleted(ShutdownSignalException cause) {
+                    // intentionally empty: connection shutdowns are not handled here
+                }
+            });
+            log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
+
+            channel = connection.createChannel();
+            channel.basicQos(prefetchCount);
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Registers the handler of the most recent {@link #listen(MessageHandler)} call again; does
+     * nothing when no handler has been registered yet.
+     *
+     * <p>NOTE(review): this only calls listen() again. It does not re-create the connection, and
+     * listen() rejects an id that is still recorded in queueDetailsMap - confirm that this
+     * recovers a dropped connection as intended.
+     *
+     * @throws AiravataException if the handler cannot be registered again
+     */
+    public void reconnect() throws AiravataException {
+        if (messageHandler == null) {
+            return;
+        }
+        try {
+            listen(messageHandler);
+        } catch (AiravataException e) {
+            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Starts consuming from the queue described by the handler's properties and passes every
+     * delivery to the handler as a {@link MessageContext}.
+     *
+     * <p>The handler properties must contain {@link MessagingConstants#RABBIT_ROUTING_KEY} (a
+     * String or a List). {@link MessagingConstants#RABBIT_QUEUE} and
+     * {@link MessagingConstants#RABBIT_CONSUMER_TAG} are optional: without a queue name a
+     * server-named queue is declared, and the consumer tag falls back to {@code "default"}.
+     *
+     * @param handler receives the decoded messages and supplies the subscription properties
+     * @return the subscription id, usable with {@link #stopListen(String)}
+     * @throws AiravataException if the routing key is missing, the same subscription is already
+     *                           registered, or a broker call fails
+     */
+    public String listen(final MessageHandler handler) throws AiravataException {
+        try {
+            messageHandler = handler;
+            Map<String, Object> props = handler.getProperties();
+            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+            if (routing == null) {
+                throw new IllegalArgumentException("The routing key must be present");
+            }
+            List<String> keys = new ArrayList<String>();
+            if (routing instanceof List) {
+                for (Object o : (List<?>) routing) {
+                    keys.add(o.toString());
+                }
+            } else if (routing instanceof String) {
+                keys.add((String) routing);
+            }
+
+            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+            if (queueName == null) {
+                if (!channel.isOpen()) {
+                    channel = connection.createChannel();
+                    channel.basicQos(prefetchCount);
+                }
+                queueName = channel.queueDeclare().getQueue();
+            } else {
+                channel.queueDeclare(queueName, durableQueue, false, false, null);
+            }
+
+            final String id = getId(keys, queueName);
+            if (queueDetailsMap.containsKey(id)) {
+                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
+                        "cannot define the same subscriber twice");
+            }
+
+            if (consumerTag == null) {
+                consumerTag = "default";
+            }
+
+            // the routing keys are only recorded below; no queue binding is issued for them here
+            // autoAck=false, we will ack after task is done
+            channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag,
+                                           Envelope envelope,
+                                           AMQP.BasicProperties properties,
+                                           byte[] body) {
+                    Message message = new Message();
+
+                    try {
+                        ThriftUtils.createThriftFromBytes(body, message);
+                        TBase event = null;
+                        String gatewayId = null;
+                        long deliveryTag = envelope.getDeliveryTag();
+                        if (MessageType.LAUNCHPROCESS.equals(message.getMessageType())) {
+                            ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+                            ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+                            log.debug(" Message Received with message id '" + message.getMessageId()
+                                    + "' and with message type '" + message.getMessageType() + "'  for processId: " +
+                                    processSubmitEvent.getProcessId());
+                            event = processSubmitEvent;
+                            gatewayId = processSubmitEvent.getGatewayId();
+                        } else if (MessageType.TERMINATEPROCESS.equals(message.getMessageType())) {
+                            ProcessTerminateEvent processTerminateEvent = new ProcessTerminateEvent();
+                            ThriftUtils.createThriftFromBytes(message.getEvent(), processTerminateEvent);
+                            log.debug(" Message Received with message id '" + message.getMessageId()
+                                    + "' and with message type '" + message.getMessageType() + "'  for processId: " +
+                                    processTerminateEvent.getProcessId());
+                            event = processTerminateEvent;
+                            gatewayId = processTerminateEvent.getGatewayId();
+                        } else {
+                            // no event is decoded for other types; the handler still sees the message type
+                            log.warn(" Message Received with message id '" + message.getMessageId()
+                                    + "' has unsupported message type '" + message.getMessageType() + "'");
+                        }
+                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag);
+                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+                        handler.onMessage(messageContext);
+                    } catch (TException e) {
+                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+                        log.warn(msg, e);
+                    }
+                }
+
+                @Override
+                public void handleCancel(String consumerTag) throws IOException {
+                    super.handleCancel(consumerTag);
+                    log.info("Consumer cancelled : " + consumerTag);
+                }
+            });
+
+            // save the name for deleting the queue
+            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+            return id;
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Unbinds the queue registered under the given id from the task launch exchange, once per
+     * recorded routing key. Unknown ids are ignored and an unbind failure is only logged.
+     *
+     * @param id subscription id returned by {@link #listen(MessageHandler)}
+     */
+    public void stopListen(final String id) throws AiravataException {
+        QueueDetails details = queueDetailsMap.get(id);
+        if (details == null) {
+            return;
+        }
+        try {
+            for (String key : details.getRoutingKeys()) {
+                channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
+            }
+        } catch (IOException e) {
+            String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
+            log.debug(msg, e);
+        }
+    }
+
+    /**
+     * Private class for holding some information about the consumers registered
+     */
+    private static final class QueueDetails {
+        private final String queueName;
+        private final List<String> routingKeys;
+
+        private QueueDetails(String queueName, List<String> routingKeys) {
+            this.queueName = queueName;
+            this.routingKeys = routingKeys;
+        }
+
+        public String getQueueName() {
+            return queueName;
+        }
+
+        public List<String> getRoutingKeys() {
+            return routingKeys;
+        }
+    }
+
+    /**
+     * Builds the subscription id: every routing key and then the queue name, each prefixed
+     * with an underscore.
+     */
+    private String getId(List<String> routingKeys, String queueName) {
+        StringBuilder id = new StringBuilder();
+        for (String key : routingKeys) {
+            id.append("_").append(key);
+        }
+        return id.append("_").append(queueName).toString();
+    }
+
+    /**
+     * Closes the connection if there is one; an I/O failure while closing is ignored.
+     */
+    public void close() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (IOException ignored) {
+                // best effort: nothing more can be done if closing fails
+            }
+        }
+    }
+
+    /**
+     * @return {@code true} if a connection exists and reports itself open
+     */
+    public boolean isOpen() {
+        return connection != null && connection.isOpen();
+    }
+
+    /**
+     * Acknowledges the single delivery with the given tag on this consumer's channel. An I/O
+     * failure is logged and not propagated.
+     *
+     * @param deliveryTag delivery tag of the message to acknowledge
+     */
+    public void sendAck(long deliveryTag) {
+        try {
+            channel.basicAck(deliveryTag, false); //todo move this logic to monitoring component to ack when the job is done
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
new file mode 100644
index 0000000..e488f26
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMQProcessLaunchPublisher implements Publisher{ // Publisher that sends serialized messages to the launch queue through a RabbitMQProducer
+    private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class);
+    private  String launchTask; // value of ServerSettings.getLaunchQueueName(); used as the routing key in publish()
+    
+    private RabbitMQProducer rabbitMQProducer; // created and opened in the constructor
+
+    public RabbitMQProcessLaunchPublisher() throws Exception { // reads the broker URL and launch queue name, then creates and opens the producer
+        String brokerUrl;
+        try {
+            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            launchTask = ServerSettings.getLaunchQueueName();
+        } catch (ApplicationSettingsException e) { // a settings failure is logged and rethrown as AiravataException with the cause attached
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+        rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null); // NOTE(review): meaning of the two null arguments is not visible here - confirm against RabbitMQProducer
+        rabbitMQProducer.open();
+    }
+
+    public void publish(MessageContext msgCtx) throws AiravataException { // wraps the context's event in a thrift Message and sends it with launchTask as the routing key
+        try {
+            log.info("Publishing to launch queue ...");
+            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); // the event is serialized first and becomes the Message's event payload
+            Message message = new Message();
+            message.setEvent(body);
+            message.setMessageId(msgCtx.getMessageId());
+            message.setMessageType(msgCtx.getType());
+            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); // a null updated time would throw NullPointerException here and land in the generic catch below
+            String routingKey = launchTask;
+            byte[] messageBody = ThriftUtils.serializeThriftObject(message); // second serialization: the whole Message envelope
+            rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
+            log.info("Successfully published to launch queue ...");
+        } catch (TException e) { // NOTE(review): the try block only serializes, yet the message below says "deserializing" - confirm intended wording
+            String msg = "Error while deserializing the object";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        } catch (Exception e) { // any other failure is reported as a send error, with the cause preserved
+            String msg = "Error while sending to rabbitmq";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
deleted file mode 100644
index 3684198..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.Message;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class RabbitMQProcessPublisher implements Publisher {
-
-    private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessPublisher.class);
-    public static final String PROCESS = "process.queue" ;
-
-    private RabbitMQProducer rabbitMQProducer;
-
-    public RabbitMQProcessPublisher() throws Exception {
-        String brokerUrl;
-        String exchangeName;
-        try {
-            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-//            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-        rabbitMQProducer = new RabbitMQProducer(brokerUrl, null, null);
-        rabbitMQProducer.open();
-    }
-
-    @Override
-    public void publish(MessageContext msgCtx) throws AiravataException {
-        try {
-            log.info("Publishing to process queue ...");
-            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
-            Message message = new Message();
-            message.setEvent(body);
-            message.setMessageId(msgCtx.getMessageId());
-            message.setMessageType(msgCtx.getType());
-            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
-            String queueName = PROCESS;
-            message.setMessageType(MessageType.TASK);
-            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
-            rabbitMQProducer.sendToWorkerQueue(messageBody, queueName);
-        } catch (TException e) {
-            String msg = "Error while serializing the thrift object";
-            log.error(msg, e);
-            throw new AiravataException(msg, e);
-        } catch (Exception e) {
-            String msg = "Error while sending to rabbitmq";
-            log.error(msg, e);
-            throw new AiravataException(msg, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index 4510a07..83725e2 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -177,7 +177,7 @@ public class RabbitMQStatusConsumer implements Consumer {
                                     taskStatusChangeEvent.getState());
                             event = taskStatusChangeEvent;
                             gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
-                        }else if (message.getMessageType() == MessageType.TASKOUTPUT) {
+                        }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
                             TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
@@ -191,7 +191,7 @@ public class RabbitMQStatusConsumer implements Consumer {
                                     jobStatusChangeEvent.getState());
                             event = jobStatusChangeEvent;
                             gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.LAUNCHTASK)) {
+                        } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
                             TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
@@ -199,7 +199,7 @@ public class RabbitMQStatusConsumer implements Consumer {
                                     taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
                             event = taskSubmitEvent;
                             gatewayId = taskSubmitEvent.getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.TERMINATETASK)) {
+                        } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
                             TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index cebbed4..fc83199 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -74,7 +74,7 @@ public class RabbitMQStatusPublisher implements Publisher {
                 TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
                 routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
                         event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
-            } else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+            } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
                 TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
                 routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
                         event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
deleted file mode 100644
index 89a4a5d..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQTaskLaunchConsumer {
-    private final static Logger logger = LoggerFactory.getLogger(RabbitMQTaskLaunchConsumer.class);
-    private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
-
-    private String taskLaunchExchangeName;
-    private String url;
-    private Connection connection;
-    private Channel channel;
-    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
-    private boolean durableQueue;
-    private MessageHandler messageHandler;
-    private int prefetchCount;
-
-
-    public RabbitMQTaskLaunchConsumer() throws AiravataException {
-        try {
-            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
-            taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
-            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
-            createConnection();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-    }
-
-    public RabbitMQTaskLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
-        this.taskLaunchExchangeName = exchangeName;
-        this.url = brokerUrl;
-
-        createConnection();
-    }
-
-    private void createConnection() throws AiravataException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            connectionFactory.setAutomaticRecoveryEnabled(true);
-            connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
-
-            channel = connection.createChannel();
-            channel.basicQos(prefetchCount);
-
-//            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void reconnect() throws AiravataException{
-        if(messageHandler!=null) {
-            try {
-                listen(messageHandler);
-            } catch (AiravataException e) {
-                String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-                log.error(msg);
-                throw new AiravataException(msg, e);
-
-            }
-        }
-    }
-    public String listen(final MessageHandler handler) throws AiravataException {
-        try {
-            messageHandler = handler;
-            Map<String, Object> props = handler.getProperties();
-            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
-            if (routing == null) {
-                throw new IllegalArgumentException("The routing key must be present");
-            }
-            List<String> keys = new ArrayList<String>();
-            if (routing instanceof List) {
-                for (Object o : (List)routing) {
-                    keys.add(o.toString());
-                }
-            } else if (routing instanceof String) {
-                keys.add((String) routing);
-            }
-
-            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
-            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
-            if (queueName == null) {
-                if (!channel.isOpen()) {
-                    channel = connection.createChannel();
-                    channel.basicQos(prefetchCount);
-//                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-                }
-                queueName = channel.queueDeclare().getQueue();
-            } else {
-
-                channel.queueDeclare(queueName, durableQueue, false, false, null);
-            }
-
-            final String id = getId(keys, queueName);
-            if (queueDetailsMap.containsKey(id)) {
-                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
-                        "cannot define the same subscriber twice");
-            }
-
-            if (consumerTag == null) {
-                consumerTag = "default";
-            }
-
-            // bind all the routing keys
-//            for (String routingKey : keys) {
-//                channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
-//            }
-            // autoAck=false, we will ack after task is done
-            channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag,
-                                           Envelope envelope,
-                                           AMQP.BasicProperties properties,
-                                           byte[] body) {
-                    Message message = new Message();
-
-                    try {
-                        ThriftUtils.createThriftFromBytes(body, message);
-                        TBase event = null;
-                        String gatewayId = null;
-                        long deliveryTag = envelope.getDeliveryTag(); //todo store this in zookeeper, once job is done we can ack
-                        if(message.getMessageType().equals(MessageType.LAUNCHTASK)) {
-                            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
-                                    taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
-                            event = taskSubmitEvent;
-                            gatewayId = taskSubmitEvent.getGatewayId();
-                        }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) {
-                            TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
-                                    taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
-                            event = taskTerminateEvent;
-                            gatewayId = taskTerminateEvent.getGatewayId();
-                        }
-                        System.out.println("*deliveryTag:"+deliveryTag);
-                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag);
-                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
-                        handler.onMessage(messageContext);
-                        /*try {
-                            channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
-                        } catch (IOException e) {
-                            logger.error(e.getMessage(), e);
-                        }*/
-                    } catch (TException e) {
-                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
-                        log.warn(msg, e);
-                    }
-                }
-
-                @Override
-                public void handleCancel(String consumerTag) throws IOException {
-                    super.handleCancel(consumerTag);
-                    log.info("Consumer cancelled : " + consumerTag);
-                }
-            });
-
-            // save the name for deleting the queue
-            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
-            return id;
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void stopListen(final String id) throws AiravataException {
-        QueueDetails details = queueDetailsMap.get(id);
-        if (details != null) {
-            try {
-                for (String key : details.getRoutingKeys()) {
-                    channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
-                }
-            } catch (IOException e) {
-                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
-                log.debug(msg);
-            }
-        }
-    }
-
-    /**
-     * Private class for holding some information about the consumers registered
-     */
-    private class QueueDetails {
-        String queueName;
-
-        List<String> routingKeys;
-
-        private QueueDetails(String queueName, List<String> routingKeys) {
-            this.queueName = queueName;
-            this.routingKeys = routingKeys;
-        }
-
-        public String getQueueName() {
-            return queueName;
-        }
-
-        public List<String> getRoutingKeys() {
-            return routingKeys;
-        }
-    }
-
-    private String getId(List<String> routingKeys, String queueName) {
-        String id = "";
-        for (String key : routingKeys) {
-            id = id + "_" + key;
-        }
-        return id + "_" + queueName;
-    }
-
-    public void close() {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (IOException ignore) {
-            }
-        }
-    }
-    public boolean isOpen(){
-        if(connection!=null){
-            return connection.isOpen();
-        }
-        return false;
-    }
-
-    public void sendAck(long deliveryTag){
-        try {
-            channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
deleted file mode 100644
index 53bf373..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMQTaskLaunchPublisher implements Publisher{
-    private final static Logger log = LoggerFactory.getLogger(RabbitMQTaskLaunchPublisher.class);
-    private  String launchTask;
-    
-    private RabbitMQProducer rabbitMQProducer;
-
-    public RabbitMQTaskLaunchPublisher() throws Exception {
-        String brokerUrl;
-        try {
-            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            launchTask = ServerSettings.getLaunchQueueName();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-        rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
-        rabbitMQProducer.open();
-    }
-
-    public void publish(MessageContext msgCtx) throws AiravataException {
-        try {
-            log.info("Publishing to launch queue ...");
-            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
-            Message message = new Message();
-            message.setEvent(body);
-            message.setMessageId(msgCtx.getMessageId());
-            message.setMessageType(msgCtx.getType());
-            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
-            String routingKey = launchTask;
-            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
-            rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
-            log.info("Successfully published to launch queue ...");
-        } catch (TException e) {
-            String msg = "Error while deserializing the object";
-            log.error(msg, e);
-            throw new AiravataException(msg, e);
-        } catch (Exception e) {
-            String msg = "Error while sending to rabbitmq";
-            log.error(msg, e);
-            throw new AiravataException(msg, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
deleted file mode 100644
index e506556..0000000
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.core.impl;
-
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.client.GFACInstance;
-import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is the simplest implementation for JobSubmitter,
- * This is calling gfac invocation methods to invoke the gfac embedded mode,so this does not really implement
- * the selectGFACInstance method
- */
-public class GFACEmbeddedJobSubmitter implements JobSubmitter {
-    private final static Logger logger = LoggerFactory.getLogger(GFACEmbeddedJobSubmitter.class);
-
-    private OrchestratorContext orchestratorContext;
-
-    private GFac gfac;
-
-
-    public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
-        this.orchestratorContext = orchestratorContext;
-    }
-
-    public GFACInstance selectGFACInstance() throws OrchestratorException {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-
-    public boolean submit(String experimentID, String taskID,String tokenId) throws OrchestratorException {
-        try {
-        	 String gatewayId = null;
-        	 CredentialReader credentialReader = GFacUtils.getCredentialReader();
-             if (credentialReader != null) {
-                 try {
-                	 gatewayId = credentialReader.getGatewayID(tokenId);
-                 } catch (Exception e) {
-                     logger.error(e.getLocalizedMessage());
-                 }
-             }
-            if(gatewayId == null || gatewayId.isEmpty()){
-             gatewayId = ServerSettings.getDefaultUserGateway();
-            }
-          return gfac.submitJob(experimentID, taskID, gatewayId, tokenId);
-        } catch (Exception e) {
-            String error = "Error launching the job : " + experimentID;
-            logger.error(error);
-            throw new OrchestratorException(error);
-        }
-    }
-
-    public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException {
-        return false;
-    }
-
-    public GFac getGfac() {
-        return gfac;
-    }
-
-    public void setGfac(GFac gfac) {
-        this.gfac = gfac;
-    }
-
-    public OrchestratorContext getOrchestratorContext() {
-        return orchestratorContext;
-    }
-
-    public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
-        this.orchestratorContext = orchestratorContext;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 21df198..f600b72 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -29,9 +29,7 @@ import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.PublisherFactory;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
-import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
+import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
@@ -47,14 +45,11 @@ import java.util.UUID;
  */
 public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
     private final static Logger logger = LoggerFactory.getLogger(GFACPassiveJobSubmitter.class);
-    public static final String IP = "ip";
-    private OrchestratorContext orchestratorContext;
     private static Integer mutex = -1;
     private Publisher publisher;
 
     public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
-        this.orchestratorContext = orchestratorContext;
-        if(orchestratorContext.getPublisher()!=null){ // use the same publisher this will be empty if rabbitmq.publish is not enabled in the configuraiton
+        if(orchestratorContext.getPublisher()!=null){
             this.publisher = orchestratorContext.getPublisher();
         }else {
             try {
@@ -72,20 +67,16 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
         return null;
     }
 
-    public boolean submit(String experimentID, String taskID) throws OrchestratorException {
-        return submit(experimentID, taskID, null);
-    }
-
     /**
      * Submit the job to a shared launch.queue across multiple gfac instances
      *
-     * @param experimentID
-     * @param taskID
+     * @param experimentId
+     * @param processId
      * @param tokenId
      * @return
      * @throws OrchestratorException
      */
-    public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException {
+    public boolean submit(String experimentId, String processId, String tokenId) throws OrchestratorException {
         try {
             String gatewayId = null;
             CredentialReader credentialReader = GFacUtils.getCredentialReader();
@@ -99,8 +90,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
             if (gatewayId == null || gatewayId.isEmpty()) {
                 gatewayId = ServerSettings.getDefaultUserGateway();
             }
-            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId, tokenId);
-            MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
+            ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(processId, gatewayId, tokenId);
+            MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.LAUNCHPROCESS, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
             messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
             publisher.publish(messageContext);
         } catch (Exception e) {
@@ -113,12 +104,12 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
 
     /**
      * Submit the experiment to the terminate.queue job queue and remove the experiment from shared launch.queue
-     * @param experimentID
-     * @param taskID
+     * @param experimentId
+     * @param processId
      * @return
      * @throws OrchestratorException
      */
-    public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException {
+    public boolean terminate(String experimentId, String processId, String tokenId) throws OrchestratorException {
         String gatewayId = null;
         try {
             CredentialReader credentialReader = GFacUtils.getCredentialReader();
@@ -134,8 +125,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
                 gatewayId = ServerSettings.getDefaultUserGateway();
 
             }
-            TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(experimentID, taskID, gatewayId, tokenId);
-            MessageContext messageContext = new MessageContext(taskTerminateEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId);
+            ProcessTerminateEvent processTerminateEvent = new ProcessTerminateEvent(processId, gatewayId, tokenId);
+            MessageContext messageContext = new MessageContext(processTerminateEvent, MessageType.TERMINATEPROCESS, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId);
             messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
             publisher.publish(messageContext);
             return true;

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index b885da5..12c35c8 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -45,20 +45,20 @@ public interface JobSubmitter {
 
     /**
      * This is similar to submit with expId and taskId but this has extra param called token
-     * @param experimentID
-     * @param taskID
+     * @param experimentId
+     * @param processId
      * @param tokenId
      * @return
      * @throws OrchestratorException
      */
-    boolean submit(String experimentID,String taskID,String tokenId) throws OrchestratorException;
+    boolean submit(String experimentId,String processId,String tokenId) throws OrchestratorException;
 
     /**
      * This can be used to terminate the experiment
-     * @param experimentID
-     * @param taskID
+     * @param experimentId
+     * @param processId
      * @return
      * @throws OrchestratorException
      */
-    boolean terminate(String experimentID,String taskID, String tokenId)throws OrchestratorException;
+    boolean terminate(String experimentId,String processId, String tokenId)throws OrchestratorException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 589b707..c0d63d6 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -29,12 +29,8 @@ import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.PublisherFactory;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessConsumer;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -43,7 +39,6 @@ import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.experiment.ExperimentType;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.status.ExperimentState;
 import org.apache.airavata.model.status.ExperimentStatus;
@@ -56,12 +51,14 @@ import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractRes
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
 import org.apache.airavata.registry.cpi.*;
-import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -72,8 +69,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private String airavataUserName;
 	private String gatewayName;
 	private Publisher publisher;
-    private RabbitMQProcessConsumer rabbitMQProcessConsumer;
-    private RabbitMQProcessPublisher rabbitMQProcessPublisher;
 
     /**
 	 * Query orchestrator server to fetch the CPI version
@@ -99,7 +94,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
             appCatalog = RegistryFactory.getAppCatalog();
 			orchestrator.initialize();
 			orchestrator.getOrchestratorContext().setPublisher(this.publisher);
-            startProcessConsumer();
         } catch (OrchestratorException e) {
             log.error(e.getMessage(), e);
             throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -112,18 +106,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
         }
     }
 
-    private void startProcessConsumer() throws OrchestratorException {
-        try {
-            rabbitMQProcessConsumer = new RabbitMQProcessConsumer();
-            ProcessConsumer processConsumer = new ProcessConsumer();
-            Thread thread = new Thread(processConsumer);
-            thread.start();
-        } catch (AiravataException e) {
-            throw new OrchestratorException("Error while starting process consumer", e);
-        }
-
-    }
-
     /**
 	 * * After creating the experiment Data user have the * experimentID as the
 	 * handler to the experiment, during the launchExperiment * We just have to
@@ -133,10 +115,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	 * @param experimentId
 	 */
 	public boolean launchExperiment(String experimentId, String token) throws TException {
-        ExperimentModel experiment = null; // this will inside the bottom catch statement
+        ExperimentModel experiment = null;
         try {
-            experiment = (ExperimentModel) experimentCatalog.get(
-                    ExperimentCatalogModelType.EXPERIMENT, experimentId);
+            experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
             if (experiment == null) {
                 log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId);
                 return false;
@@ -430,14 +411,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 //        }
     }
 
-    public synchronized RabbitMQProcessPublisher getRabbitMQProcessPublisher() throws Exception {
-        if (rabbitMQProcessPublisher == null) {
-            rabbitMQProcessPublisher = new RabbitMQProcessPublisher();
-        }
-        return rabbitMQProcessPublisher;
-    }
-
-
     private class SingleAppExperimentRunner implements Runnable {
 
         String experimentId;
@@ -497,39 +470,4 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
             return true;
         }
     }
-
-    private class ProcessConsumer implements Runnable, MessageHandler{
-
-
-        @Override
-        public void run() {
-            try {
-                rabbitMQProcessConsumer.listen(this);
-            } catch (AiravataException e) {
-                log.error("Error while listen to the RabbitMQProcessConsumer");
-            }
-        }
-
-        @Override
-        public Map<String, Object> getProperties() {
-            Map<String, Object> props = new HashMap<String, Object>();
-            props.put(MessagingConstants.RABBIT_QUEUE, RabbitMQProcessPublisher.PROCESS);
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, RabbitMQProcessPublisher.PROCESS);
-            return props;
-        }
-
-        @Override
-        public void onMessage(MessageContext msgCtx) {
-            TBase event = msgCtx.getEvent();
-            if (event instanceof ProcessSubmitEvent) {
-                ProcessSubmitEvent processSubmitEvent = (ProcessSubmitEvent) event;
-                try {
-                    launchProcess(processSubmitEvent.getProcessId(), processSubmitEvent.getCredentialToken());
-                } catch (TException e) {
-                    log.error("Error while launching task : " + processSubmitEvent.getProcessId());
-                }
-            }
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
deleted file mode 100644
index f9f8115..0000000
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.orchestrator.util;
-
-import java.util.List;
-
-import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.registry.cpi.ApplicationInterface;
-import org.apache.airavata.model.util.ExecutionType;
-import org.apache.airavata.model.experiment.ExperimentModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DataModelUtils {
-
-    private final static Logger logger = LoggerFactory.getLogger(DataModelUtils.class);
-	public static ExecutionType getExecutionType(String gatewayId, ExperimentModel experiment){
-		try {
-			ApplicationInterface applicationInterface = RegistryFactory.getAppCatalog().getApplicationInterface();
-			List<String> allApplicationInterfaceIds = applicationInterface.getAllApplicationInterfaceIds();
-			String applicationId = experiment.getExecutionId();
-			if (allApplicationInterfaceIds.contains(applicationId)){
-				return ExecutionType.SINGLE_APP;
-			} else {
-				List<String> allWorkflows = RegistryFactory.getAppCatalog().getWorkflowCatalog().getAllWorkflows(gatewayId);
-				if (allWorkflows.contains(applicationId)){
-					return ExecutionType.WORKFLOW;
-				}
-			}
-		} catch (AppCatalogException e) {
-			logger.error(e.getMessage(), e);
-		}
-        return ExecutionType.UNKNOWN;
-	}
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/thrift-interface-descriptions/airavata-api/messaging_events.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/messaging_events.thrift b/thrift-interface-descriptions/airavata-api/messaging_events.thrift
index a863edc..1d2a537 100644
--- a/thrift-interface-descriptions/airavata-api/messaging_events.thrift
+++ b/thrift-interface-descriptions/airavata-api/messaging_events.thrift
@@ -40,9 +40,9 @@ enum MessageType {
     TASK,
     PROCESS,
     JOB,
-    LAUNCHTASK,
-    TERMINATETASK,
-    TASKOUTPUT
+    LAUNCHPROCESS,
+    TERMINATEPROCESS,
+    PROCESSOUTPUT
 }
 
 struct ExperimentStatusChangeEvent {
@@ -110,22 +110,15 @@ struct JobIdentifier {
 // }
 
 struct ProcessSubmitEvent{
-    1: required string processId;
-    2: required string credentialToken;
-}
-
-struct TaskSubmitEvent{
-    1: required string experimentId,
-    2: required string taskId,
-    3: required string gatewayId,
-    4: required string tokenId
+    1: required string processId,
+    2: required string gatewayId,
+    3: required string tokenId
 }
 
-struct TaskTerminateEvent{
-    1: required string experimentId,
-    2: required string taskId,
-    3: required string gatewayId,
-    4: required string tokenId
+struct ProcessTerminateEvent{
+    1: required string processId,
+    2: required string gatewayId,
+    3: required string tokenId
 }
 
 struct JobStatusChangeEvent {


Mime
View raw message