airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [1/3] git commit: adding more changes to messaging and changing xbaya to fit messaging
Date Thu, 16 Oct 2014 15:22:21 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 6e49e375f -> 51f456d8c


adding more changes to messaging and changing xbaya to fit messaging

Signed-off-by: Chathuri Wimalasena <kamalasini@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7e498ac5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7e498ac5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7e498ac5

Branch: refs/heads/master
Commit: 7e498ac54b5e135311b60923fad80247f04470f9
Parents: 9aeed4d
Author: Supun <supun06@gmail.com>
Authored: Thu Oct 16 10:06:00 2014 -0400
Committer: Chathuri Wimalasena <kamalasini@gmail.com>
Committed: Thu Oct 16 10:25:52 2014 -0400

----------------------------------------------------------------------
 .../airavata/messaging/core/MessageHandler.java |   2 +-
 .../messaging/core/MessagingConstants.java      |   3 +
 .../airavata/messaging/core/TestClient.java     |  15 +-
 .../messaging/core/impl/RabbitMQConsumer.java   | 131 +++++++----
 .../messaging/core/impl/RabbitMQProducer.java   |   2 +-
 .../messaging/core/impl/RabbitMQPublisher.java  |   8 +-
 .../airavata/xbaya/messaging/EventData.java     |  44 ++--
 .../xbaya/messaging/EventDataRepository.java    |   8 +-
 .../airavata/xbaya/messaging/MessageClient.java | 220 -------------------
 .../airavata/xbaya/messaging/Monitor.java       | 127 +++++------
 10 files changed, 189 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
