qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1574551 - in /qpid/trunk/qpid/java: bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/s...
Date Wed, 05 Mar 2014 16:04:17 GMT
Author: rgodfrey
Date: Wed Mar  5 16:04:16 2014
New Revision: 1574551

URL: http://svn.apache.org/r1574551
Log:
QPID-4000 , QPID-5601 : Improve conversion of reply-to between different protocols.  Add functionality to the default exchange to understand AMQP 1.0 addresses.

Modified:
    qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
    qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java

Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Mar  5 16:04:16 2014
@@ -585,7 +585,7 @@ public class BDBMessageStoreTest extends
             _messageId = messageId;
         }
 
-        public String getRoutingKey()
+        public String getInitialRoutingAddress()
         {
             return null;
         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Mar  5 16:04:16 2014
@@ -423,11 +423,12 @@ public abstract class AbstractExchange<T
 
 
     final List<? extends BaseQueue> route(final ServerMessage message,
+                                          final String routingAddress,
                                           final InstanceProperties instanceProperties)
     {
         _receivedMessageCount.incrementAndGet();
         _receivedMessageSize.addAndGet(message.getSize());
-        List<? extends BaseQueue> queues = doRoute(message, instanceProperties);
+        List<? extends BaseQueue> queues = doRoute(message, routingAddress, instanceProperties);
         List<? extends BaseQueue> allQueues = queues;
 
         boolean deletedQueues = false;
@@ -464,18 +465,19 @@ public abstract class AbstractExchange<T
     }
 
     public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
-                          final InstanceProperties instanceProperties,
-                          final ServerTransaction txn,
-                          final Action<? super MessageInstance> postEnqueueAction)
+                                                                                        final String routingAddress,
+                                                                                        final InstanceProperties instanceProperties,
+                                                                                        final ServerTransaction txn,
+                                                                                        final Action<? super MessageInstance> postEnqueueAction)
     {
-        List<? extends BaseQueue> queues = route(message, instanceProperties);
+        List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
 
         if(queues == null || queues.isEmpty())
         {
             ExchangeImpl altExchange = getAlternateExchange();
             if(altExchange != null)
             {
-                return altExchange.send(message, instanceProperties, txn, postEnqueueAction);
+                return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
             }
             else
             {
@@ -515,6 +517,7 @@ public abstract class AbstractExchange<T
     }
 
     protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
+                                                         final String routingAddress,
                                                          final InstanceProperties instanceProperties);
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java Wed Mar  5 16:04:16 2014
@@ -50,13 +50,31 @@ public class DefaultDestination implemen
 
 
     public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
-                          final InstanceProperties instanceProperties,
-                          final ServerTransaction txn,
-                          final Action<? super MessageInstance> postEnqueueAction)
+                                                                                        final String routingAddress,
+                                                                                        final InstanceProperties instanceProperties,
+                                                                                        final ServerTransaction txn,
+                                                                                        final Action<? super MessageInstance> postEnqueueAction)
     {
-        final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
+        final AMQQueue q = _virtualHost.getQueue(routingAddress);
         if(q == null)
         {
+            if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
+            {
+                String[] parts = routingAddress.split("/",2);
+                ExchangeImpl exchange = _virtualHost.getExchange(parts[0]);
+                if(exchange != null)
+                {
+                    return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction);
+                }
+            }
+            else if(!routingAddress.contains("/"))
+            {
+                ExchangeImpl exchange = _virtualHost.getExchange(routingAddress);
+                if(exchange != null)
+                {
+                    return exchange.send(message, "", instanceProperties, txn, postEnqueueAction);
+                }
+            }
             return 0;
         }
         else

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Wed Mar  5 16:04:16 2014
@@ -143,11 +143,11 @@ public class DirectExchange extends Abst
     }
 
     @Override
-    public List<? extends BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+    public List<? extends BaseQueue> doRoute(ServerMessage payload,
+                                             final String routingKey,
+                                             final InstanceProperties instanceProperties)
     {
 
-        final String routingKey = payload.getRoutingKey();
-
         BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
 
         if(bindings != null)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Wed Mar  5 16:04:16 2014
@@ -79,7 +79,9 @@ public class FanoutExchange extends Abst
     }
 
     @Override
-    public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+    public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+                                        final String routingKey,
+                                        final InstanceProperties instanceProperties)
     {
 
         for(BindingImpl b : getBindings())

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Wed Mar  5 16:04:16 2014
@@ -93,7 +93,9 @@ public class HeadersExchange extends Abs
     }
 
     @Override
