airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject git commit: adding message context as the new module
Date Wed, 15 Oct 2014 00:36:52 GMT
Repository: airavata
Updated Branches:
  refs/heads/master e51565195 -> 11002b7eb


adding message context as the new module


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/11002b7e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/11002b7e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/11002b7e

Branch: refs/heads/master
Commit: 11002b7eb0db3afa6ec9ac1841d768d88a0cccee
Parents: e515651
Author: Chathuri Wimalasena <kamalasini@gmail.com>
Authored: Tue Oct 14 20:36:36 2014 -0400
Committer: Chathuri Wimalasena <kamalasini@gmail.com>
Committed: Tue Oct 14 20:36:36 2014 -0400

----------------------------------------------------------------------
 .../airavata/messaging/core/MessageHandler.java |  2 +-
 .../airavata/messaging/core/TestClient.java     |  4 ++--
 .../messaging/core/impl/RabbitMQConsumer.java   | 20 +++++++++++++++-----
 3 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/11002b7e/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
index 6d6dfd7..70c4cb6 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -3,5 +3,5 @@ package org.apache.airavata.messaging.core;
 import org.apache.airavata.model.messaging.event.Message;
 
 public interface MessageHandler {
-    void onMessage(Message message);
+    void onMessage(MessageContext message);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/11002b7e/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index 8d4a87b..46e3193 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -44,9 +44,9 @@ public class TestClient {
             RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName, experimentId);
             consumer.listen(new MessageHandler() {
                 @Override
-                public void onMessage(Message message) {
+                public void onMessage(MessageContext message) {
                     System.out.println(" Message Received with message id '" + message.getMessageId()
-                            + "' and with message type '" + message.getMessageType());
+                            + "' and with message type '" + message.getType());
                     System.out.println("message received: " + message);
                 }
             });

http://git-wip-us.apache.org/repos/asf/airavata/blob/11002b7e/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
index 89de2bf..b005e89 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
@@ -26,8 +26,10 @@ import com.rabbitmq.client.*;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.Consumer;
+import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,30 +68,38 @@ public class RabbitMQConsumer implements Consumer {
                                            AMQP.BasicProperties properties,
                                            byte[] body) {
                     Message message = new Message();
+
                     try {
                         ThriftUtils.createThriftFromBytes(body, message);
-                        ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
-                        WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
-                        TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
-                        JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+                        TBase event = null;
                         if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
+                            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
                                     + "' and with message type '" + message.getMessageType() + "'  with status " + experimentStatusChangeEvent.getState());
+                            event = experimentStatusChangeEvent;
                         } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) {
+                            WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
                                     + "' and with message type '" + message.getMessageType() + "'  with status " + wfnStatusChangeEvent.getState());
+                            event = wfnStatusChangeEvent;
                         } else if (message.getMessageType().equals(MessageType.TASK)) {
+                            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
                                     + "' and with message type '" + message.getMessageType() + "'  with status " + taskStatusChangeEvent.getState());
+                            event = taskStatusChangeEvent;
                         } else if (message.getMessageType().equals(MessageType.JOB)) {
+                            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
                                     + "' and with message type '" + message.getMessageType() + "'  with status " + jobStatusChangeEvent.getState());
+                            event = jobStatusChangeEvent;
                         }
-                        handler.onMessage(message);
+
+                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId());
+                        handler.onMessage(messageContext);
                     } catch (TException e) {
                         String msg = "Failed to de-serialize the thrift message, exchange: " + exchangeName + " routingKey: " + routingKey + " queue: " + queueName;
                         log.warn(msg, e);


Mime
View raw message