Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7A35B10F29 for ; Wed, 5 Mar 2014 16:04:45 +0000 (UTC) Received: (qmail 90926 invoked by uid 500); 5 Mar 2014 16:04:45 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 90862 invoked by uid 500); 5 Mar 2014 16:04:44 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 90848 invoked by uid 99); 5 Mar 2014 16:04:43 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Mar 2014 16:04:43 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Mar 2014 16:04:39 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D48A72388B1B; Wed, 5 Mar 2014 16:04:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1574551 - in /qpid/trunk/qpid/java: bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/s... Date: Wed, 05 Mar 2014 16:04:17 -0000 To: commits@qpid.apache.org From: rgodfrey@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140305160418.D48A72388B1B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rgodfrey Date: Wed Mar 5 16:04:16 2014 New Revision: 1574551 URL: http://svn.apache.org/r1574551 Log: QPID-4000 , QPID-5601 : Improve conversion of reply-to between different protocols. Add functionality to the default exchange to understand AMQP 1.0 addresses. Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original) +++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Mar 5 16:04:16 2014 @@ -585,7 +585,7 @@ public class BDBMessageStoreTest extends _messageId = messageId; } - public String getRoutingKey() + public String getInitialRoutingAddress() { return null; } Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Mar 5 16:04:16 2014 @@ -423,11 +423,12 @@ public abstract class AbstractExchange route(final ServerMessage message, + final String routingAddress, final InstanceProperties instanceProperties) { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - List queues = doRoute(message, instanceProperties); + List queues = doRoute(message, routingAddress, instanceProperties); List allQueues = queues; boolean deletedQueues = false; @@ -464,18 +465,19 @@ public abstract class AbstractExchange> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { - List queues = route(message, instanceProperties); + List queues = route(message, routingAddress, instanceProperties); if(queues == null || queues.isEmpty()) { ExchangeImpl altExchange = getAlternateExchange(); if(altExchange != null) { - return altExchange.send(message, instanceProperties, txn, postEnqueueAction); + return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction); } else { @@ -515,6 +517,7 @@ public abstract class AbstractExchange doRoute(final ServerMessage message, + final String routingAddress, final InstanceProperties instanceProperties); @Override Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java Wed Mar 5 16:04:16 2014 @@ -50,13 +50,31 @@ public class DefaultDestination implemen public final > int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { - final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); + final AMQQueue q = _virtualHost.getQueue(routingAddress); if(q == null) { + if(routingAddress.contains("/") && !routingAddress.startsWith("/")) + { + String[] parts = routingAddress.split("/",2); + ExchangeImpl exchange = _virtualHost.getExchange(parts[0]); + if(exchange != null) + { + return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction); + } + } + else if(!routingAddress.contains("/")) + { + ExchangeImpl exchange = _virtualHost.getExchange(routingAddress); + if(exchange != null) + { + return exchange.send(message, "", instanceProperties, txn, postEnqueueAction); + } + } return 0; } else Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Wed Mar 5 16:04:16 2014 @@ -143,11 +143,11 @@ public class DirectExchange extends Abst } @Override - public List doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public List doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { - final String routingKey = payload.getRoutingKey(); - BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey); if(bindings != null) Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Wed Mar 5 16:04:16 2014 @@ -79,7 +79,9 @@ public class FanoutExchange extends Abst } @Override - public ArrayList doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { for(BindingImpl b : getBindings()) Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Wed Mar 5 16:04:16 2014 @@ -93,7 +93,9 @@ public class HeadersExchange extends Abs } @Override - public ArrayList doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { if (_logger.isDebugEnabled()) { Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Mar 5 16:04:16 2014 @@ -157,12 +157,14 @@ public class TopicExchange extends Abstr } @Override - public ArrayList doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList doRoute(ServerMessage payload, + final String routingAddress, + final InstanceProperties instanceProperties) { - final String routingKey = payload.getRoutingKey() == null + final String routingKey = routingAddress == null ? "" - : payload.getRoutingKey(); + : routingAddress; final Collection matchedQueues = getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey); @@ -181,7 +183,7 @@ public class TopicExchange extends Abstr if(queues == null || queues.isEmpty()) { - _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); + _logger.info("Message routing key: " + routingAddress + " No routes."); } return queues; Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java Wed Mar 5 16:04:16 2014 @@ -32,14 +32,18 @@ public interface MessageDestination exte /** * Routes a message + * + * * @param message the message to be routed + * @param routingAddress * @param instanceProperties the instance properties * @param txn the transaction to enqueue within * @param postEnqueueAction action to perform on the result of every enqueue (may be null) * @return the number of queues in which the message was enqueued performed */ > int send(M message, - InstanceProperties instanceProperties, - ServerTransaction txn, - Action postEnqueueAction); + final String routingAddress, + InstanceProperties instanceProperties, + ServerTransaction txn, + Action postEnqueueAction); } Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Wed Mar 5 16:04:16 2014 @@ -27,7 +27,7 @@ import java.nio.ByteBuffer; public interface ServerMessage extends EnqueueableMessage, MessageContentSource { - String getRoutingKey(); + String getInitialRoutingAddress(); AMQMessageHeader getMessageHeader(); Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Wed Mar 5 16:04:16 2014 @@ -44,6 +44,7 @@ public class InternalMessage extends Abs private final Object _messageBody; private final int _contentSize; private InternalMessageHeader _header; + private String _initialRoutingAddress; InternalMessage(final StoredMessage handle, @@ -80,9 +81,9 @@ public class InternalMessage extends Abs } @Override - public String getRoutingKey() + public String getInitialRoutingAddress() { - return null; + return _initialRoutingAddress; } @Override @@ -253,4 +254,8 @@ public class InternalMessage extends Abs } + public void setInitialRoutingAddress(final String initialRoutingAddress) + { + _initialRoutingAddress = initialRoutingAddress; + } } Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Mar 5 16:04:16 2014 @@ -70,7 +70,6 @@ import org.apache.qpid.server.util.Serve import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.NotificationListener; import javax.security.auth.Subject; public abstract class AbstractQueue @@ -2465,9 +2464,10 @@ public abstract class AbstractQueue } public final > int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { txn.enqueue(this,message, new ServerTransaction.Action() { Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Mar 5 16:04:16 2014 @@ -377,6 +377,7 @@ public abstract class QueueEntryImpl imp if (alternateExchange != null) { enqueues = alternateExchange.send(getMessage(), + getMessage().getInitialRoutingAddress(), getInstanceProperties(), txn, action); Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original) +++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Wed Mar 5 16:04:16 2014 @@ -127,7 +127,7 @@ public class FanoutExchangeTest extends _exchange.addBinding("key",queue2, null); - List result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + List result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -136,7 +136,7 @@ public class FanoutExchangeTest extends _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); - result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -144,14 +144,14 @@ public class FanoutExchangeTest extends _exchange.deleteBinding("key",queue2); - result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); - result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to queue1 only", 1, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -160,7 +160,7 @@ public class FanoutExchangeTest extends _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); - result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original) +++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Wed Mar 5 16:04:16 2014 @@ -73,7 +73,7 @@ public class HeadersExchangeTest extends protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception { - List results = _exchange.route(msg, InstanceProperties.EMPTY); + List results = _exchange.route(msg, "", InstanceProperties.EMPTY); List unexpected = new ArrayList(results); unexpected.removeAll(Arrays.asList(expected)); assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original) +++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Wed Mar 5 16:04:16 2014 @@ -324,8 +324,8 @@ public class TopicExchangeTest extends Q private int routeMessage(String routingKey, long messageNumber) { ServerMessage message = mock(ServerMessage.class); - when(message.getRoutingKey()).thenReturn(routingKey); - List queues = _exchange.route(message, InstanceProperties.EMPTY); + when(message.getInitialRoutingAddress()).thenReturn(routingKey); + List queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original) +++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Wed Mar 5 16:04:16 2014 @@ -119,7 +119,7 @@ public class TestMessageMetaDataType imp } @Override - public String getRoutingKey() + public String getInitialRoutingAddress() { return null; } Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original) +++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Wed Mar 5 16:04:16 2014 @@ -67,7 +67,7 @@ class MockServerMessage implements Serve throw new NotImplementedException(); } - public String getRoutingKey() + public String getInitialRoutingAddress() { throw new NotImplementedException(); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Wed Mar 5 16:04:16 2014 @@ -417,7 +417,7 @@ public class ConsumerTarget_0_10 extends { logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), - msg.getRoutingKey())); + msg.getInitialRoutingAddress())); } } } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Wed Mar 5 16:04:16 2014 @@ -30,17 +30,8 @@ import org.apache.qpid.transport.Deliver import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.typedmessage.TypedBytesContentReader; -import org.apache.qpid.typedmessage.TypedBytesFormatException; -import java.io.EOFException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; public class MessageConverter_Internal_to_v0_10 implements MessageConverter { @@ -123,7 +114,7 @@ public class MessageConverter_Internal_t }; } - private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size) + private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size) { DeliveryProperties deliveryProps = new DeliveryProperties(); MessageProperties messageProps = new MessageProperties(); @@ -132,7 +123,7 @@ public class MessageConverter_Internal_t deliveryProps.setExpiration(serverMsg.getExpiration()); deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); - deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); + deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); @@ -142,7 +133,7 @@ public class MessageConverter_Internal_t { messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); } - + messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeaderMap()); Header header = new Header(deliveryProps, messageProps, null); return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Wed Mar 5 16:04:16 2014 @@ -33,7 +33,6 @@ import org.apache.qpid.server.plugin.Mes import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -127,7 +126,7 @@ public class MessageConverter_v0_10 impl deliveryProps.setExpiration(serverMsg.getExpiration()); deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); - deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); + deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Wed Mar 5 16:04:16 2014 @@ -41,7 +41,7 @@ public class MessageTransferMessage exte return getStoredMessage().getMetaData(); } - public String getRoutingKey() + public String getInitialRoutingAddress() { return getMetaData().getRoutingKey(); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Wed Mar 5 16:04:16 2014 @@ -47,7 +47,6 @@ import org.apache.qpid.server.store.Stor import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -58,7 +57,6 @@ import org.apache.qpid.server.logging.su import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.queue.AMQQueue; @@ -199,7 +197,10 @@ public class ServerSession extends Sessi _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } - int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction); + int enqueues = exchange.send(message, + message.getInitialRoutingAddress(), + instanceProperties, _transaction, _checkCapacityAction + ); getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); incrementOutstandingTxnsIfNecessary(); return enqueues; Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Mar 5 16:04:16 2014 @@ -378,8 +378,11 @@ public class AMQChannel headerProps = new LinkedHashMap(); for(String headerName : serverMsg.getMessageHeader().getHeaderNames()) @@ -184,6 +185,7 @@ public class MessageConverter_Internal_t props.setHeaders(FieldTable.convertToFieldTable(headerProps)); final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + chb.setBodySize(size); return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime()); } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Wed Mar 5 16:04:16 2014 @@ -20,20 +20,27 @@ */ package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.ReplyTo; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.typedmessage.TypedBytesContentReader; import org.apache.qpid.typedmessage.TypedBytesFormatException; +import org.apache.qpid.url.AMQBindingURL; import java.io.EOFException; +import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; public class MessageConverter_v0_8_to_Internal implements MessageConverter { @@ -58,9 +65,210 @@ public class MessageConverter_v0_8_to_In Object body = convertMessageBody(mimeType, data); - return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), body); + return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), + new DelegatingMessageHeader(serverMessage.getMessageHeader()), body); } + private static class ReplyToComponents + { + private String _exchange; + private String _queue; + private String _routingKey; + + public void setExchange(final String exchange) + { + _exchange = exchange; + } + + public void setQueue(final String queue) + { + _queue = queue; + } + + public void setRoutingKey(final String routingKey) + { + _routingKey = routingKey; + } + + public String getExchange() + { + return _exchange; + } + + public String getQueue() + { + return _queue; + } + + public String getRoutingKey() + { + return _routingKey; + } + + public boolean hasExchange() + { + return _exchange != null; + } + + public boolean hasQueue() + { + return _queue != null; + } + + public boolean hasRoutingKey() + { + return _routingKey != null; + } + } + + private static class DelegatingMessageHeader implements AMQMessageHeader + { + private final AMQMessageHeader _delegate; + + private DelegatingMessageHeader(final AMQMessageHeader delegate) + { + _delegate = delegate; + } + + @Override + public String getCorrelationId() + { + return _delegate.getCorrelationId(); + } + + @Override + public long getExpiration() + { + return _delegate.getExpiration(); + } + + @Override + public String getUserId() + { + return _delegate.getUserId(); + } + + @Override + public String getAppId() + { + return _delegate.getAppId(); + } + + @Override + public String getMessageId() + { + return _delegate.getMessageId(); + } + + @Override + public String getMimeType() + { + return _delegate.getMimeType(); + } + + @Override + public String getEncoding() + { + return _delegate.getEncoding(); + } + + @Override + public byte getPriority() + { + return _delegate.getPriority(); + } + + @Override + public long getTimestamp() + { + return _delegate.getTimestamp(); + } + + @Override + public String getType() + { + return _delegate.getType(); + } + + @Override + public String getReplyTo() + { + String originalReplyTo = _delegate.getReplyTo(); + ReplyToComponents replyTo = convertReplyTo(originalReplyTo); + if(replyTo != null) + { + if(replyTo.hasExchange()) + { + return replyTo.getExchange() + (replyTo.hasRoutingKey() ? "/" + replyTo.getRoutingKey() : ""); + } + else + { + return replyTo.hasQueue() ? replyTo.getQueue() : replyTo.getRoutingKey(); + } + } + else + { + return originalReplyTo; + } + } + + private ReplyToComponents convertReplyTo(final String origReplyToString) + { + try + { + AMQBindingURL burl = new AMQBindingURL(origReplyToString); + ReplyToComponents replyTo = new ReplyToComponents(); + AMQShortString routingKey = burl.getRoutingKey(); + if(routingKey != null) + { + replyTo.setRoutingKey(routingKey.asString()); + } + + AMQShortString exchangeName = burl.getExchangeName(); + if(exchangeName != null) + { + replyTo.setExchange(exchangeName.asString()); + } + + AMQShortString queueName = burl.getQueueName(); + if(queueName != null) + { + replyTo.setQueue(queueName.asString()); + } + return replyTo; + } + catch (URISyntaxException e) + { + return null; + } + } + + @Override + public Object getHeader(final String name) + { + return _delegate.getHeader(name); + } + + @Override + public boolean containsHeaders(final Set names) + { + return _delegate.containsHeaders(names); + } + + @Override + public boolean containsHeader(final String name) + { + return _delegate.containsHeader(name); + } + + @Override + public Collection getHeaderNames() + { + return _delegate.getHeaderNames(); + } + } + + private static Object convertMessageBody(String mimeType, byte[] data) { if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Wed Mar 5 16:04:16 2014 @@ -76,7 +76,7 @@ public class ExchangeDestination impleme return null; }}; - int enqueues = _exchange.send(message, instanceProperties, txn, null); + int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Wed Mar 5 16:04:16 2014 @@ -563,6 +563,11 @@ public class MessageMetaData_1_0 impleme { return _properties == null ? null : _properties.getTo(); } + + public Map getHeadersAsMap() + { + return new HashMap(_appProperties); + } } } Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Wed Mar 5 16:04:16 2014 @@ -69,7 +69,7 @@ public class Message_1_0 extends Abstrac _arrivalTime = System.currentTimeMillis(); } - public String getRoutingKey() + public String getInitialRoutingAddress() { Object routingKey = getMessageHeader().getHeader("routing-key"); if(routingKey != null) @@ -78,7 +78,7 @@ public class Message_1_0 extends Abstrac } else { - return getMessageHeader().getSubject(); + return getMessageHeader().getTo(); } } @@ -92,12 +92,6 @@ public class Message_1_0 extends Abstrac return getMessageMetaData().getMessageHeader(); } - public boolean isRedelivered() - { - // TODO - return false; - } - public long getSize() { long size = 0l; Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Wed Mar 5 16:04:16 2014 @@ -76,7 +76,7 @@ public class NodeReceivingDestination im return null; }}; - int enqueues = _exchange.send(message, instanceProperties, txn, null); + int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java Wed Mar 5 16:04:16 2014 @@ -115,7 +115,7 @@ public class MessageConverter_0_10_to_1_ } } - props.setSubject(serverMessage.getRoutingKey()); + props.setSubject(serverMessage.getInitialRoutingAddress()); if(msgProps.hasUserId()) { Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Wed Mar 5 16:04:16 2014 @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.converter.v0_10_v1_0; +import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; @@ -33,6 +34,7 @@ import org.apache.qpid.transport.Deliver import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.ReplyTo; import java.nio.ByteBuffer; @@ -53,16 +55,18 @@ public class MessageConverter_1_0_to_v0_ @Override public MessageTransferMessage convert(Message_1_0 serverMsg, VirtualHost vhost) { - return new MessageTransferMessage(convertToStoredMessage(serverMsg), null); + return new MessageTransferMessage(convertToStoredMessage(serverMsg, vhost), null); } - private StoredMessage convertToStoredMessage(final Message_1_0 serverMsg) + private StoredMessage convertToStoredMessage(final Message_1_0 serverMsg, + final VirtualHost vhost) { Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg); final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject); final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg, + vhost, MessageConverter_from_1_0.getBodyMimeType(bodyObject), messageContent.length); @@ -119,25 +123,54 @@ public class MessageConverter_1_0_to_v0_ }; } - private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size) + private MessageMetaData_0_10 convertMetaData(Message_1_0 serverMsg, + final VirtualHost vhost, + final String bodyMimeType, + final int size) { DeliveryProperties deliveryProps = new DeliveryProperties(); MessageProperties messageProps = new MessageProperties(); + final AMQMessageHeader origHeader = serverMsg.getMessageHeader(); deliveryProps.setExpiration(serverMsg.getExpiration()); - deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); - deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); - deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); + deliveryProps.setPriority(MessageDeliveryPriority.get(origHeader.getPriority())); + deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); + deliveryProps.setTimestamp(origHeader.getTimestamp()); - messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); + messageProps.setContentEncoding(origHeader.getEncoding()); messageProps.setContentLength(size); messageProps.setContentType(bodyMimeType); - if(serverMsg.getMessageHeader().getCorrelationId() != null) + if(origHeader.getCorrelationId() != null) { - messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); + messageProps.setCorrelationId(origHeader.getCorrelationId().getBytes()); } + final String origReplyTo = origHeader.getReplyTo(); + if(origReplyTo != null && !origReplyTo.equals("")) + { + ReplyTo replyTo; + if(origReplyTo.startsWith("/")) + { + replyTo = new ReplyTo("",origReplyTo); + } + else if(origReplyTo.contains("/")) + { + String[] parts = origReplyTo.split("/",2); + replyTo = new ReplyTo(parts[0],parts[1]); + } + else if(vhost.getExchange(origReplyTo) != null) + { + replyTo = new ReplyTo(origReplyTo,""); + } + else + { + replyTo = new ReplyTo("",origReplyTo); + } + messageProps.setReplyTo(replyTo); + } + + messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeadersAsMap()); Header header = new Header(deliveryProps, messageProps, null); return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Wed Mar 5 16:04:16 2014 @@ -132,7 +132,7 @@ public class MessageConverter_0_8_to_0_1 deliveryProps.setExpiration(message_0_8.getExpiration()); deliveryProps.setImmediate(message_0_8.isImmediate()); deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority())); - deliveryProps.setRoutingKey(message_0_8.getRoutingKey()); + deliveryProps.setRoutingKey(message_0_8.getInitialRoutingAddress()); deliveryProps.setTimestamp(properties.getTimestamp()); messageProps.setContentEncoding(properties.getEncodingAsString()); Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Wed Mar 5 16:04:16 2014 @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.converter.v0_8_v1_0; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; @@ -37,6 +38,7 @@ import org.apache.qpid.framing.FieldTabl import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0; import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; +import org.apache.qpid.url.AMQBindingURL; public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0 { @@ -102,9 +104,45 @@ public class MessageConverter_0_8_to_1_0 { props.setMessageId(new Binary(messageId.getBytes())); } - props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); + final String originalReplyTo = String.valueOf(contentHeader.getReplyTo()); + try + { + AMQBindingURL burl = new AMQBindingURL(originalReplyTo); + String replyTo; + + if(burl.getExchangeName() != null && !burl.getExchangeName().equals(AMQShortString.EMPTY_STRING)) + { + replyTo = burl.getExchangeName().asString(); + + if(burl.getRoutingKey() != null) + { + replyTo += "/" + burl.getRoutingKey().asString(); + } + + } + else if(burl.getQueueName() != null && !burl.getQueueName().equals(AMQShortString.EMPTY_STRING)) + { + replyTo = burl.getQueueName().asString(); + } + else if(burl.getRoutingKey() != null) + { + replyTo = burl.getRoutingKey().asString(); + } + else + { + replyTo = originalReplyTo; + } + + props.setReplyTo(replyTo); + } + catch (URISyntaxException e) + { + props.setReplyTo(originalReplyTo); + } + + - props.setSubject(serverMessage.getRoutingKey()); + props.setSubject(serverMessage.getInitialRoutingAddress()); if(contentHeader.getUserId() != null) { props.setUserId(new Binary(contentHeader.getUserId().getBytes())); Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original) +++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Wed Mar 5 16:04:16 2014 @@ -44,6 +44,7 @@ import org.apache.qpid.server.protocol.A import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -261,9 +262,10 @@ class ManagementNode implements MessageS @Override public > int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { @SuppressWarnings("unchecked") @@ -361,11 +363,19 @@ class ManagementNode implements MessageS ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo()); + response.setInitialRoutingAddress(message.getMessageHeader().getReplyTo()); if(consumer != null) { // TODO - check same owner consumer.send(response); } + else + { + _virtualHost.getDefaultDestination().send(response, + message.getMessageHeader().getReplyTo(), InstanceProperties.EMPTY, + new AutoCommitTransaction(_virtualHost.getMessageStore()), + null); + } // TODO - route to a queue } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Wed Mar 5 16:04:16 2014 @@ -112,6 +112,11 @@ public abstract class AMQDestination imp _name = name; } + public boolean neverDeclare() + { + return false; + } + // ----- Fields required to support new address syntax ------- public enum DestSyntax { Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1574551&r1=1574550&r2=1574551&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Mar 5 16:04:16 2014 @@ -2864,16 +2864,16 @@ public abstract class AMQSession