-    public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+    public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+                                        final String routingKey,
+                                        final InstanceProperties instanceProperties)
     {
         if (_logger.isDebugEnabled())
         {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Mar  5 16:04:16 2014
@@ -157,12 +157,14 @@ public class TopicExchange extends Abstr
     }
 
     @Override
-    public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+    public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+                                        final String routingAddress,
+                                        final InstanceProperties instanceProperties)
     {
 
-        final String routingKey = payload.getRoutingKey() == null
+        final String routingKey = routingAddress == null
                                           ? ""
-                                          : payload.getRoutingKey();
+                                          : routingAddress;
 
         final Collection<AMQQueue> matchedQueues =
                 getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey);
@@ -181,7 +183,7 @@ public class TopicExchange extends Abstr
 
         if(queues == null || queues.isEmpty())
         {
-            _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
+            _logger.info("Message routing key: " + routingAddress + " No routes.");
         }
 
         return queues;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java Wed Mar  5 16:04:16 2014
@@ -32,14 +32,18 @@ public interface MessageDestination exte
 
     /**
      * Routes a message
+     *
+     *
      * @param message the message to be routed
+     * @param routingAddress
      * @param instanceProperties the instance properties
      * @param txn the transaction to enqueue within
      * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
      * @return the number of queues in which the message was enqueued performed
      */
     <M extends ServerMessage<? extends StorableMessageMetaData>> int send(M message,
-             InstanceProperties instanceProperties,
-             ServerTransaction txn,
-             Action<? super MessageInstance> postEnqueueAction);
+                                                                          final String routingAddress,
+                                                                          InstanceProperties instanceProperties,
+                                                                          ServerTransaction txn,
+                                                                          Action<? super MessageInstance> postEnqueueAction);
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Wed Mar  5 16:04:16 2014
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
 
 public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource
 {
-    String getRoutingKey();
+    String getInitialRoutingAddress();
 
     AMQMessageHeader getMessageHeader();
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Wed Mar  5 16:04:16 2014
@@ -44,6 +44,7 @@ public class InternalMessage extends Abs
     private final Object _messageBody;
     private final int _contentSize;
     private InternalMessageHeader _header;
+    private String _initialRoutingAddress;
 
 
     InternalMessage(final StoredMessage<InternalMessageMetaData> handle,
@@ -80,9 +81,9 @@ public class InternalMessage extends Abs
     }
 
     @Override
-    public String getRoutingKey()
+    public String getInitialRoutingAddress()
     {
-        return null;
+        return _initialRoutingAddress;
     }
 
     @Override
@@ -253,4 +254,8 @@ public class InternalMessage extends Abs
     }
 
 
+    public void setInitialRoutingAddress(final String initialRoutingAddress)
+    {
+        _initialRoutingAddress = initialRoutingAddress;
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Mar  5 16:04:16 2014
@@ -70,7 +70,6 @@ import org.apache.qpid.server.util.Serve
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import javax.management.NotificationListener;
 import javax.security.auth.Subject;
 
 public abstract class AbstractQueue
@@ -2465,9 +2464,10 @@ public abstract class AbstractQueue
     }
 
     public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
