airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/4] airavata git commit: Refactored messaging module to remove duplicate code and support multiple subscribers
Date Tue, 09 Aug 2016 22:19:39 GMT
Repository: airavata
Updated Branches:
  refs/heads/develop 4157065cc -> 20b3d251a


Refactored messaging module to remove duplicate code and support multiple subscribers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a6670f88
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a6670f88
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a6670f88

Branch: refs/heads/develop
Commit: a6670f884c8c4d2aaa5a1dff37fbe3f4ab447285
Parents: 0e14f97
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Tue Aug 9 18:17:33 2016 -0400
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Tue Aug 9 18:17:33 2016 -0400

----------------------------------------------------------------------
 .../messaging/core/MessagingFactory.java        |  88 ++++++
 .../airavata/messaging/core/Subscriber.java     |  58 ++++
 .../messaging/core/SubscriberProperties.java    | 125 ++++++++
 .../messaging/core/impl/ExperimentConsumer.java |  42 +++
 .../messaging/core/impl/ProcessConsumer.java    | 114 ++++++++
 .../core/impl/RabbitMQStatusSubscriber.java     | 287 +++++++++++++++++++
 .../messaging/core/impl/RabbitMQSubscriber.java | 189 ++++++++++++
 .../messaging/core/impl/StatusConsumer.java     | 143 +++++++++
 8 files changed, 1046 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
