airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject git commit: adding more to the message consumer
Date Tue, 14 Oct 2014 16:21:00 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 166ea5237 -> a82045d62


adding more to the message consumer

Signed-off-by: Chathuri Wimalasena <kamalasini@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a82045d6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a82045d6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a82045d6

Branch: refs/heads/master
Commit: a82045d623b3d4b9dbd8d28e1c278a5880a89f06
Parents: 166ea52
Author: supunkamburugamuva <supun06@gmail.com>
Authored: Tue Oct 14 11:59:51 2014 -0400
Committer: Chathuri Wimalasena <kamalasini@gmail.com>
Committed: Tue Oct 14 12:17:02 2014 -0400

----------------------------------------------------------------------
 .../airavata/messaging/core/Consumer.java       |  2 +-
 .../airavata/messaging/core/MessageHandler.java |  7 ++
 .../airavata/messaging/core/TestClient.java     | 10 ++-
 .../messaging/core/impl/RabbitMQConsumer.java   | 76 +++++++++++---------
 4 files changed, 58 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a82045d6/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
index 6a42491..9f4352a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
@@ -28,6 +28,6 @@ import org.apache.airavata.model.messaging.event.Message;
  * This is the basic consumer
  */
 public interface Consumer {
-    public Message listen (String routingKey) throws AiravataException;
+    public void listen(MessageHandler handler) throws AiravataException;
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a82045d6/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
new file mode 100644
index 0000000..6d6dfd7
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -0,0 +1,7 @@
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.model.messaging.event.Message;
+
+public interface MessageHandler {
+    void onMessage(Message message);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a82045d6/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index ef778db..ed04cc8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.model.messaging.event.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,8 +40,13 @@ public class TestClient {
             AiravataUtils.setExecutionAsServer();
             String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
             String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName);
-            consumer.listen(experimentId);
+            RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName, experimentId);
+            consumer.listen(new MessageHandler() {
+                @Override
+                public void onMessage(Message message) {
+                    System.out.println("message received: " + message);
+                }
+            });
         } catch (ApplicationSettingsException e) {
             logger.error("Error reading airavata server properties", e);
         }catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a82045d6/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
index ee0eb4a..ba5527e 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
@@ -26,7 +26,9 @@ import com.rabbitmq.client.*;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.Consumer;
+import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,51 +41,59 @@ public class RabbitMQConsumer implements Consumer {
     private Channel channel;
     private String consumerTag;
     private static Logger log = LoggerFactory.getLogger(RabbitMQConsumer.class);
+    private String routingKey;
 
-    public RabbitMQConsumer(String brokerUrl, String exchangeName){
+    public RabbitMQConsumer(String brokerUrl, String exchangeName, String routingKey) {
         this.exchangeName = exchangeName;
         this.url = brokerUrl;
+        this.routingKey = routingKey;
     }
 
-    public Message listen(String routingKey) throws AiravataException {
+    public void listen(MessageHandler handler) throws AiravataException {
         try {
             connection = createConnection();
             channel = connection.createChannel();
 
             channel.exchangeDeclare(exchangeName, "fanout", false);
-            String queueName = channel.queueDeclare().getQueue();
+            final String queueName = channel.queueDeclare().getQueue();
             channel.queueBind(queueName, exchangeName, routingKey);
-            QueueingConsumer consumer = new QueueingConsumer(channel);
-            channel.basicConsume(queueName, true, consumer);
 
-            while (true) {
-                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
-                byte[] body = delivery.getBody();
-                Message message = new Message();
-                ThriftUtils.createThriftFromBytes(body, message);
-                ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
-                WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
-                TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
-                JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
-                if (message.getMessageType().equals(MessageType.EXPERIMENT)){
-                    ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
-                    System.out.println(" Message Received with message id '" + message.getMessageId()
-                            + "' and with message type '" + message.getMessageType() + "'  with status " + experimentStatusChangeEvent.getState());
-                }else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)){
-                    ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
-                    System.out.println(" Message Received with message id '" + message.getMessageId()
-                            + "' and with message type '" + message.getMessageType() + "'  with status " + wfnStatusChangeEvent.getState());
-                }else if (message.getMessageType().equals(MessageType.TASK)){
-                    ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
-                    System.out.println(" Message Received with message id '" + message.getMessageId()
-                            + "' and with message type '" + message.getMessageType() + "'  with status " + taskStatusChangeEvent.getState());
-                }else if (message.getMessageType().equals(MessageType.JOB)){
-                    ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
-                    System.out.println(" Message Received with message id '" + message.getMessageId()
-                            + "' and with message type '" + message.getMessageType() + "'  with status " + jobStatusChangeEvent.getState());
+            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag,
+                                           Envelope envelope,
+                                           AMQP.BasicProperties properties,
+                                           byte[] body) {
+                    Message message = new Message();
+                    try {
+                        ThriftUtils.createThriftFromBytes(body, message);
+                        ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
+                        WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
+                        TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
+                        JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+                        if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
+                            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
+                            log.debug(" Message Received with message id '" + message.getMessageId()
+                                    + "' and with message type '" + message.getMessageType() + "'  with status " + experimentStatusChangeEvent.getState());
+                        } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) {
+                            ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
+                            log.debug(" Message Received with message id '" + message.getMessageId()
+                                    + "' and with message type '" + message.getMessageType() + "'  with status " + wfnStatusChangeEvent.getState());
+                        } else if (message.getMessageType().equals(MessageType.TASK)) {
+                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
+                            log.debug(" Message Received with message id '" + message.getMessageId()
+                                    + "' and with message type '" + message.getMessageType() + "'  with status " + taskStatusChangeEvent.getState());
+                        } else if (message.getMessageType().equals(MessageType.JOB)) {
+                            ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
+                            log.debug(" Message Received with message id '" + message.getMessageId()
+                                    + "' and with message type '" + message.getMessageType() + "'  with status " + jobStatusChangeEvent.getState());
+                        }
+                    } catch (TException e) {
+                        String msg = "Failed to de-serialize the thrift message, exchange: " + exchangeName + " routingKey: " + routingKey + " queue: " + queueName;
+                        log.warn(msg, e);
+                    }
                 }
-                return message;
-            }
+            });
         } catch (Exception e) {
             reset();
             String msg = "could not open channel for exchange " + exchangeName;
@@ -112,6 +122,4 @@ public class RabbitMQConsumer implements Consumer {
             return null;
         }
     }
-
-
 }


Mime
View raw message