-                              final InstanceProperties instanceProperties,
-                              final ServerTransaction txn,
-                              final Action<? super MessageInstance> postEnqueueAction)
+                                                                                        final String routingAddress,
+                                                                                        final InstanceProperties instanceProperties,
+                                                                                        final ServerTransaction txn,
+                                                                                        final Action<? super MessageInstance> postEnqueueAction)
     {
             txn.enqueue(this,message, new ServerTransaction.Action()
             {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Mar  5 16:04:16 2014
@@ -377,6 +377,7 @@ public abstract class QueueEntryImpl imp
         if (alternateExchange != null)
         {
             enqueues = alternateExchange.send(getMessage(),
+                                              getMessage().getInitialRoutingAddress(),
                                               getInstanceProperties(),
                                               txn,
                                               action);

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Wed Mar  5 16:04:16 2014
@@ -127,7 +127,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key",queue2, null);
 
 
-        List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+        List<? extends BaseQueue> result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -136,7 +136,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
 
 
-        result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+        result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -144,14 +144,14 @@ public class FanoutExchangeTest extends 
 
         _exchange.deleteBinding("key",queue2);
 
-        result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+        result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
         assertTrue("Expected queue2 to be routed to", result.contains(queue2));
 
 
-        result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
+        result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to queue1 only", 1, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -160,7 +160,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
 
 
-        result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
+        result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY);
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
         assertTrue("Expected queue2 to be routed to", result.contains(queue2));

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Wed Mar  5 16:04:16 2014
@@ -73,7 +73,7 @@ public class HeadersExchangeTest extends
 
     protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception
     {
-        List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY);
+        List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY);
         List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
         unexpected.removeAll(Arrays.asList(expected));
         assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Wed Mar  5 16:04:16 2014
@@ -324,8 +324,8 @@ public class TopicExchangeTest extends Q
     private int routeMessage(String routingKey, long messageNumber)
     {
         ServerMessage message = mock(ServerMessage.class);
-        when(message.getRoutingKey()).thenReturn(routingKey);
-        List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY);
+        when(message.getInitialRoutingAddress()).thenReturn(routingKey);
+        List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY);
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(ref);

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Wed Mar  5 16:04:16 2014
@@ -119,7 +119,7 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
-        public String getRoutingKey()
+        public String getInitialRoutingAddress()
         {
             return null;
         }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Wed Mar  5 16:04:16 2014
@@ -67,7 +67,7 @@ class MockServerMessage implements Serve
         throw new NotImplementedException();
     }
 
-    public String getRoutingKey()
+    public String getInitialRoutingAddress()
     {
         throw new NotImplementedException();
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Wed Mar  5 16:04:16 2014
@@ -417,7 +417,7 @@ public class ConsumerTarget_0_10 extends
                 {
                     logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
                                                                           queue.getName(),
-                                                                          msg.getRoutingKey()));
+                                                                          msg.getInitialRoutingAddress()));
                 }
             }
         }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Wed Mar  5 16:04:16 2014
@@ -30,17 +30,8 @@ import org.apache.qpid.transport.Deliver
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.typedmessage.TypedBytesContentReader;
-import org.apache.qpid.typedmessage.TypedBytesFormatException;
 
-import java.io.EOFException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
 
 public class MessageConverter_Internal_to_v0_10 implements MessageConverter<InternalMessage, MessageTransferMessage>
 {
@@ -123,7 +114,7 @@ public class MessageConverter_Internal_t
                 };
     }
 
-    private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size)
+    private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
     {
         DeliveryProperties deliveryProps = new DeliveryProperties();
         MessageProperties messageProps = new MessageProperties();
@@ -132,7 +123,7 @@ public class MessageConverter_Internal_t
 
         deliveryProps.setExpiration(serverMsg.getExpiration());
         deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
-        deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+        deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress());
         deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
 
         messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
@@ -142,7 +133,7 @@ public class MessageConverter_Internal_t
         {
             messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
         }
-
+        messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeaderMap());
         Header header = new Header(deliveryProps, messageProps, null);
         return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Wed Mar  5 16:04:16 2014
@@ -33,7 +33,6 @@ import org.apache.qpid.server.plugin.Mes
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
@@ -127,7 +126,7 @@ public class MessageConverter_v0_10 impl
 
         deliveryProps.setExpiration(serverMsg.getExpiration());
         deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
-        deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+        deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress());
         deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
 
         messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Wed Mar  5 16:04:16 2014
@@ -41,7 +41,7 @@ public class MessageTransferMessage exte
         return getStoredMessage().getMetaData();
     }
 
-    public String getRoutingKey()
+    public String getInitialRoutingAddress()
     {
         return getMetaData().getRoutingKey();
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Wed Mar  5 16:04:16 2014
@@ -47,7 +47,6 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -58,7 +57,6 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -199,7 +197,10 @@ public class ServerSession extends Sessi
             _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
             invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
         }
-        int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction);
+        int enqueues = exchange.send(message,
+                                     message.getInitialRoutingAddress(),
+                                     instanceProperties, _transaction, _checkCapacityAction
+                                    );
         getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
         incrementOutstandingTxnsIfNecessary();
         return enqueues;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Mar  5 16:04:16 2014
@@ -378,8 +378,11 @@ public class AMQChannel<T extends AMQPro
                                     }
                                 };
 
