Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E03AC10487 for ; Fri, 18 Oct 2013 09:34:25 +0000 (UTC) Received: (qmail 30269 invoked by uid 500); 18 Oct 2013 09:34:24 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 30240 invoked by uid 500); 18 Oct 2013 09:34:23 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 30224 invoked by uid 99); 18 Oct 2013 09:34:22 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Oct 2013 09:34:22 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D7CFC917FB2; Fri, 18 Oct 2013 09:34:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Fri, 18 Oct 2013 09:34:21 -0000 Message-Id: <130cde1cf8b14fc892720f8eab0819a2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch. Updated Branches: refs/heads/camel-2.12.x b220ce070 -> 1c00d97bd refs/heads/master 3d01c27c6 -> 4a89ad969 Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4a89ad96 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4a89ad96 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4a89ad96 Branch: refs/heads/master Commit: 4a89ad969859d4adee09eddbb43cfd89a91efd57 Parents: 3d01c27 Author: Claus Ibsen Authored: Fri Oct 18 11:34:10 2013 +0200 Committer: Claus Ibsen Committed: Fri Oct 18 11:34:10 2013 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConsumer.java | 86 +++++++++++++++----- 1 file changed, 67 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4a89ad96/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index acc95d7..4f13045 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -29,7 +29,7 @@ import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; public class RabbitMQConsumer extends DefaultConsumer { - + ExecutorService executor; Connection conn; Channel channel; @@ -55,18 +55,21 @@ public class RabbitMQConsumer extends DefaultConsumer { channel = conn.createChannel(); log.debug("Using channel {}", channel); - channel.exchangeDeclare(endpoint.getExchangeName(), - "direct", - endpoint.isDurable(), - endpoint.isAutoDelete(), + channel.exchangeDeclare(endpoint.getExchangeName(), "direct", + endpoint.isDurable(), endpoint.isAutoDelete(), new HashMap()); - - // need to make sure the queueDeclare is same with the exchange declare - channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, endpoint.isAutoDelete(), null); - channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(), - endpoint.getRoutingKey() == null ? "" : endpoint.getRoutingKey()); - channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel)); + // need to make sure the queueDeclare is same with the exchange declare + channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, + endpoint.isAutoDelete(), null); + channel.queueBind( + endpoint.getQueue(), + endpoint.getExchangeName(), + endpoint.getRoutingKey() == null ? "" : endpoint + .getRoutingKey()); + + channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), + new RabbitConsumer(this, channel)); } @Override @@ -76,13 +79,14 @@ public class RabbitMQConsumer extends DefaultConsumer { if (conn != null) { try { conn.close(); - } catch (Exception ignored) { + } catch (Exception ignored) { // ignored } } channel = null; conn = null; + if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); @@ -99,7 +103,8 @@ public class RabbitMQConsumer extends DefaultConsumer { private final Channel channel; /** - * Constructs a new instance and records its association to the passed-in channel. + * Constructs a new instance and records its association to the + * passed-in channel. * * @param channel the channel to which this consumer is attached */ @@ -110,13 +115,12 @@ public class RabbitMQConsumer extends DefaultConsumer { } @Override - public void handleDelivery(String consumerTag, - Envelope envelope, - AMQP.BasicProperties properties, - byte[] body) throws IOException { + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) throws IOException { Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body); - log.trace("Created exchange [exchange={}]", new Object[]{exchange}); + mergeAmqpProperties(exchange, properties); + log.trace("Created exchange [exchange={}]", exchange); try { consumer.getProcessor().process(exchange); @@ -131,6 +135,50 @@ public class RabbitMQConsumer extends DefaultConsumer { getExceptionHandler().handleException("Error processing exchange", exchange, e); } } + + /** + * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()} + */ + private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) { + + if (properties.getType() != null) { + exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType()); + } + if (properties.getAppId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId()); + } + if (properties.getClusterId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId()); + } + if (properties.getContentEncoding() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding()); + } + if (properties.getContentType() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType()); + } + if (properties.getCorrelationId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId()); + } + if (properties.getExpiration() != null) { + exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration()); + } + if (properties.getMessageId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId()); + } + if (properties.getPriority() != null) { + exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority()); + } + if (properties.getReplyTo() != null) { + exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo()); + } + if (properties.getTimestamp() != null) { + exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp()); + } + if (properties.getUserId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId()); + } + } + } -} +}