index 8b897c5..f18715a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -3,7 +3,7 @@ package org.apache.airavata.messaging.core;
 import java.util.Map;
 
 public interface MessageHandler {
-    Map<String, String> getProperties();
+    Map<String, Object> getProperties();
 
     void onMessage(MessageContext message);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index fa0946a..7bfdb09 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -1,6 +1,9 @@
 package org.apache.airavata.messaging.core;
 
 public abstract class MessagingConstants {
+    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+    public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
+
     public static final String RABBIT_ROUTING_KEY = "routingKey";
     public static final String RABBIT_QUEUE= "queue";
     public static final String RABBIT_CONSUMER_TAG = "consumerTag";

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index 4de9aba..991b85b 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -29,7 +29,9 @@ import org.apache.airavata.model.messaging.event.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -37,19 +39,22 @@ public class TestClient {
     public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
     public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
     private final static Logger logger = LoggerFactory.getLogger(TestClient.class);
-    private final static String experimentId = "echoExperiment_cc733586-2bf8-4ee2-8a25-6521db135e7f";
+    private final static String experimentId = "echoExperiment_cc733586-2bf8-4ee2-8a25-6521db135e7f.*";
 
     public static void main(String[] args) {
         try {
             AiravataUtils.setExecutionAsServer();
             String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
-            String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
+            final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
             RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName);
             consumer.listen(new MessageHandler() {
                 @Override
-                public Map<String, String> getProperties() {
-                    Map<String, String> props = new HashMap<String, String>();
-                    props.put(MessagingConstants.RABBIT_ROUTING_KEY, experimentId);
+                public Map<String, Object> getProperties() {
+                    Map<String, Object> props = new HashMap<String, Object>();
+                    List<String> routingKeys = new ArrayList<String>();
+                    routingKeys.add(experimentId);
+                    routingKeys.add(experimentId + ".*.*");
+                    props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
                     return props;
                 }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
index f3c1942..22d6317 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
@@ -24,6 +24,8 @@ package org.apache.airavata.messaging.core.impl;
 
 import com.rabbitmq.client.*;
 import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.Consumer;
 import org.apache.airavata.messaging.core.MessageContext;
@@ -36,7 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class RabbitMQConsumer implements Consumer {
@@ -48,15 +52,40 @@ public class RabbitMQConsumer implements Consumer {
     private Channel channel;
     private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
 
+    public RabbitMQConsumer() throws AiravataException {
+        try {
+            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
+
+            createConnection();
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to
initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+    }
+
     public RabbitMQConsumer(String brokerUrl, String exchangeName) throws AiravataException
{
         this.exchangeName = exchangeName;
         this.url = brokerUrl;
 
+        createConnection();
+    }
+
+    private void createConnection() throws AiravataException {
         try {
-            connection = createConnection();
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(url);
+            connection = connectionFactory.newConnection();
+            connection.addShutdownListener(new ShutdownListener() {
+                public void shutdownCompleted(ShutdownSignalException cause) {
+                }
+            });
+            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+
             channel = connection.createChannel();
+            channel.exchangeDeclare(exchangeName, "topic", false);
 
-            channel.exchangeDeclare(exchangeName, "direct", false);
         } catch (Exception e) {
             String msg = "could not open channel for exchange " + exchangeName;
             log.error(msg);
@@ -66,24 +95,44 @@ public class RabbitMQConsumer implements Consumer {
 
     public String listen(final MessageHandler handler) throws AiravataException {
         try {
-            Map<String, String> props = handler.getProperties();
-            final String routingKey = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
-            if (routingKey == null) {
+            Map<String, Object> props = handler.getProperties();
+            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+            if (routing == null) {
                 throw new IllegalArgumentException("The routing key must be present");
             }
 
-            String queueName = props.get(MessagingConstants.RABBIT_QUEUE);
-            String consumerTag = props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+            List<String> keys = new ArrayList<String>();
+            if (routing instanceof List) {
+                for (Object o : (List)routing) {
+                    keys.add(o.toString());
+                }
+            } else if (routing instanceof String) {
+                keys.add((String) routing);
+            }
+
+            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
             if (queueName == null) {
                 queueName = channel.queueDeclare().getQueue();
             } else {
                 channel.queueDeclare(queueName, true, false, false, null);
             }
+
+            final String id = getId(keys, queueName);
+            if (queueDetailsMap.containsKey(id)) {
+                throw new IllegalStateException("This subscriber is already defined for this
Consumer, " +
+                        "cannot define the same subscriber twice");
+            }
+
             if (consumerTag == null) {
                 consumerTag = "default";
             }
-            String id = routingKey + "." + queueName;
-            channel.queueBind(queueName, exchangeName, routingKey);
+
+            // bind all the routing keys
+            for (String routingKey : keys) {
+                channel.queueBind(queueName, exchangeName, routingKey);
+            }
+
             channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel)
{
                 @Override
                 public void handleDelivery(String consumerTag,
@@ -99,38 +148,42 @@ public class RabbitMQConsumer implements Consumer {
                             ExperimentStatusChangeEvent experimentStatusChangeEvent = new
ExperimentStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType()
+ "'  with status " + experimentStatusChangeEvent.getState());
+                                    + "' and with message type '" + message.getMessageType()
+ "'  with status " +
+                                    experimentStatusChangeEvent.getState());
                             event = experimentStatusChangeEvent;
                         } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE))
{
                             WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType()
+ "'  with status " + wfnStatusChangeEvent.getState());
+                                    + "' and with message type '" + message.getMessageType()
+ "'  with status " +
+                                    wfnStatusChangeEvent.getState());
                             event = wfnStatusChangeEvent;
                         } else if (message.getMessageType().equals(MessageType.TASK)) {
                             TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType()
+ "'  with status " + taskStatusChangeEvent.getState());
+                                    + "' and with message type '" + message.getMessageType()
+ "'  with status " +
+                                    taskStatusChangeEvent.getState());
                             event = taskStatusChangeEvent;
                         } else if (message.getMessageType().equals(MessageType.JOB)) {
                             JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType()
+ "'  with status " + jobStatusChangeEvent.getState());
+                                    + "' and with message type '" + message.getMessageType()
+ "'  with status " +
+                                    jobStatusChangeEvent.getState());
                             event = jobStatusChangeEvent;
                         }
 
                         MessageContext messageContext = new MessageContext(event, message.getMessageType(),
message.getMessageId());
                         handler.onMessage(messageContext);
                     } catch (TException e) {
-                        String msg = "Failed to de-serialize the thrift message, exchange:
" + exchangeName + " routingKey: " + routingKey;
+                        String msg = "Failed to de-serialize the thrift message, from routing
keys and queueName " + id;
                         log.warn(msg, e);
                     }
                 }
             });
             // save the name for deleting the queue
-            queueDetailsMap.put(id, new QueueDetails(queueName, routingKey));
+            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
             return id;
         } catch (Exception e) {
             String msg = "could not open channel for exchange " + exchangeName;
@@ -143,7 +196,9 @@ public class RabbitMQConsumer implements Consumer {
         QueueDetails details = queueDetailsMap.get(id);
         if (details != null) {
             try {
-                channel.queueUnbind(details.getQueueName(), exchangeName, details.getRoutingKey());
+                for (String key : details.getRoutingKeys()) {
+                    channel.queueUnbind(details.getQueueName(), exchangeName, key);
+                }
                 channel.queueDelete(details.getQueueName(), true, true);
             } catch (IOException e) {
                 String msg = "could not un-bind queue: " + details.getQueueName() + " for
exchange " + exchangeName;
@@ -153,42 +208,42 @@ public class RabbitMQConsumer implements Consumer {
         }
     }
 
-    private Connection createConnection() throws IOException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            Connection connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
-            return connection;
-        } catch (Exception e) {
-            log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName);
-            return null;
-        }
-    }
-
     /**
      * Private class for holding some information about the consumers registered
      */
     private class QueueDetails {
         String queueName;
 
-        String routingKey;
+        List<String> routingKeys;
 
-        private QueueDetails(String queueName, String routingKey) {
+        private QueueDetails(String queueName, List<String> routingKeys) {
             this.queueName = queueName;
-            this.routingKey = routingKey;
+            this.routingKeys = routingKeys;
         }
 
         public String getQueueName() {
             return queueName;
         }
 
-        public String getRoutingKey() {
-            return routingKey;
+        public List<String> getRoutingKeys() {
+            return routingKeys;
+        }
+    }
+
+    private String getId(List<String> routingKeys, String queueName) {
+        String id = "";
+        for (String key : routingKeys) {
+            id = id + "_" + key;
+        }
+        return id + "_" + queueName;
+    }
+
+    public void close() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (IOException ignore) {
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index a741a8d..b4a6d46 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -104,7 +104,7 @@ public class RabbitMQProducer {
                 log.info("setting basic.qos / prefetch count to " + prefetchCount + " for
" + exchangeName);
                 channel.basicQos(prefetchCount);
             }
-            channel.exchangeDeclare(exchangeName, "direct", false);
+            channel.exchangeDeclare(exchangeName, "topic", false);
         } catch (Exception e) {
             reset();
             String msg = "could not open channel for exchange " + exchangeName;

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
index edbb28d..ff14a8c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.thrift.TException;
@@ -33,8 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RabbitMQPublisher implements Publisher {
-    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
-    public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
+
     private static Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class);
 
     private RabbitMQProducer rabbitMQProducer;
@@ -44,8 +44,8 @@ public class RabbitMQPublisher implements Publisher {
         String brokerUrl;
         String exchangeName;
         try {
-            brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
+            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to
initialize rabbitmq";
             log.error(message, e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java
b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java
index 9ef3331..ab02662 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventData.java
@@ -21,7 +21,9 @@
 
 package org.apache.airavata.xbaya.messaging;
 
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.JobIdentifier;
 import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
@@ -38,7 +40,7 @@ import java.util.Date;
 
 public class EventData {
 
-    private Message  messageEvent;
+    private MessageContext  messageEvent;
 
     private Date updateDate;
 
@@ -47,7 +49,6 @@ public class EventData {
     private String workflowNodeId;
 
     private String message;
-    private MessageLevel messageLevel;
     private String messageId;
 
     /**
@@ -55,35 +56,31 @@ public class EventData {
      * 
      * @param event
      */
-    public EventData(Message event) throws TException {
+    public EventData(MessageContext event) {
         this.messageEvent = event;
         process(event);
     }
 
-    private void process(Message event) throws TException {
+    private void process(MessageContext event) {
         this.messageId = event.getMessageId();
-        this.messageLevel = event.getMessageLevel();
-        if (event.getMessageType() == MessageType.EXPERIMENT) {
-            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
-            ThriftUtils.createThriftFromBytes(event.getEvent(), experimentStatusChangeEvent);
+        if (event.getType() == MessageType.EXPERIMENT) {
+            ExperimentStatusChangeEvent experimentStatusChangeEvent = (ExperimentStatusChangeEvent)
event.getEvent();
             this.status = experimentStatusChangeEvent.getState().toString();
             this.experimentId = experimentStatusChangeEvent.getExperimentId();
             this.workflowNodeId = "";
             this.message = "Received experiment event , expId : " + experimentStatusChangeEvent.getExperimentId()
+
                     ", status : " + experimentStatusChangeEvent.getState().toString();
-        }   else if (event.getMessageType() == MessageType.WORKFLOWNODE) {
-            WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
-            ThriftUtils.createThriftFromBytes(event.getEvent(), wfnStatusChangeEvent);
+        } else if (event.getType() == MessageType.WORKFLOWNODE) {
+            WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = (WorkflowNodeStatusChangeEvent)
event.getEvent();
             WorkflowIdentifier wfIdentifier = wfnStatusChangeEvent.getWorkflowNodeIdentity();
             this.status = wfnStatusChangeEvent.getState().toString();
             this.experimentId = wfIdentifier.getExperimentId();
             this.workflowNodeId = wfIdentifier.getWorkflowNodeId();
-            this.message = "Received workflow status change event, expId : " +  wfIdentifier.getExperimentId()
+
+            this.message = "Received workflow status change event, expId : " + wfIdentifier.getExperimentId()
+
                     ", nodeId : " + wfIdentifier.getWorkflowNodeId() + " , status : " + wfnStatusChangeEvent.getState().toString();
 
-        }else if (event.getMessageType() == MessageType.TASK) {
-            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
-            ThriftUtils.createThriftFromBytes(event.getEvent(), taskStatusChangeEvent);
+        } else if (event.getType() == MessageType.TASK) {
+            TaskStatusChangeEvent taskStatusChangeEvent = (TaskStatusChangeEvent) event.getEvent();
             TaskIdentifier taskIdentifier = taskStatusChangeEvent.getTaskIdentity();
             this.status = taskStatusChangeEvent.getState().toString();
             this.experimentId = taskIdentifier.getExperimentId();
@@ -91,11 +88,10 @@ public class EventData {
             this.message = "Received task event , expId : " + taskIdentifier.getExperimentId()
+ ",taskId : " +
                     taskIdentifier.getTaskId() + ", wfNodeId : " + taskIdentifier.getWorkflowNodeId()
+ ", status : " +
                     taskStatusChangeEvent.getState().toString();
-        } else if (event.getMessageType() == MessageType.JOB) {
-            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
-            ThriftUtils.createThriftFromBytes(event.getEvent(), jobStatusChangeEvent);
+        } else if (event.getType() == MessageType.JOB) {
+            JobStatusChangeEvent jobStatusChangeEvent = (JobStatusChangeEvent) event.getEvent();
             JobIdentifier jobIdentifier = jobStatusChangeEvent.getJobIdentity();
-            this.status  = jobStatusChangeEvent.getState().toString();
+            this.status = jobStatusChangeEvent.getState().toString();
             this.experimentId = jobIdentifier.getExperimentId();
             this.workflowNodeId = jobIdentifier.getWorkflowNodeId();
             this.message = "Received task event , expId : " + jobIdentifier.getExperimentId()
+ " ,taskId : " +
@@ -109,7 +105,7 @@ public class EventData {
      * 
      * @return The event
      */
-    public Message getEvent() {
+    public MessageContext getEvent() {
         return this.messageEvent;
     }
 
@@ -119,12 +115,12 @@ public class EventData {
      * @return The type
      */
     public MessageType getType() {
-        return this.messageEvent.getMessageType();
+        return this.messageEvent.getType();
     }
 
 	public Date getUpdateTime() {
         if (updateDate == null) {
-            updateDate = new Date(this.messageEvent.getUpdatedTime());
+            updateDate = new Date(this.messageEvent.getUpdatedTime().getTime());
         }
         return updateDate;
     }
@@ -141,10 +137,6 @@ public class EventData {
         return message;
     }
 
-    public MessageLevel getMessageLevel() {
-        return messageLevel;
-    }
-
     public String getMessageId() {
         return messageId;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java
b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java
index f0cc46e..1168925 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/EventDataRepository.java
@@ -101,13 +101,7 @@ public class EventDataRepository implements TableModel, BoundedRangeModel
{
         this.tableModelChangeEvent = new ChangeEvent(this); // We only need one.
         this.events = new ArrayList<EventData>();
     }
-    public void addEvent(Message message) {
-        try {
-            addEvent(new EventData(message));
-        } catch (TException e) {
-            logger.error("Error while adding new message event", e);
-        }
-    }
+
     /**
      * @param event
      */

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/MessageClient.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/MessageClient.java
b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/MessageClient.java
deleted file mode 100644
index d4e975f..0000000
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/MessageClient.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.xbaya.messaging;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.ShutdownListener;
-import com.rabbitmq.client.ShutdownSignalException;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.model.messaging.event.Message;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class MessageClient {
-
-    private static final Logger log = LoggerFactory.getLogger(MessageClient.class);
-
-    private Monitor monitor;
-
-    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
-    public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
-
-    private String brokerURL;
-
-    private String exchangeName;
-
-    private String subscriptionID;
-
-    private long timeout = 20000L;
-
-    private long interval = 1000L;
-
-    private List<TerminateListener> terminateListeners = new ArrayList<TerminateListener>();
-
-    private static final Logger logger = LoggerFactory.getLogger(MessageClient.class);
-    private Connection connection;
-    private Channel channel;
-    private ExecutorService executorService;
-
-    /**
-     * Constructs a MessageMonitore.
-     *
-     * @param monitor
-     */
-    public MessageClient(Monitor monitor) {
-//        try {
-            this.monitor = monitor;
-            // We need to copy these because the configuration might change.
-//            this.brokerURL = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
-            this.brokerURL = "amqp://127.0.0.1:5672";
-//            this.exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            this.exchangeName = "airavata_rabbitmq_exchange";
-        executorService = Executors.newFixedThreadPool(25);
-            init();
-//        } catch (ApplicationSettingsException e) {
-//            logger.error("Exception while initiating monitoring client ");
-//        }
-    }
-
-    private void init() {
-        try {
-            connection = createConnection();
-            channel = connection.createChannel();
-            channel.exchangeDeclare(exchangeName, "direct", false);
-        } catch (IOException e) {
-            log.error("Error occur while Client initiating", e);
-        }
-    }
-
-    private Connection createConnection() {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(brokerURL);
-            Connection connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                    log.info("Connection shutdown listener triggered -----------");
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
-            return connection;
-        } catch (Exception e) {
-            log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName);
-            return null;
-        }
-    }
-
-    /**
-     * Subscribes to the notification.
-     *
-     * @throws MonitorException
-     */
-    public synchronized void subscribe(String experimentId){
-        try {
-            String queueName = channel.queueDeclare().getQueue();
-            System.out.println("Experiment ID is : " + experimentId);
-            channel.queueBind(queueName, exchangeName, experimentId ); // send experiment
Id as routing Key
-            QueueingConsumer consumer = new QueueingConsumer(channel);
-            channel.basicConsume(queueName, true, consumer);
-            executorService.execute(new Thread(new RabbitMQConsumer(consumer, experimentId)));
-        } catch (IOException e) {
-            log.error("Error while subscribe to routing key : " + experimentId, e);
-        }
-
-    }
-
-    /**
-     * Unsubscribes from the notification.
-     *
-     * @throws MonitorException
-     */
-    public synchronized void unsubscribe(String experimentId) {
-        // This method needs to be synchronized along with subscribe() because
-        // unsubscribe() might be called while subscribe() is being executed.
-        // TODO - implement this, after experiment execution complete we need to unsubscribe
it.
-        notifyTerminateListeners(experimentId);
-    }
-
-    private void notifyTerminateListeners(String experimentId) {
-        for (TerminateListener terminateListener : terminateListeners) {
-            terminateListener.terminate(experimentId);
-        }
-    }
-
-    private void registerTerminateListener(TerminateListener terminateListener) {
-        terminateListeners.add(terminateListener);
-    }
-
-    public long getTimeout() {
-        return timeout;
-    }
-
-    public void setTimeout(long timeout) {
-        this.timeout = timeout;
-    }
-
-    public long getInterval() {
-        return interval;
-    }
-
-    public void setInterval(long interval) {
-        this.interval = interval;
-    }
-
-    private interface TerminateListener {
-        public void terminate(String experimentId);
-    }
-    private class RabbitMQConsumer implements Runnable, TerminateListener {
-        private final Logger logger = LoggerFactory.getLogger(MessageClient.RabbitMQConsumer.class);
-        private final String id;
-        private QueueingConsumer consumer;
-        private boolean isContinue = true;
-        RabbitMQConsumer(QueueingConsumer consumer, String experimentId) {
-            this.consumer = consumer;
-            this.id = experimentId;
-            registerTerminateListener(this);
-        }
-
-        @Override
-        public void run() {
-            System.out.println("RabbitMQConsumer started for experiment " + consumer.getConsumerTag());
-            try {
-                Message message;
-                while (isContinue) {
-                    QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);
-                    if (delivery == null) {
-                        continue;
-                    }
-                    byte[] body = delivery.getBody();
-                    message = new Message();
-                    ThriftUtils.createThriftFromBytes(body, message);
-                    monitor.handleNotification(message);
-                }
-                System.out.println("Terminating consumer for experimentId : " + id);
-            } catch (InterruptedException e) {
-                logger.error("Error while consuming next delivery", e);
-                System.out.println("Error while consuming next delivery");
-            } catch (TException e) {
-                logger.error("Error while creating message from thrift", e);
-                System.out.println("Error while creating message from thrift");
-            }
-        }
-
-        @Override
-        public void terminate(String experimentId) {
-            if (id.equals(experimentId)) {
-                System.out.println("Terminate request came for " + experimentId);
-                isContinue = false;
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e498ac5/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
index 4c6c667..b3e7ff4 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
@@ -21,7 +21,13 @@
 
 package org.apache.airavata.xbaya.messaging;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.Consumer;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.JobIdentifier;
 import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
@@ -39,10 +45,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 //import org.xmlpull.infoset.XmlElement;
 
 public class Monitor extends EventProducer {
@@ -53,7 +56,7 @@ public class Monitor extends EventProducer {
 
     protected Map<String, EventDataRepository> eventDataMap = new HashMap<String,
EventDataRepository>();
 
-    protected MessageClient messageClient;
+    protected Consumer messageClient;
 
     protected boolean printRawMessages;
 
@@ -67,6 +70,8 @@ public class Monitor extends EventProducer {
 
     private String lastTerminatedWorkflowExecutionId=null;
 
+    private Map<String, String> expIdToSubscribers = new HashMap<String, String>();
+
     public Monitor() {
         // First one keeps all event data & it doesn't have filters
         this.eventDataMap.put(DEFAULT_MODEL_KEY, new EventDataRepository());
@@ -103,7 +108,13 @@ public class Monitor extends EventProducer {
 
     	//Notify listeners that the monitoring is about to start
     	getEventDataRepository().triggerListenerForPreMonitorStart();
-        this.messageClient = new MessageClient(this);
+        try {
+            this.messageClient = new RabbitMQConsumer();
+        } catch (AiravataException e) {
+            String msg = "Failed to start the consumer";
+            logger.error(msg, e);
+            throw new MonitorException(msg, e);
+        }
         setMonitoring(true);
         // Enable/disable some menu items and show the monitor panel.
         sendSafeEvent(new Event(Event.Type.MONITOR_STARTED));
@@ -179,71 +190,35 @@ public class Monitor extends EventProducer {
         for (String key : keysToBeRemoved) {
             this.eventDataMap.remove(key);
         }
-
     }
 
-    /**
-     * @param event
-     */
-    protected synchronized void handleNotification(Message event) {
-        EventData eventData = null;
-        boolean unsubscribeConsumer = false;
-        try {
-            eventData = new EventData(event);
-        } catch (TException e) {
-            logger.error("Error while adding new message event", e);
-            System.out.println("Error while adding new message event");
-            return;
-        }
-        Set<String> keys = this.eventDataMap.keySet();
-        // Remove everthing leaving only the last one
-        if(printRawMessages) {
-            try {
-                if (event.getMessageType() == MessageType.EXPERIMENT) {
-                    ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
-                    ThriftUtils.createThriftFromBytes(event.getEvent(), experimentStatusChangeEvent);
-                    logger.info("Received experiment event , expId : {} , status : {} ",
-                            experimentStatusChangeEvent.getExperimentId(), experimentStatusChangeEvent.getState().toString());
-                    System.out.println("Received experiment event");
-
-                }   else if (event.getMessageType() == MessageType.WORKFLOWNODE) {
-                    WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
-                    ThriftUtils.createThriftFromBytes(event.getEvent(), wfnStatusChangeEvent);
-                    WorkflowIdentifier wfIdentifier = wfnStatusChangeEvent.getWorkflowNodeIdentity();
-                    logger.info("Received workflow status change event, expId : {}, nodeId
: {}, status : {} ",
-                            new String[]{wfIdentifier.getExperimentId(), wfIdentifier.getWorkflowNodeId(),
-                                    wfnStatusChangeEvent.getState().toString()});
-                    System.out.println("Received a workflow change event");
-                }else if (event.getMessageType() == MessageType.TASK) {
-                    TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
-                    ThriftUtils.createThriftFromBytes(event.getEvent(), taskStatusChangeEvent);
-                    TaskIdentifier taskIdentifier = taskStatusChangeEvent.getTaskIdentity();
-                    logger.info("Received task event , expId : {} ,taskId : {}, wfNodeId
: {}, status : {} ",
-                            new String[]{taskIdentifier.getExperimentId(), taskIdentifier.getTaskId(),
-                                    taskIdentifier.getWorkflowNodeId(), taskStatusChangeEvent.getState().toString()});
-                    System.out.printf("Received a task change event");
-                } else if (event.getMessageType() == MessageType.JOB) {
-                    JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
-                    ThriftUtils.createThriftFromBytes(event.getEvent(), jobStatusChangeEvent);
-                    JobIdentifier jobIdentifier = jobStatusChangeEvent.getJobIdentity();
-                    logger.info("Received job event , expId : {}, taskId : {}, jobId : {},
wfNodeId : {}, status : {} ",
-                            new String[]{jobIdentifier.getExperimentId(), jobIdentifier.getTaskId(),
jobIdentifier.getJobId(),
-                                    jobIdentifier.getWorkflowNodeId(), jobStatusChangeEvent.getState().toString()});
-                    System.out.println("Received a job change event");
-                } else {
-                    logger.info("Received UNKNOWN event");
-                    System.out.println("Received an UNKOWN event");
-                }
-            } catch (TException e) {
-                logger.error("Error while printing thrift message ");
-                System.out.println("Error while printing thrift message");
-            }
+    private class NotificationMessageHandler implements MessageHandler {
+        private String experimentId;
+
+        private NotificationMessageHandler(String experimentId) {
+            this.experimentId = experimentId;
         }
-        for (String key : keys) {
-            this.eventDataMap.get(key).addEvent(eventData);
+
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            List<String> routingKeys = new ArrayList<String>();
+            routingKeys.add(experimentId);
+            routingKeys.add(experimentId + ".*.*");
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+            return props;
         }
-        if (eventData.getType() == MessageType.EXPERIMENT && eventData.getStatus().equals(ExperimentState.LAUNCHED.toString()))
{
-            unsubscribe(eventData.getExperimentId());
+
+        public void onMessage(MessageContext message) {
+            EventData eventData = null;
+            boolean unsubscribeConsumer = false;
+            eventData = new EventData(message);
+            Set<String> keys = eventDataMap.keySet();
+            for (String key : keys) {
+                eventDataMap.get(key).addEvent(eventData);
+            }
+            if (eventData.getType() == MessageType.EXPERIMENT && eventData.getStatus().equals(ExperimentState.LAUNCHED.toString()))
{
+                unsubscribe(eventData.getExperimentId());
+            }
         }
     }
 
@@ -252,7 +227,14 @@ public class Monitor extends EventProducer {
      * @throws MonitorException
      */
     public void subscribe(String experimentID) throws MonitorException {
-        messageClient.subscribe(experimentID);
+        try {
+            String id = messageClient.listen(new NotificationMessageHandler(experimentID));
+            expIdToSubscribers.put(experimentID, id);
+        } catch (AiravataException e) {
+            String msg = "Failed to listen to experiment: " + experimentID;
+            logger.error(msg);
+            throw new MonitorException(msg, e);
+        }
     }
 
     /**
@@ -262,7 +244,14 @@ public class Monitor extends EventProducer {
     public void unsubscribe(String experimentId){
         // Enable/disable some menu items.
         sendSafeEvent(new Event(Event.Type.MONITOR_STOPED));
-        messageClient.unsubscribe(experimentId);
+        String id = expIdToSubscribers.remove(experimentId);
+        if (id != null) {
+            try {
+                messageClient.stopListen(experimentId);
+            } catch (AiravataException e) {
+                logger.warn("Failed to find the subscriber for experiment id: " + id, e);
+            }
+        }
         setMonitoring(false);
     }
 


Mime
View raw message