-                        int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
-                                                                          immediate ? _immediateAction : _capacityCheckAction);
+                        int enqueues = _currentMessage.getDestination().send(amqMessage,
+                                                                             amqMessage.getInitialRoutingAddress(),
+                                                                             instanceProperties, _transaction,
+                                                                          immediate ? _immediateAction : _capacityCheckAction
+                                                                            );
                         if(enqueues == 0)
                         {
                             handleUnroutableMessage(amqMessage);
@@ -1574,7 +1577,7 @@ public class AMQChannel<T extends AMQPro
                     if (altExchange == null)
                     {
                         _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
-                        _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+                        _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress()));
 
                     }
                     else

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Wed Mar  5 16:04:16 2014
@@ -71,7 +71,7 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getContentHeaderBody();
     }
 
-    public String getRoutingKey()
+    public String getInitialRoutingAddress()
     {
         MessageMetaData messageMetaData = getMessageMetaData();
         if (messageMetaData != null)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Wed Mar  5 16:04:16 2014
@@ -125,7 +125,7 @@ public class MessageConverter_Internal_t
         };
     }
 
-    private MessageMetaData convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
+    private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size)
     {
 
         MessagePublishInfo publishInfo = new MessagePublishInfo()
@@ -133,7 +133,7 @@ public class MessageConverter_Internal_t
                                                 @Override
                                                 public AMQShortString getExchange()
                                                 {
-                                                    return null;
+                                                    return AMQShortString.EMPTY_STRING;
                                                 }
 
                                                 @Override
@@ -157,7 +157,7 @@ public class MessageConverter_Internal_t
                                                 @Override
                                                 public AMQShortString getRoutingKey()
                                                 {
-                                                    return null;
+                                                    return AMQShortString.valueOf(serverMsg.getInitialRoutingAddress());
                                                 }
                                             };
 
@@ -174,6 +174,7 @@ public class MessageConverter_Internal_t
         props.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
         props.setUserId(serverMsg.getMessageHeader().getUserId());
 
+
         Map<String,Object> headerProps = new LinkedHashMap<String, Object>();
 
         for(String headerName : serverMsg.getMessageHeader().getHeaderNames())
@@ -184,6 +185,7 @@ public class MessageConverter_Internal_t
         props.setHeaders(FieldTable.convertToFieldTable(headerProps));
 
         final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+        chb.setBodySize(size);
         return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Wed Mar  5 16:04:16 2014
@@ -20,20 +20,27 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.ReplyTo;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.typedmessage.TypedBytesContentReader;
 import org.apache.qpid.typedmessage.TypedBytesFormatException;
+import org.apache.qpid.url.AMQBindingURL;
 
 import java.io.EOFException;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMessage, InternalMessage>
 {
@@ -58,9 +65,210 @@ public class MessageConverter_v0_8_to_In
 
         Object body = convertMessageBody(mimeType, data);
 
-        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), body);
+        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(),
+                new DelegatingMessageHeader(serverMessage.getMessageHeader()), body);
     }
 
