Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 446C6200B72 for ; Fri, 26 Aug 2016 17:43:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4389B160A94; Fri, 26 Aug 2016 15:43:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 02C82160AD7 for ; Fri, 26 Aug 2016 17:42:58 +0200 (CEST) Received: (qmail 17164 invoked by uid 500); 26 Aug 2016 15:42:57 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 16323 invoked by uid 99); 26 Aug 2016 15:42:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Aug 2016 15:42:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 19DDBE69B0; Fri, 26 Aug 2016 15:42:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lahiru@apache.org To: commits@airavata.apache.org Date: Fri, 26 Aug 2016 15:43:18 -0000 Message-Id: <4918b6b0568447f19cf8426f809e2a91@git.apache.org> In-Reply-To: <90c724ae638749f8b9f3701a4687b75c@git.apache.org> References: <90c724ae638749f8b9f3701a4687b75c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [23/50] [abbrv] airavata git commit: Fixed messaging publishing issues archived-at: Fri, 26 Aug 2016 15:43:01 -0000 Fixed messaging publishing issues Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9f979b50 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9f979b50 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9f979b50 Branch: refs/heads/lahiru/AIRAVATA-2057 Commit: 9f979b50b972db48b0d80bcdbbe3dd932c9a0bc4 Parents: 3fcde52 Author: Shameera Rathnayaka Authored: Mon Aug 15 15:45:12 2016 -0400 Committer: Shameera Rathnayaka Committed: Mon Aug 15 15:45:12 2016 -0400 ---------------------------------------------------------------------- .../apache/airavata/gfac/server/GfacServerHandler.java | 2 +- .../apache/airavata/messaging/core/MessagingFactory.java | 3 ++- .../airavata/messaging/core/impl/ExperimentConsumer.java | 2 +- .../airavata/messaging/core/impl/ProcessConsumer.java | 10 ++++------ .../airavata/messaging/core/impl/RabbitMQPublisher.java | 3 ++- .../airavata/messaging/core/impl/RabbitMQSubscriber.java | 7 ++++++- 6 files changed, 16 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 44073dc..a7b0714 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -97,7 +97,7 @@ public class GfacServerHandler implements GfacService.Iface { private void initAMQPClient() throws AiravataException { // init process consumer List routingKeys = new ArrayList<>(); - routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName()); + routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName()); processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Type.PROCESS_LAUNCH); // init status publisher statusPublisher = Factory.getStatusPublisher(); http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java index b3e6d35..802ea5a 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java @@ -123,7 +123,7 @@ public class MessagingFactory { private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException { sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName()) - .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName()) + .setQueueName("process_launch") .setAutoAck(false); return new RabbitMQSubscriber(sp); } @@ -131,6 +131,7 @@ public class MessagingFactory { private static Subscriber getExperimentSubscriber(RabbitMQProperties sp) throws AiravataException { sp.setExchangeName(ServerSettings.getRabbitmqExperimentExchangeName()) + .setQueueName("experiment_launch") .setAutoAck(false); return new RabbitMQSubscriber(sp); http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java index 6e4c46a..5010358 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java @@ -72,7 +72,7 @@ public class ExperimentConsumer extends QueueingConsumer { String gatewayId = null; ExperimentSubmitEvent experimentEvent = new ExperimentSubmitEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent); - log.debug(" Message Received with message id '" + message.getMessageId() + log.info(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType() + "' for experimentId:" + " " + experimentEvent.getExperimentId()); http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java index e95a7ca..69910bd 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java @@ -55,8 +55,7 @@ public class ProcessConsumer extends QueueingConsumer{ } - @Override - public void handleDelivery(String consumerTag, + @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { @@ -71,10 +70,9 @@ public class ProcessConsumer extends QueueingConsumer{ if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) { ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' for experimentId:" + - " " + - processSubmitEvent.getProcessId()); + log.info(" Message Received with message id '" + message.getMessageId() + + " and with message type:" + message.getMessageType() + ", for processId:" + + processSubmitEvent.getProcessId() + ", expId:" + processSubmitEvent.getExperimentId()); event = processSubmitEvent; gatewayId = processSubmitEvent.getGatewayId(); MessageContext messageContext = new MessageContext(event, message.getMessageType(), http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java index 3fdb3a1..6f1d1d8 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java @@ -71,7 +71,7 @@ public class RabbitMQPublisher implements Publisher { if (properties.getExchangeName() != null) { channel.exchangeDeclare(properties.getExchangeName(), properties.getExchangeType(), - false); + true); //durable } } catch (Exception e) { String msg = "RabbitMQ connection issue for exchange : " + properties.getExchangeName(); @@ -93,6 +93,7 @@ public class RabbitMQPublisher implements Publisher { message.setMessageType(messageContext.getType()); message.setUpdatedTime(messageContext.getUpdatedTime().getTime()); String routingKey = routingKeySupplier.apply(messageContext); + log.info("publish messageId:" + messageContext.getMessageId() + ", messageType:" + messageContext.getType() + ", to routingKey:" + routingKey); byte[] messageBody = ThriftUtils.serializeThriftObject(message); send(messageBody, routingKey); } catch (TException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java index 441281d..6b28723 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java @@ -85,7 +85,11 @@ public class RabbitMQSubscriber implements Subscriber { if (queueName == null) { queueName = channel.queueDeclare().getQueue(); } else { - channel.queueDeclare(queueName, true, false, false, null); + channel.queueDeclare(queueName, + true, // durable + false, // exclusive + false, // autoDelete + null);// arguments } final String id = getId(routingKeys, queueName); if (queueDetailMap.containsKey(id)) { @@ -94,6 +98,7 @@ public class RabbitMQSubscriber implements Subscriber { } // bind all the routing keys for (String key : routingKeys) { + log.info("Binding key:" + key + " to queue:" + queueName); channel.queueBind(queueName, properties.getExchangeName(), key); }