new file mode 100644
index 0000000..ee68d0c
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.impl.ProcessConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQSubscriber;
+import org.apache.airavata.messaging.core.impl.StatusConsumer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessagingFactory {
+
+    public static Subscriber getSubscriber(final MessageHandler messageHandler,List<String> routingKeys, Subscriber.Type type) throws AiravataException {
+        Subscriber subscriber = null;
+        SubscriberProperties sp = getSubscriberProperties();
+
+        switch (type) {
+            case EXPERIMENT_LAUNCH:
+                break;
+            case PROCESS_LAUNCH:
+                subscriber = getProcessSubscriber(sp);
+                subscriber.listen((connection ,channel) -> new ProcessConsumer(messageHandler, connection, channel),
+                        null,
+                        routingKeys);
+                break;
+            case STATUS:
+                subscriber = getStatusSubscriber(sp);
+                subscriber.listen((connection, channel) -> new StatusConsumer(messageHandler, connection, channel),
+                        null,
+                        routingKeys);
+                break;
+            default:
+                break;
+        }
+
+        return subscriber;
+    }
+
+    private static SubscriberProperties getSubscriberProperties() {
+        return new SubscriberProperties()
+                .setBrokerUrl(ServerSettings.RABBITMQ_BROKER_URL)
+                .setDurable(ServerSettings.getRabbitmqDurableQueue())
+                .setPrefetchCount(ServerSettings.getRabbitmqPrefetchCount())
+                .setAutoRecoveryEnable(true)
+                .setConsumerTag("default")
+                .setExchangeType(SubscriberProperties.EXCHANGE_TYPE.TOPIC);
+    }
+
+    private static RabbitMQSubscriber getStatusSubscriber(SubscriberProperties sp) throws AiravataException {
+        sp.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName())
+                .setAutoAck(true);
+        return new RabbitMQSubscriber(sp);
+    }
+
+
+    private static RabbitMQSubscriber getProcessSubscriber(SubscriberProperties sp) throws AiravataException {
+        sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName())
+                .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName())
+                .setAutoAck(false);
+        return new RabbitMQSubscriber(sp);
+    }
+
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
new file mode 100644
index 0000000..7952cb3
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Consumer;
+import org.apache.airavata.common.exception.AiravataException;
+
+import javax.annotation.Nonnull;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * This is the basic consumer
+ */
+public interface Subscriber {
+    /**
+     * Start listening for messages, The binding properties are specified in the handler.
+     * Returns and unique id to this Subscriber. This id can be used to stop the listening
+     * @param supplier - return RabbitMQ Consumer
+     * @return string id
+     * @throws AiravataException
+     */
+    String listen(BiFunction<Connection, Channel, Consumer>  supplier,
+                  String queueName,
+                  List<String> routingKeys) throws AiravataException;
+
+    void stopListen(final String id) throws AiravataException;
+
+    void sendAck(long deliveryTag);
+
+    enum Type {
+        EXPERIMENT_LAUNCH,
+        PROCESS_LAUNCH,
+        STATUS
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
new file mode 100644
index 0000000..025e93b
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core;
+
+public class SubscriberProperties {
+    private String brokerUrl;
+    private EXCHANGE_TYPE exchangeType;
+    private String exchangeName;
+    private int prefetchCount;
+    private boolean durable;
+    private String queueName;
+    private String consumerTag = "default";
+    private boolean autoRecoveryEnable;
+    private boolean autoAck;
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public SubscriberProperties setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+        return this;
+    }
+
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public SubscriberProperties setDurable(boolean durable) {
+        this.durable = durable;
+        return this;
+    }
+
+    public String getExchangeName() {
+        return exchangeName;
+    }
+
+    public SubscriberProperties setExchangeName(String exchangeName) {
+        this.exchangeName = exchangeName;
+        return this;
+    }
+
+    public int getPrefetchCount() {
+        return prefetchCount;
+    }
+
+    public SubscriberProperties setPrefetchCount(int prefetchCount) {
+        this.prefetchCount = prefetchCount;
+        return this;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public SubscriberProperties setQueueName(String queueName) {
+        this.queueName = queueName;
+        return this;
+    }
+
+    public String getConsumerTag() {
+        return consumerTag;
+    }
+
+    public SubscriberProperties setConsumerTag(String consumerTag) {
+        this.consumerTag = consumerTag;
+        return this;
+    }
+
+    public boolean isAutoRecoveryEnable() {
+        return autoRecoveryEnable;
+    }
+
+    public SubscriberProperties setAutoRecoveryEnable(boolean autoRecoveryEnable) {
+        this.autoRecoveryEnable = autoRecoveryEnable;
+        return this;
+    }
+
+    public String getExchangeType() {
+        return exchangeType.type;
+    }
+
+    public SubscriberProperties setExchangeType(EXCHANGE_TYPE exchangeType) {
+        this.exchangeType = exchangeType;
+        return this;
+    }
+
+    public boolean isAutoAck() {
+        return autoAck;
+    }
+
+    public SubscriberProperties setAutoAck(boolean autoAck) {
+        this.autoAck = autoAck;
+        return this;
+    }
+
+    public enum EXCHANGE_TYPE{
+        TOPIC("topic"),
+        FANOUT("fanout");
+
+        private String type;
+
+        EXCHANGE_TYPE(String type) {
+            this.type = type;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
new file mode 100644
index 0000000..058b99e
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+
+import java.io.IOException;
+
+public class ExperimentConsumer extends QueueingConsumer {
+    public ExperimentConsumer(Channel ch) {
+        super(ch);
+    }
+
+
+    @Override
+    public void handleDelivery(String consumerTag,
+                               Envelope envelope,
+                               AMQP.BasicProperties properties,
+                               byte[] body) throws IOException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
new file mode 100644
index 0000000..368c100
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ProcessConsumer extends QueueingConsumer{
+    private static final Logger log = LoggerFactory.getLogger(ProcessConsumer.class);
+
+    private MessageHandler handler;
+    private Channel channel;
+    private Connection connection;
+
+    public ProcessConsumer(MessageHandler messageHandler, Connection connection, Channel channel){
+        this(channel);
+        this.handler = messageHandler;
+        this.connection = connection;
+        this.channel = channel;
+    }
+
+
+    private ProcessConsumer(Channel ch) {
+        super(ch);
+    }
+
+    @Override
+    public void handleDelivery(String consumerTag,
+                               Envelope envelope,
+                               AMQP.BasicProperties basicProperties,
+                               byte[] body) throws IOException {
+
+        Message message = new Message();
+
+        try {
+            ThriftUtils.createThriftFromBytes(body, message);
+            TBase event = null;
+            String gatewayId = null;
+            long deliveryTag = envelope.getDeliveryTag();
+            if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+                ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
+                        " " +
+                        processSubmitEvent.getProcessId());
+                event = processSubmitEvent;
+                gatewayId = processSubmitEvent.getGatewayId();
+                MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+                        message.getMessageId(), gatewayId, deliveryTag);
+                messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+                messageContext.setIsRedeliver(envelope.isRedeliver());
+                handler.onMessage(messageContext);
+            } else {
+                log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
+                        "delivery tag {} ", message.getMessageType().name(), deliveryTag);
+                sendAck(deliveryTag);
+            }
+        } catch (TException e) {
+            String msg = "Failed to de-serialize the thrift message, from routing keys:" + envelope.getRoutingKey();
+            log.warn(msg, e);
+        }
+
+    }
+
+    private void sendAck(long deliveryTag){
+        try {
+            if (channel.isOpen()){
+                channel.basicAck(deliveryTag,false);
+            }else {
+                channel = connection.createChannel();
+                channel.basicQos(ServerSettings.getRabbitmqPrefetchCount());
+                channel.basicAck(deliveryTag, false);
+            }
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java
new file mode 100644
index 0000000..62c48d5
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java
@@ -0,0 +1,287 @@
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+// */
+//
+//package org.apache.airavata.messaging.core.impl;
+//
+//
+//import com.rabbitmq.client.*;
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.AiravataUtils;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.Subscriber;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessageHandler;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TBase;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import javax.annotation.Nonnull;
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//public class RabbitMQStatusSubscriber implements Subscriber {
+//	public static final String EXCHANGE_TYPE = "topic";
+//	private static Logger log = LoggerFactory.getLogger(RabbitMQStatusSubscriber.class);
+//
+//    private String exchangeName;
+//    private String url;
+//    private Connection connection;
+//    private Channel channel;
+//    private int prefetchCount;
+//    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+//
+//    public RabbitMQStatusSubscriber() throws AiravataException {
+//        try {
+//            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+//            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+//            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
+//            createConnection();
+//        } catch (ApplicationSettingsException e) {
+//            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+//            log.error(message, e);
+//            throw new AiravataException(message, e);
+//        }
+//    }
+//
+//    public RabbitMQStatusSubscriber(String brokerUrl, String exchangeName) throws AiravataException {
+//        this.exchangeName = exchangeName;
+//        this.url = brokerUrl;
+//
+//        createConnection();
+//    }
+//
+//    private void createConnection() throws AiravataException {
+//        try {
+//            ConnectionFactory connectionFactory = new ConnectionFactory();
+//            connectionFactory.setUri(url);
+//            connectionFactory.setAutomaticRecoveryEnabled(true);
+//            connection = connectionFactory.newConnection();
+//            connection.addShutdownListener(new ShutdownListener() {
+//                public void shutdownCompleted(ShutdownSignalException cause) {
+//                }
+//            });
+//            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+//
+//            channel = connection.createChannel();
+//            channel.basicQos(prefetchCount);
+//            channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false);
+//
+//        } catch (Exception e) {
+//            String msg = "could not open channel for exchange " + exchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public String listen(final MessageHandler handler) throws AiravataException {
+//        try {
+//            Map<String, Object> props = handler.getProperties();
+//            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+//            if (routing == null) {
+//                throw new IllegalArgumentException("The routing key must be present");
+//            }
+//
+//            List<String> keys = new ArrayList<String>();
+//            if (routing instanceof List) {
+//                for (Object o : (List)routing) {
+//                    keys.add(o.toString());
+//                }
+//            } else if (routing instanceof String) {
+//                keys.add((String) routing);
+//            }
+//
+//            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+//            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+//            if (queueName == null) {
+//                if (!channel.isOpen()) {
+//                    channel = connection.createChannel();
+//                    channel.exchangeDeclare(exchangeName, "topic", false);
+//                }
+//                queueName = channel.queueDeclare().getQueue();
+//            } else {
+//                channel.queueDeclare(queueName, true, false, false, null);
+//            }
+//
+//            final String id = getId(keys, queueName);
+//            if (queueDetailsMap.containsKey(id)) {
+//                throw new IllegalStateException("This subscriber is already defined for this Subscriber, " +
+//                        "cannot define the same subscriber twice");
+//            }
+//
+//            if (consumerTag == null) {
+//                consumerTag = "default";
+//            }
+//
+//            // bind all the routing keys
+//            for (String routingKey : keys) {
+//                channel.queueBind(queueName, exchangeName, routingKey);
+//            }
+//
+//            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+//                @Override
+//                public void handleDelivery(String consumerTag,
+//                                           Envelope envelope,
+//                                           AMQP.BasicProperties properties,
+//                                           byte[] body) {
+//                    Message message = new Message();
+//
+//                    try {
+//                        ThriftUtils.createThriftFromBytes(body, message);
+//                        TBase event = null;
+//                        String gatewayId = null;
+//
+//                        if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
+//                            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
+//                                    experimentStatusChangeEvent.getState());
+//                            event = experimentStatusChangeEvent;
+//                            gatewayId = experimentStatusChangeEvent.getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.PROCESS)) {
+//	                        ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
+//	                        ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent);
+//	                        log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " +
+//			                        "message type " + message.getMessageType() + " with status " +
+//			                        processStatusChangeEvent.getState());
+//	                        event = processStatusChangeEvent;
+//	                        gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.TASK)) {
+//                            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
+//                                    taskStatusChangeEvent.getState());
+//                            event = taskStatusChangeEvent;
+//                            gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
+//                        }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
+//                            TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
+//                            event = taskOutputChangeEvent;
+//                            gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.JOB)) {
+//                            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
+//                                    jobStatusChangeEvent.getState());
+//                            event = jobStatusChangeEvent;
+//                            gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+//                            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
+//                                    taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
+//                            event = taskSubmitEvent;
+//                            gatewayId = taskSubmitEvent.getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
+//                            TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
+//                                    taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
+//                            event = taskTerminateEvent;
+//                            gatewayId = null;
+//                        }
+//                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+//                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+//	                    messageContext.setIsRedeliver(envelope.isRedeliver());
+//                        handler.onMessage(messageContext);
+//                    } catch (TException e) {
+//                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+//                        log.warn(msg, e);
+//                    }
+//                }
+//            });
+//            // save the name for deleting the queue
+//            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+//            return id;
+//        } catch (Exception e) {
+//            String msg = "could not open channel for exchange " + exchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public void stopListen(final String id) throws AiravataException {
+//        QueueDetails details = queueDetailsMap.get(id);
+//        if (details != null) {
+//            try {
+//                for (String key : details.getRoutingKeys()) {
+//                    channel.queueUnbind(details.getQueueName(), exchangeName, key);
+//                }
+//                channel.queueDelete(details.getQueueName(), true, true);
+//            } catch (IOException e) {
+//                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
+//                log.debug(msg);
+//            }
+//        }
+//    }
+//
+//    /**
+//     * Private class for holding some information about the consumers registered
+//     */
+//    private class QueueDetails {
+//        String queueName;
+//
+//        List<String> routingKeys;
+//
+//        private QueueDetails(String queueName, List<String> routingKeys) {
+//            this.queueName = queueName;
+//            this.routingKeys = routingKeys;
+//        }
+//
+//        public String getQueueName() {
+//            return queueName;
+//        }
+//
+//        public List<String> getRoutingKeys() {
+//            return routingKeys;
+//        }
+//    }
+//
+//    private String getId(List<String> routingKeys, String queueName) {
+//        String id = "";
+//        for (String key : routingKeys) {
+//            id = id + "_" + key;
+//        }
+//        return id + "_" + queueName;
+//    }
+//
+//    public void close() {
+//        if (connection != null) {
+//            try {
+//                connection.close();
+//            } catch (IOException ignore) {
+//            }
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
new file mode 100644
index 0000000..188847f
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.SubscriberProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+
+public class RabbitMQSubscriber implements Subscriber {
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQSubscriber.class);
+
+    private Connection connection;
+    private Channel channel;
+    private Map<String, QueueDetail> queueDetailMap = new HashMap<>();
+    private SubscriberProperties properties;
+
+    public RabbitMQSubscriber(SubscriberProperties properties) throws AiravataException {
+        this.properties = properties;
+        createConnection();
+    }
+
+    private void createConnection() throws AiravataException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(properties.getBrokerUrl());
+            connectionFactory.setAutomaticRecoveryEnabled(properties.isAutoRecoveryEnable());
+            connection = connectionFactory.newConnection();
+            addShutdownListener();
+            log.info("connected to rabbitmq: " + connection + " for " + properties.getExchangeName());
+            channel = connection.createChannel();
+            channel.basicQos(properties.getPrefetchCount());
+            channel.exchangeDeclare(properties.getExchangeName(),
+                    properties.getExchangeType(),
+                    false);
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + properties.getExchangeName();
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    @Override
+    public String listen(BiFunction<Connection, Channel, Consumer> supplier,
+                         String queueName,
+                         List<String> routingKeys) throws AiravataException {
+
+        try {
+            if (!channel.isOpen()) {
+                channel = connection.createChannel();
+                channel.exchangeDeclare(properties.getExchangeName(), properties.getExchangeType(), false);
+            }
+            if (queueName == null) {
+                queueName = channel.queueDeclare().getQueue();
+            } else {
+                channel.queueDeclare(queueName, true, false, false, null);
+            }
+            final String id = getId(routingKeys, queueName);
+            if (queueDetailMap.containsKey(id)) {
+                throw new IllegalStateException("This subscriber is already defined for this Subscriber, " +
+                        "cannot define the same subscriber twice");
+            }
+            // bind all the routing keys
+            for (String key : routingKeys) {
+                channel.queueBind(queueName, properties.getExchangeName(), key);
+            }
+
+            channel.basicConsume(queueName,
+                    properties.isAutoAck(),
+                    properties.getConsumerTag(),
+                    supplier.apply(connection, channel));
+
+            queueDetailMap.put(id, new QueueDetail(queueName, routingKeys));
+            return id;
+        } catch (IOException e) {
+            String msg = "could not open channel for exchange " + properties.getExchangeName();
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    @Override
+    public void stopListen(String id) throws AiravataException {
+        QueueDetail details = queueDetailMap.get(id);
+        if (details != null) {
+            try {
+                for (String key : details.getRoutingKeys()) {
+                    channel.queueUnbind(details.getQueueName(), properties.getExchangeName(), key);
+                }
+                channel.queueDelete(details.getQueueName(), true, true);
+            } catch (IOException e) {
+                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + properties.getExchangeName();
+                log.debug(msg);
+            }
+        }
+    }
+
+    @Override
+    public void sendAck(long deliveryTag) {
+        try {
+            if (channel.isOpen()){
+                channel.basicAck(deliveryTag,false);
+            }else {
+                channel = connection.createChannel();
+                channel.basicQos(properties.getPrefetchCount());
+                channel.basicAck(deliveryTag, false);
+            }
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+    private void addShutdownListener() {
+        connection.addShutdownListener(new ShutdownListener() {
+            public void shutdownCompleted(ShutdownSignalException cause) {
+            }
+        });
+    }
+
+
+    private String getId(List<String> routingKeys, String queueName) {
+        String id = "";
+        for (String key : routingKeys) {
+            id = id + "_" + key;
+        }
+        return id + "_" + queueName;
+    }
+
+    public void close() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (IOException ignore) {
+            }
+        }
+    }
+
+
+    private class QueueDetail {
+        String queueName;
+        List<String> routingKeys;
+
+        private QueueDetail(String queueName, List<String> routingKeys) {
+            this.queueName = queueName;
+            this.routingKeys = routingKeys;
+        }
+
+        public String getQueueName() {
+            return queueName;
+        }
+
+        List<String> getRoutingKeys() {
+            return routingKeys;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/StatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/StatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/StatusConsumer.java
new file mode 100644
index 0000000..b5cae51
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/StatusConsumer.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class StatusConsumer extends DefaultConsumer {
+    private static final Logger log = LoggerFactory.getLogger(StatusConsumer.class);
+
+    private MessageHandler handler;
+    private Connection connection;
+    private Channel channel;
+
+    public StatusConsumer(MessageHandler handler, Connection connection, Channel channel) {
+        super(channel);
+        this.handler = handler;
+        this.connection = connection;
+        this.channel = channel;
+    }
+
+    private StatusConsumer(Channel channel) {
+        super(channel);
+    }
+
+
+    @Override
+    public void handleDelivery(String consumerTag,
+                               Envelope envelope,
+                               AMQP.BasicProperties properties,
+                               byte[] body) throws IOException {
+        Message message = new Message();
+
+        try {
+            ThriftUtils.createThriftFromBytes(body, message);
+            TBase event = null;
+            String gatewayId = null;
+
+            if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
+                ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  with status " +
+                        experimentStatusChangeEvent.getState());
+                event = experimentStatusChangeEvent;
+                gatewayId = experimentStatusChangeEvent.getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.PROCESS)) {
+                ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent);
+                log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " +
+                        "message type " + message.getMessageType() + " with status " +
+                        processStatusChangeEvent.getState());
+                event = processStatusChangeEvent;
+                gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.TASK)) {
+                TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  with status " +
+                        taskStatusChangeEvent.getState());
+                event = taskStatusChangeEvent;
+                gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
+            } else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
+                TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
+                event = taskOutputChangeEvent;
+                gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.JOB)) {
+                JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  with status " +
+                        jobStatusChangeEvent.getState());
+                event = jobStatusChangeEvent;
+                gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+                TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
+                        taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
+                event = taskSubmitEvent;
+                gatewayId = taskSubmitEvent.getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
+                TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
+                        taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
+                event = taskTerminateEvent;
+                gatewayId = null;
+            }
+            MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+            messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+            messageContext.setIsRedeliver(envelope.isRedeliver());
+            handler.onMessage(messageContext);
+        } catch (TException e) {
+            String msg = "Failed to de-serialize the thrift message, from routing keys: " + envelope.getRoutingKey();
+            log.warn(msg, e);
+        }
+    }
+}


Mime
View raw message