+    private static class ReplyToComponents
+    {
+        private String _exchange;
+        private String _queue;
+        private String _routingKey;
+
+        public void setExchange(final String exchange)
+        {
+            _exchange = exchange;
+        }
+
+        public void setQueue(final String queue)
+        {
+            _queue = queue;
+        }
+
+        public void setRoutingKey(final String routingKey)
+        {
+            _routingKey = routingKey;
+        }
+
+        public String getExchange()
+        {
+            return _exchange;
+        }
+
+        public String getQueue()
+        {
+            return _queue;
+        }
+
+        public String getRoutingKey()
+        {
+            return _routingKey;
+        }
+
+        public boolean hasExchange()
+        {
+            return _exchange != null;
+        }
+
+        public boolean hasQueue()
+        {
+            return _queue != null;
+        }
+
+        public boolean hasRoutingKey()
+        {
+            return _routingKey != null;
+        }
+    }
+
+    private static class DelegatingMessageHeader implements AMQMessageHeader
+    {
+        private final AMQMessageHeader _delegate;
+
+        private DelegatingMessageHeader(final AMQMessageHeader delegate)
+        {
+            _delegate = delegate;
+        }
+
+        @Override
+        public String getCorrelationId()
+        {
+            return _delegate.getCorrelationId();
+        }
+
+        @Override
+        public long getExpiration()
+        {
+            return _delegate.getExpiration();
+        }
+
+        @Override
+        public String getUserId()
+        {
+            return _delegate.getUserId();
+        }
+
+        @Override
+        public String getAppId()
+        {
+            return _delegate.getAppId();
+        }
+
+        @Override
+        public String getMessageId()
+        {
+            return _delegate.getMessageId();
+        }
+
+        @Override
+        public String getMimeType()
+        {
+            return _delegate.getMimeType();
+        }
+
+        @Override
+        public String getEncoding()
+        {
+            return _delegate.getEncoding();
+        }
+
+        @Override
+        public byte getPriority()
+        {
+            return _delegate.getPriority();
+        }
+
+        @Override
+        public long getTimestamp()
+        {
+            return _delegate.getTimestamp();
+        }
+
+        @Override
+        public String getType()
+        {
+            return _delegate.getType();
+        }
+
+        @Override
+        public String getReplyTo()
+        {
+            String originalReplyTo = _delegate.getReplyTo();
+            ReplyToComponents replyTo = convertReplyTo(originalReplyTo);
+            if(replyTo != null)
+            {
+                if(replyTo.hasExchange())
+                {
+                    return replyTo.getExchange() + (replyTo.hasRoutingKey() ? "/" + replyTo.getRoutingKey() : "");
+                }
+                else
+                {
+                    return replyTo.hasQueue() ? replyTo.getQueue() : replyTo.getRoutingKey();
+                }
+            }
+            else
+            {
+                return originalReplyTo;
+            }
+        }
+
+        private ReplyToComponents convertReplyTo(final String origReplyToString)
+        {
+            try
+            {
+                AMQBindingURL burl = new AMQBindingURL(origReplyToString);
+                ReplyToComponents replyTo = new ReplyToComponents();
+                AMQShortString routingKey = burl.getRoutingKey();
+                if(routingKey != null)
+                {
+                    replyTo.setRoutingKey(routingKey.asString());
+                }
+
+                AMQShortString exchangeName = burl.getExchangeName();
+                if(exchangeName != null)
+                {
+                    replyTo.setExchange(exchangeName.asString());
+                }
+
+                AMQShortString queueName = burl.getQueueName();
+                if(queueName != null)
+                {
+                    replyTo.setQueue(queueName.asString());
+                }
+                return replyTo;
+            }
+            catch (URISyntaxException e)
+            {
+                return null;
+            }
+        }
+
+        @Override
+        public Object getHeader(final String name)
+        {
+            return _delegate.getHeader(name);
+        }
+
+        @Override
+        public boolean containsHeaders(final Set<String> names)
+        {
+            return _delegate.containsHeaders(names);
+        }
+
+        @Override
+        public boolean containsHeader(final String name)
+        {
+            return _delegate.containsHeader(name);
+        }
+
+        @Override
+        public Collection<String> getHeaderNames()
+        {
+            return _delegate.getHeaderNames();
+        }
+    }
+
+
     private static Object convertMessageBody(String mimeType, byte[] data)
     {
         if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Wed Mar  5 16:04:16 2014
@@ -76,7 +76,7 @@ public class ExchangeDestination impleme
                     return null;
                 }};
 
-        int enqueues = _exchange.send(message, instanceProperties, txn, null);
+        int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
 
 
         return enqueues == 0 ? REJECTED : ACCEPTED;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Wed Mar  5 16:04:16 2014
@@ -563,6 +563,11 @@ public class MessageMetaData_1_0 impleme
         {
             return _properties == null ? null : _properties.getTo();
         }
+
+        public Map<String, Object> getHeadersAsMap()
+        {
+            return new HashMap<String, Object>(_appProperties);
+        }
     }
 
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Wed Mar  5 16:04:16 2014
@@ -69,7 +69,7 @@ public class Message_1_0 extends Abstrac
         _arrivalTime = System.currentTimeMillis();
     }
 
-    public String getRoutingKey()
+    public String getInitialRoutingAddress()
     {
         Object routingKey = getMessageHeader().getHeader("routing-key");
         if(routingKey != null)
@@ -78,7 +78,7 @@ public class Message_1_0 extends Abstrac
         }
         else
         {
-            return getMessageHeader().getSubject();
+            return getMessageHeader().getTo();
         }
     }
 
@@ -92,12 +92,6 @@ public class Message_1_0 extends Abstrac
         return getMessageMetaData().getMessageHeader();
     }
 
