From commits-return-7061-apmail-airavata-commits-archive=airavata.apache.org@airavata.apache.org Wed Sep 17 20:21:32 2014 Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1FE7D11938 for ; Wed, 17 Sep 2014 20:21:32 +0000 (UTC) Received: (qmail 44497 invoked by uid 500); 17 Sep 2014 20:21:32 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 44456 invoked by uid 500); 17 Sep 2014 20:21:32 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 44447 invoked by uid 99); 17 Sep 2014 20:21:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Sep 2014 20:21:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C1698A19505; Wed, 17 Sep 2014 20:21:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chathuri@apache.org To: commits@airavata.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: adding pull request Date: Wed, 17 Sep 2014 20:21:31 +0000 (UTC) Repository: airavata Updated Branches: refs/heads/messaging_framework 288fb82aa -> a7581e747 adding pull request Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a7581e74 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a7581e74 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a7581e74 Branch: refs/heads/messaging_framework Commit: a7581e747f26704a6236df16bd4530ef095143c3 Parents: 288fb82 Author: supunkamburugamuva Authored: Tue Sep 16 15:09:09 2014 -0400 Committer: Chathuri Wimalasena Committed: Wed Sep 17 16:21:24 2014 -0400 ---------------------------------------------------------------------- .../core/impl/AiravataRabbitMQPublisher.java | 3 + .../messaging/core/impl/RabbitMQProducer.java | 192 +++++++++++++++++++ 2 files changed, 195 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/a7581e74/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java index 83e7426..7fc5342 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java @@ -28,6 +28,9 @@ import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; public class AiravataRabbitMQPublisher implements Publisher { + public AiravataRabbitMQPublisher() { + } + public void publish(ExperimentStatusChangeEvent event) { } http://git-wip-us.apache.org/repos/asf/airavata/blob/a7581e74/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java new file mode 100644 index 0000000..e52161f --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java @@ -0,0 +1,192 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.core.impl; + +import com.rabbitmq.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class RabbitMQProducer { + private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class); + + private Connection connection; + + private Channel channel; + + private QueueingConsumer consumer; + + private String consumerTag; + + private String exchangeName; + + private String routingKey; + + private int prefetchCount; + + private boolean isReQueueOnFail = false; + + private String url; + + public RabbitMQProducer(String url, String routingKey, String exchangeName, + int prefetchCount, boolean isReQueueOnFail) { + this.prefetchCount = prefetchCount; + this.isReQueueOnFail = isReQueueOnFail; + this.exchangeName = exchangeName; + this.routingKey = routingKey; + this.url = url; + } + + private void reset() { + consumerTag = null; + } + + private void reInitIfNecessary() throws Exception { + if (consumerTag == null || consumer == null) { + close(); + open(); + } + } + + public void close() { + log.info("Closing channel to exchange {}", exchangeName); + try { + if (channel != null && channel.isOpen()) { + if (consumerTag != null) { + channel.basicCancel(consumerTag); + } + channel.close(); + } + } catch (Exception e) { + log.debug("error closing channel and/or cancelling consumer", e); + } + try { + log.info("closing connection to rabbitmq: " + connection); + connection.close(); + } catch (Exception e) { + log.debug("error closing connection", e); + } + consumer = null; + consumerTag = null; + channel = null; + connection = null; + } + + public void open() throws Exception { + try { + connection = createConnection(); + channel = connection.createChannel(); + if (prefetchCount > 0) { + log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName); + channel.basicQos(prefetchCount); + } + channel.exchangeDeclare(exchangeName, "fanout", false); + } catch (Exception e) { + reset(); + String msg = "could not open channel for exchange " + exchangeName; + log.error(msg); + throw new Exception(msg, e); + } + } + + public void send(byte []message) throws Exception { + try { + channel.basicPublish(exchangeName, routingKey, null, message); + } catch (IOException e) { + String msg = "Failed to publish message to exchange: " + exchangeName; + log.error(msg, e); + throw new Exception(msg, e); + } + } + + private Connection createConnection() throws IOException { + try { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setUri(url); + Connection connection = connectionFactory.newConnection(); + connection.addShutdownListener(new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException cause) { + } + }); + log.info("connected to rabbitmq: " + connection + " for " + exchangeName); + return connection; + } catch (Exception e) { + log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName); + return null; + } + } + + public void ackMessage(Long msgId) throws Exception { + try { + channel.basicAck(msgId, false); + } catch (ShutdownSignalException sse) { + reset(); + String msg = "shutdown signal received while attempting to ack message"; + log.error(msg, sse); + throw new Exception(msg, sse); + } catch (Exception e) { + String s = "could not ack for msgId: " + msgId; + log.error(s, e); + throw new Exception(s, e); + } + } + + public void failMessage(Long msgId) throws Exception { + if (isReQueueOnFail) { + failWithRedelivery(msgId); + } else { + deadLetter(msgId); + } + } + + public void failWithRedelivery(Long msgId) throws Exception { + try { + channel.basicReject(msgId, true); + } catch (ShutdownSignalException sse) { + reset(); + String msg = "shutdown signal received while attempting to fail with redelivery"; + log.error(msg, sse); + throw new Exception(msg, sse); + } catch (Exception e) { + String msg = "could not fail with redelivery for msgId: " + msgId; + log.error(msg, e); + throw new Exception(msg, e); + } + } + + public void deadLetter(Long msgId) throws Exception { + try { + channel.basicReject(msgId, false); + } catch (ShutdownSignalException sse) { + reset(); + String msg = "shutdown signal received while attempting to fail with no redelivery"; + log.error(msg, sse); + throw new Exception(msg, sse); + } catch (Exception e) { + String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId; + log.error(msg, e); + throw new Exception(msg, e); + } + } +}