airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject git commit: rabbit mq consumer
Date Thu, 09 Oct 2014 16:23:52 GMT
Repository: airavata
Updated Branches:
  refs/heads/master ad02735f1 -> 8b978c62e


rabbit mq consumer


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8b978c62
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8b978c62
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8b978c62

Branch: refs/heads/master
Commit: 8b978c62ec74950fb1c37ca37d38301c7506033e
Parents: ad02735
Author: Chathuri Wimalasena <kamalasini@gmail.com>
Authored: Thu Oct 9 12:23:45 2014 -0400
Committer: Chathuri Wimalasena <kamalasini@gmail.com>
Committed: Thu Oct 9 12:23:45 2014 -0400

----------------------------------------------------------------------
 modules/messaging/core/pom.xml                  |   4 +-
 .../airavata/messaging/core/Consumer.java       |  32 +++++
 .../airavata/messaging/core/Publisher.java      |   9 ++
 .../airavata/messaging/core/TestClient.java     |  51 ++++++++
 .../messaging/core/impl/RabbitMQConsumer.java   | 116 +++++++++++++++++++
 modules/xbaya-gui/pom.xml                       |  10 +-
 .../org/apache/airavata/xbaya/XBayaEngine.java  |   3 +
 7 files changed, 218 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/8b978c62/modules/messaging/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/messaging/core/pom.xml b/modules/messaging/core/pom.xml
index 4b36cda..428c46d 100644
--- a/modules/messaging/core/pom.xml
+++ b/modules/messaging/core/pom.xml
@@ -56,8 +56,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-client-configuration</artifactId>
-            <scope>test</scope>
+            <artifactId>airavata-server-configuration</artifactId>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>com.rabbitmq</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/8b978c62/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
new file mode 100644
index 0000000..17d18a5
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+
+/**
+ * This is the basic consumer
+ */
+public interface Consumer {
+    public void listen (String routingKey) throws AiravataException;
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/8b978c62/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index 4452856..4878254 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -24,6 +24,15 @@ package org.apache.airavata.messaging.core;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.model.messaging.event.*;
 
+/**
+ * This is the basic publisher interface.
+ */
 public interface Publisher {
+
+    /**
+     *
+     * @param message object of message context which will include actual event and other information
+     * @throws AiravataException
+     */
     public void publish(MessageContext message) throws AiravataException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8b978c62/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
new file mode 100644
index 0000000..ef778db
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestClient {
+    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+    public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
+    private final static Logger logger = LoggerFactory.getLogger(TestClient.class);
+    private final static String experimentId = "echoExperiment_825d89fa-179a-4da0-ae1e-e7fa0e58bdb2";
+
+    public static void main(String[] args) {
+        try {
+            AiravataUtils.setExecutionAsServer();
+            String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
+            String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
+            RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName);
+            consumer.listen(experimentId);
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error reading airavata server properties", e);
+        }catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/8b978c62/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
new file mode 100644
index 0000000..402251e
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.Consumer;
+import org.apache.airavata.model.messaging.event.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class RabbitMQConsumer implements Consumer {
+    private String exchangeName;
+    private String url;
+    private Connection connection;
+    private Channel channel;
+    private String consumerTag;
+    private static Logger log = LoggerFactory.getLogger(RabbitMQConsumer.class);
+
+    public RabbitMQConsumer(String brokerUrl, String exchangeName){
+        this.exchangeName = exchangeName;
+        this.url = brokerUrl;
+    }
+
+    public void listen(String routingKey) throws AiravataException {
+        try {
+            connection = createConnection();
+            channel = connection.createChannel();
+
+            channel.exchangeDeclare(exchangeName, "fanout", false);
+            String queueName = channel.queueDeclare().getQueue();
+            channel.queueBind(queueName, exchangeName, routingKey);
+            QueueingConsumer consumer = new QueueingConsumer(channel);
+            channel.basicConsume(queueName, true, consumer);
+
+            while (true) {
+                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+                byte[] body = delivery.getBody();
+                Message message = new Message();
+                ThriftUtils.createThriftFromBytes(body, message);
+                ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
+                WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
+                TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
+                JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+                if (message.getMessageType().equals(MessageType.EXPERIMENT)){
+                    ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getMessageType() + "'  with status " + experimentStatusChangeEvent.getState());
+                }else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)){
+                    ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getMessageType() + "'  with status " + wfnStatusChangeEvent.getState());
+                }else if (message.getMessageType().equals(MessageType.TASK)){
+                    ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getMessageType() + "'  with status " + taskStatusChangeEvent.getState());
+                }else if (message.getMessageType().equals(MessageType.JOB)){
+                    ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getMessageType() + "'  with status " + jobStatusChangeEvent.getState());
+                }
+            }
+        } catch (Exception e) {
+            reset();
+            String msg = "could not open channel for exchange " + exchangeName;
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    private void reset() {
+        consumerTag = null;
+    }
+
+    private Connection createConnection() throws IOException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(url);
+            Connection connection = connectionFactory.newConnection();
+            connection.addShutdownListener(new ShutdownListener() {
+                public void shutdownCompleted(ShutdownSignalException cause) {
+                }
+            });
+            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+            return connection;
+        } catch (Exception e) {
+            log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName);
+            return null;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/8b978c62/modules/xbaya-gui/pom.xml
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/pom.xml b/modules/xbaya-gui/pom.xml
index 491d7c8..6944874 100644
--- a/modules/xbaya-gui/pom.xml
+++ b/modules/xbaya-gui/pom.xml
@@ -219,11 +219,11 @@
             <artifactId>airavata-common-utils</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-workflow-tracking</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.apache.airavata</groupId>-->
+            <!--<artifactId>airavata-workflow-tracking</artifactId>-->
+            <!--<version>${project.version}</version>-->
+        <!--</dependency>-->
 	<dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-api-stubs</artifactId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/8b978c62/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/XBayaEngine.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/XBayaEngine.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/XBayaEngine.java
index 52ab853..ec85e8e 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/XBayaEngine.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/XBayaEngine.java
@@ -51,8 +51,11 @@ public class XBayaEngine {
 
     private WorkflowClient workflowClient;
 
+    //FIXME: use rabbit mq producer instead of WS-Messanger producer
     private Monitor monitor;
 
+
+
     private boolean exitOnClose = true;
 
     private ComponentTreeNode systemComponentTree;


Mime
View raw message