-    public boolean isRedelivered()
-    {
-        // TODO
-        return false;
-    }
-
     public long getSize()
     {
         long size = 0l;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Wed Mar  5 16:04:16 2014
@@ -76,7 +76,7 @@ public class NodeReceivingDestination im
                     return null;
                 }};
 
-        int enqueues = _exchange.send(message, instanceProperties, txn, null);
+        int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
 
 
         return enqueues == 0 ? REJECTED : ACCEPTED;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java Wed Mar  5 16:04:16 2014
@@ -115,7 +115,7 @@ public class MessageConverter_0_10_to_1_
                 }
             }
 
-            props.setSubject(serverMessage.getRoutingKey());
+            props.setSubject(serverMessage.getInitialRoutingAddress());
 
             if(msgProps.hasUserId())
             {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Wed Mar  5 16:04:16 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
 
+import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
@@ -33,6 +34,7 @@ import org.apache.qpid.transport.Deliver
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
 
 import java.nio.ByteBuffer;
 
@@ -53,16 +55,18 @@ public class MessageConverter_1_0_to_v0_
     @Override
     public MessageTransferMessage convert(Message_1_0 serverMsg, VirtualHost vhost)
     {
-        return new MessageTransferMessage(convertToStoredMessage(serverMsg), null);
+        return new MessageTransferMessage(convertToStoredMessage(serverMsg, vhost), null);
     }
 
-    private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg)
+    private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg,
+                                                                       final VirtualHost vhost)
     {
         Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg);
 
         final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject);
 
         final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg,
+                                                                          vhost,
                                                                           MessageConverter_from_1_0.getBodyMimeType(bodyObject),
                                                                           messageContent.length);
 
@@ -119,25 +123,54 @@ public class MessageConverter_1_0_to_v0_
         };
     }
 
-    private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size)
+    private MessageMetaData_0_10 convertMetaData(Message_1_0 serverMsg,
+                                                 final VirtualHost vhost,
+                                                 final String bodyMimeType,
+                                                 final int size)
     {
         DeliveryProperties deliveryProps = new DeliveryProperties();
         MessageProperties messageProps = new MessageProperties();
 
+        final AMQMessageHeader origHeader = serverMsg.getMessageHeader();
 
 
         deliveryProps.setExpiration(serverMsg.getExpiration());
-        deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
-        deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
-        deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+        deliveryProps.setPriority(MessageDeliveryPriority.get(origHeader.getPriority()));
+        deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress());
+        deliveryProps.setTimestamp(origHeader.getTimestamp());
 
-        messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
+        messageProps.setContentEncoding(origHeader.getEncoding());
         messageProps.setContentLength(size);
         messageProps.setContentType(bodyMimeType);
-        if(serverMsg.getMessageHeader().getCorrelationId() != null)
+        if(origHeader.getCorrelationId() != null)
         {
-            messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
+            messageProps.setCorrelationId(origHeader.getCorrelationId().getBytes());
         }
+        final String origReplyTo = origHeader.getReplyTo();
+        if(origReplyTo != null && !origReplyTo.equals(""))
+        {
+            ReplyTo replyTo;
+            if(origReplyTo.startsWith("/"))
+            {
+                replyTo = new ReplyTo("",origReplyTo);
+            }
+            else if(origReplyTo.contains("/"))
+            {
+                String[] parts = origReplyTo.split("/",2);
+                replyTo = new ReplyTo(parts[0],parts[1]);
+            }
+            else if(vhost.getExchange(origReplyTo) != null)
+            {
+                replyTo = new ReplyTo(origReplyTo,"");
+            }
+            else
+            {
+                replyTo = new ReplyTo("",origReplyTo);
+            }
+            messageProps.setReplyTo(replyTo);
+        }
+
+        messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeadersAsMap());
 
         Header header = new Header(deliveryProps, messageProps, null);
         return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Wed Mar  5 16:04:16 2014
@@ -132,7 +132,7 @@ public class MessageConverter_0_8_to_0_1
         deliveryProps.setExpiration(message_0_8.getExpiration());
         deliveryProps.setImmediate(message_0_8.isImmediate());
         deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
-        deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
+        deliveryProps.setRoutingKey(message_0_8.getInitialRoutingAddress());
         deliveryProps.setTimestamp(properties.getTimestamp());
 
         messageProps.setContentEncoding(properties.getEncodingAsString());

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Wed Mar  5 16:04:16 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
 
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -37,6 +38,7 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
 import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
+import org.apache.qpid.url.AMQBindingURL;
 
 public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
 {
@@ -102,9 +104,45 @@ public class MessageConverter_0_8_to_1_0
         {
             props.setMessageId(new Binary(messageId.getBytes()));
         }
-        props.setReplyTo(String.valueOf(contentHeader.getReplyTo()));
+        final String originalReplyTo = String.valueOf(contentHeader.getReplyTo());
+        try
+        {
+            AMQBindingURL burl = new AMQBindingURL(originalReplyTo);
+            String replyTo;
+
+            if(burl.getExchangeName() != null && !burl.getExchangeName().equals(AMQShortString.EMPTY_STRING))
+            {
+                replyTo = burl.getExchangeName().asString();
+
+                if(burl.getRoutingKey() != null)
+                {
+                    replyTo += "/" + burl.getRoutingKey().asString();
+                }
+
+            }
+            else if(burl.getQueueName() != null && !burl.getQueueName().equals(AMQShortString.EMPTY_STRING))
+            {
+                replyTo = burl.getQueueName().asString();
+            }
+            else if(burl.getRoutingKey() != null)
+            {
+                replyTo = burl.getRoutingKey().asString();
+            }
+            else
+            {
+                replyTo = originalReplyTo;
+            }
+
+            props.setReplyTo(replyTo);
+        }
+        catch (URISyntaxException e)
+        {
+            props.setReplyTo(originalReplyTo);
+        }
+
+
 
-        props.setSubject(serverMessage.getRoutingKey());
+        props.setSubject(serverMessage.getInitialRoutingAddress());
         if(contentHeader.getUserId() != null)
         {
             props.setUserId(new Binary(contentHeader.getUserId().getBytes()));

Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Wed Mar  5 16:04:16 2014
@@ -44,6 +44,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -261,9 +262,10 @@ class ManagementNode implements MessageS
 
     @Override
     public  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
-                    final InstanceProperties instanceProperties,
-                    final ServerTransaction txn,
-                    final Action<? super MessageInstance> postEnqueueAction)
+                                                                                  final String routingAddress,
+                                                                                  final InstanceProperties instanceProperties,
+                                                                                  final ServerTransaction txn,
+                                                                                  final Action<? super MessageInstance> postEnqueueAction)
     {
 
         @SuppressWarnings("unchecked")
@@ -361,11 +363,19 @@ class ManagementNode implements MessageS
 
 
         ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo());
+        response.setInitialRoutingAddress(message.getMessageHeader().getReplyTo());
         if(consumer != null)
         {
             // TODO - check same owner
             consumer.send(response);
         }
+        else
+        {
+            _virtualHost.getDefaultDestination().send(response,
+                                                      message.getMessageHeader().getReplyTo(), InstanceProperties.EMPTY,
+                                                      new AutoCommitTransaction(_virtualHost.getMessageStore()),
+                                                      null);
+        }
         // TODO - route to a queue
 
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Wed Mar  5 16:04:16 2014
@@ -112,6 +112,11 @@ public abstract class AMQDestination imp
         _name = name;
     }
 
+    public boolean neverDeclare()
+    {
+        return false;
+    }
+
     // ----- Fields required to support new address syntax -------
 
     public enum DestSyntax {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Mar  5 16:04:16 2014
@@ -2864,16 +2864,16 @@ public abstract class AMQSession<C exten
         }
         else
         {
-            if (_declareExchanges)
+            if (_declareExchanges && !amqd.neverDeclare())
             {
                 declareExchange(amqd, nowait);
             }
 
-            if (_delareQueues || amqd.isNameRequired())
+            if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
             {
                 declareQueue(amqd, consumer.isNoLocal(), nowait);
             }
-            if (_bindQueues)
+            if (_bindQueues && !amqd.neverDeclare())
             {
                 if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
                 {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java Wed Mar  5 16:04:16 2014
@@ -37,4 +37,10 @@ public class AMQUndefinedDestination ext
     {
         return getAMQQueueName() == null;
     }
+
+    @Override
+    public boolean neverDeclare()
+    {
+        return true;
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Mar  5 16:04:16 2014
@@ -131,7 +131,7 @@ public abstract class BasicMessageProduc
         _channelId = channelId;
         _session = session;
         _producerId = producerId;
-        if (destination != null  && !(destination instanceof AMQUndefinedDestination))
+        if (destination != null  && !(destination.neverDeclare()))
         {
             declareDestination(destination);
         }
@@ -177,7 +177,7 @@ public abstract class BasicMessageProduc
 
     void resubscribe() throws AMQException
     {
-        if (_destination != null && !(_destination instanceof AMQUndefinedDestination))
+        if (_destination != null && !_destination.neverDeclare())
         {
             declareDestination(_destination);
         }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Wed Mar  5 16:04:16 2014
@@ -36,6 +36,7 @@ import org.apache.qpid.url.BindingURL;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -258,7 +259,29 @@ public class AMQMessageDelegate_0_8 exte
                 }
                 catch (URISyntaxException e)
                 {
-                    throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
+                    if(replyToEncoding.startsWith("/"))
+                    {
+                        dest = new DefaultRouterDestination(replyToEncoding);
+                    }
+                    else if(replyToEncoding.contains("/"))
+                    {
+                        String[] parts = replyToEncoding.split("/",2);
+                        dest = new NonBURLReplyToDestination(parts[0], parts[1]);
+
+
+                    }
+                    else
+                    {
+                        if(getAMQSession().isQueueBound(AMQShortString.valueOf(replyToEncoding), null, null))
+                        {
+                            dest = new NonBURLReplyToDestination(replyToEncoding, "");
+                        }
+                        else
+                        {
+                            dest = new DefaultRouterDestination(replyToEncoding);
+                        }
+                    }
+                    
                 }
 
                 _destinationCache.put(replyToEncoding, dest);
@@ -371,7 +394,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         return getJmsHeaders().getBoolean(propertyName);
@@ -381,7 +404,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         return getJmsHeaders().getByte(propertyName);
@@ -391,7 +414,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         return getJmsHeaders().getShort(propertyName);
@@ -401,7 +424,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         return getJmsHeaders().getInteger(propertyName);
@@ -411,7 +434,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         return getJmsHeaders().getLong(propertyName);
@@ -421,7 +444,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         return getJmsHeaders().getFloat(propertyName);
@@ -431,7 +454,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         return getJmsHeaders().getDouble(propertyName);
@@ -448,7 +471,7 @@ public class AMQMessageDelegate_0_8 exte
         {
             if (STRICT_AMQP_COMPLIANCE)
             {
-                throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+                throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
             }
 
             return getJmsHeaders().getString(propertyName);
@@ -469,7 +492,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         checkWritableProperties();
@@ -480,7 +503,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         checkWritableProperties();
@@ -491,7 +514,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         checkWritableProperties();
@@ -508,7 +531,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         checkWritableProperties();
@@ -519,7 +542,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         checkWritableProperties();
@@ -530,7 +553,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         if (STRICT_AMQP_COMPLIANCE)
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
         }
 
         checkWritableProperties();
@@ -585,4 +608,50 @@ public class AMQMessageDelegate_0_8 exte
 
         _readableProperties = false;
     }
+
+    private static class DefaultRouterDestination extends AMQDestination implements Queue
+    {
+        public DefaultRouterDestination(final String replyToEncoding)
+        {
+            super(AMQShortString.EMPTY_STRING,
+                  AMQShortString.valueOf("direct"),
+                  AMQShortString.valueOf(replyToEncoding),
+                  AMQShortString.valueOf(replyToEncoding));
+        }
+
+        @Override
+        public boolean isNameRequired()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean neverDeclare()
+        {
+            return true;
+        }
+    }
+
+    private static class NonBURLReplyToDestination extends AMQDestination implements Queue
+    {
+        public NonBURLReplyToDestination(final String exchange, final String routingKey)
+        {
+            super(AMQShortString.valueOf(exchange),
+                  null,
+                  AMQShortString.valueOf(routingKey),
+                  AMQShortString.valueOf(routingKey));
+        }
+
+        @Override
+        public boolean isNameRequired()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean neverDeclare()
+        {
+            return true;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1574551&r1=1574550&r2=1574551&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java Wed Mar  5 16:04:16 2014
@@ -62,7 +62,6 @@ import org.apache.qpid.util.FileUtils;
 
 import java.io.File;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.mockito.Matchers.eq;
@@ -627,7 +626,7 @@ public class MessageStoreTest extends Qp
 
 
         ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
-        exchange.send(currentMessage, InstanceProperties.EMPTY, trans, null);
+        exchange.send(currentMessage, routingKey, InstanceProperties.EMPTY, trans, null);
 
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message