qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1560524 - in /qpid/trunk/qpid/java: bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/exchange/topic/ broker...
Date Wed, 22 Jan 2014 21:43:48 GMT
Author: rgodfrey
Date: Wed Jan 22 21:43:46 2014
New Revision: 1560524

URL: http://svn.apache.org/r1560524
Log:
QPID-5504 : remove InboundMessage... characterize routing as being on the immutable message and a set of instance properties

Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
      - copied, changed from r1560435, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
      - copied, changed from r1560435, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java
Removed:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java
Modified:
    qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java

Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Jan 22 21:43:46 2014
@@ -608,5 +608,11 @@ public class BDBMessageStoreTest extends
         {
             return null;
         }
+
+        @Override
+        public Object getConnectionReference()
+        {
+            return null;
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Jan 22 21:43:46 2014
@@ -32,7 +32,8 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.BindingLogSubject;
 import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -373,11 +374,13 @@ public abstract class AbstractExchange i
         return getBindings().size();
     }
 
-    public final List<? extends BaseQueue> route(final InboundMessage message)
+    @Override
+    public final List<? extends BaseQueue> route(final ServerMessage message,
+                                                 final InstanceProperties instanceProperties)
     {
         _receivedMessageCount.incrementAndGet();
         _receivedMessageSize.addAndGet(message.getSize());
-        List<? extends BaseQueue> queues = doRoute(message);
+        List<? extends BaseQueue> queues = doRoute(message, instanceProperties);
         List<? extends BaseQueue> allQueues = queues;
 
         boolean deletedQueues = false;
@@ -413,7 +416,8 @@ public abstract class AbstractExchange i
         return queues;
     }
 
-    protected abstract List<? extends BaseQueue> doRoute(final InboundMessage message);
+    protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
+                                                         final InstanceProperties instanceProperties);
 
     public long getMsgReceives()
     {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Wed Jan 22 21:43:46 2014
@@ -35,7 +35,8 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -203,7 +204,7 @@ public class DefaultExchange implements 
     }
 
     @Override
-    public List<AMQQueue> route(InboundMessage message)
+    public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties)
     {
         AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
         if(q == null)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Wed Jan 22 21:43:46 2014
@@ -27,8 +27,11 @@ import java.util.Set;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -130,7 +133,8 @@ public class DirectExchange extends Abst
         super(TYPE);
     }
 
-    public List<? extends BaseQueue> doRoute(InboundMessage payload)
+    @Override
+    public List<? extends BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
     {
 
         final String routingKey = payload.getRoutingKey();
@@ -151,7 +155,7 @@ public class DirectExchange extends Abst
                     if(!queuesSet.contains(entry.getKey()))
                     {
                         MessageFilter filter = entry.getValue();
-                        if(filter.matches(payload))
+                        if(filter.matches(Filterable.Factory.newInstance(payload, instanceProperties)))
                         {
                             queuesSet.add(entry.getKey());
                         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Wed Jan 22 21:43:46 2014
@@ -24,7 +24,8 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -98,7 +99,7 @@ public interface Exchange extends Exchan
      *
      * @return list of queues to which to route the message.
      */
-    List<? extends BaseQueue> route(InboundMessage message);
+    List<? extends BaseQueue> route(ServerMessage message, final InstanceProperties instanceProperties);
 
 
     /**

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Wed Jan 22 21:43:46 2014
@@ -29,8 +29,11 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -66,7 +69,8 @@ public class FanoutExchange extends Abst
         super(TYPE);
     }
 
-    public ArrayList<BaseQueue> doRoute(InboundMessage payload)
+    @Override
+    public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
     {
 
         for(Binding b : getBindings())
@@ -87,7 +91,7 @@ public class FanoutExchange extends Abst
                 {
                     for(MessageFilter filter : bindingMessageFilterMap.values())
                     {
-                        if(filter.matches(payload))
+                        if(filter.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
                         {
                             result.add(q);
                             break;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Wed Jan 22 21:43:46 2014
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.AMQMessageHeader;
 
@@ -31,8 +32,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.server.filter.Filterable;
 
 /**
  * Defines binding and matching based on a set of headers.
@@ -135,7 +135,7 @@ class HeadersBinding
         }
     }
 
-    public boolean matches(InboundMessage message)
+    public boolean matches(Filterable message)
     {
         return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message));
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Wed Jan 22 21:43:46 2014
@@ -23,7 +23,9 @@ package org.apache.qpid.server.exchange;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -81,8 +83,8 @@ public class HeadersExchange extends Abs
         super(TYPE);
     }
 
-
-    public ArrayList<BaseQueue> doRoute(InboundMessage payload)
+    @Override
+    public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
     {
         if (_logger.isDebugEnabled())
         {
@@ -93,7 +95,7 @@ public class HeadersExchange extends Abs
 
         for (HeadersBinding hb : _bindingHeaderMatchers)
         {
-            if (hb.matches(payload))
+            if (hb.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
             {
                 Binding b = hb.getBinding();
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Jan 22 21:43:46 2014
@@ -34,7 +34,10 @@ import org.apache.qpid.server.exchange.t
 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
 import org.apache.qpid.server.exchange.topic.TopicNormalizer;
 import org.apache.qpid.server.exchange.topic.TopicParser;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -144,15 +147,16 @@ public class TopicExchange extends Abstr
 
     }
 
-
-    public ArrayList<BaseQueue> doRoute(InboundMessage payload)
+    @Override
+    public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
     {
 
         final String routingKey = payload.getRoutingKey() == null
                                           ? ""
                                           : payload.getRoutingKey();
 
-        final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey);
+        final Collection<AMQQueue> matchedQueues =
+                getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey);
 
         ArrayList<BaseQueue> queues;
 
@@ -209,7 +213,7 @@ public class TopicExchange extends Abstr
         }
     }
 
-    private Collection<AMQQueue> getMatchedQueues(InboundMessage message, String routingKey)
+    private Collection<AMQQueue> getMatchedQueues(Filterable message, String routingKey)
     {
 
         Collection<TopicMatcherResult> results = _parser.parse(routingKey);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Wed Jan 22 21:43:46 2014
@@ -21,8 +21,8 @@
 package org.apache.qpid.server.exchange.topic;
 
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 import java.util.ArrayList;
@@ -168,7 +168,7 @@ public final class TopicExchangeResult i
         _filteredQueues.put(queue,newFilters);
     }
 
-    public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
+    public Collection<AMQQueue> processMessage(Filterable msg, Collection<AMQQueue> queues)
     {
         if(queues == null)
         {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java Wed Jan 22 21:43:46 2014
@@ -23,8 +23,6 @@ package org.apache.qpid.server.filter;
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
 //
 
-import org.apache.qpid.server.queue.Filterable;
-
 public interface FilterManager
 {
     void add(MessageFilter filter);

Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (from r1560435, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java&r1=1560435&r2=1560524&rev=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Wed Jan 22 21:43:46 2014
@@ -19,7 +19,7 @@
  *
  */
 
-package org.apache.qpid.server.exchange;
+package org.apache.qpid.server.filter;
 
 import java.lang.ref.WeakReference;
 import java.util.Collections;
@@ -30,12 +30,8 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.Filterable;
 
 public class FilterSupport
 {
@@ -104,7 +100,7 @@ public class FilterSupport
                        && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
     }
 
-    static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+    public static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
     {
         if(argumentsContainNoLocal(args))
         {
@@ -133,9 +129,9 @@ public class FilterSupport
 
         public boolean matches(Filterable message)
         {
-            InboundMessage inbound = (InboundMessage) message;
             final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
-            return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
+            return exclusiveOwningSession == null ||
+                    exclusiveOwningSession.getConnectionReference() != message.getConnectionReference();
 
         }
 

Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java (from r1560435, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java&r1=1560435&r2=1560524&rev=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java Wed Jan 22 21:43:46 2014
@@ -18,9 +18,13 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.filter;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
 
 public interface Filterable
 {
@@ -29,4 +33,40 @@ public interface Filterable
     boolean isPersistent();
 
     boolean isRedelivered();
+
+    Object getConnectionReference();
+
+    public class Factory
+    {
+
+        public static Filterable newInstance(final ServerMessage message, final InstanceProperties properties)
+        {
+            return new Filterable()
+            {
+                @Override
+                public AMQMessageHeader getMessageHeader()
+                {
+                    return message.getMessageHeader();
+                }
+
+                @Override
+                public boolean isPersistent()
+                {
+                    return Boolean.TRUE.equals(properties.getProperty(InstanceProperties.Property.PERSISTENT));
+                }
+
+                @Override
+                public boolean isRedelivered()
+                {
+                    return Boolean.TRUE.equals(properties.getProperty(InstanceProperties.Property.REDELIVERED));
+                }
+
+                @Override
+                public Object getConnectionReference()
+                {
+                    return message.getConnectionReference();
+                }
+            };
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Wed Jan 22 21:43:46 2014
@@ -31,7 +31,6 @@ import org.apache.qpid.filter.SelectorPa
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.SelectorParser;
 import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.queue.Filterable;
 
 
 public class JMSSelectorFilter implements MessageFilter

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Wed Jan 22 21:43:46 2014
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.filter;
 
-import org.apache.qpid.server.queue.Filterable;
-
 public interface MessageFilter
 {
     boolean matches(Filterable message);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java Wed Jan 22 21:43:46 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.filter;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.Filterable;
 
 public class NoConsumerFilter implements MessageFilter
 {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Wed Jan 22 21:43:46 2014
@@ -22,8 +22,6 @@ package org.apache.qpid.server.filter;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.server.queue.Filterable;
-
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class SimpleFilterManager implements FilterManager

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java?rev=1560524&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java Wed Jan 22 21:43:46 2014
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+public interface InstanceProperties
+{
+
+    enum Property {
+        REDELIVERED,
+        PERSISTENT,
+        MANDATORY,
+        IMMEDIATE,
+        EXPIRATION
+    }
+
+    public Object getProperty(Property prop);
+
+    InstanceProperties EMPTY = new InstanceProperties()
+        {
+            @Override
+            public Object getProperty(final Property prop)
+            {
+                return null;
+            }
+        };
+
+    class Factory
+    {
+        public static InstanceProperties fromMap(Map<Property, Object> map)
+        {
+            final Map<Property,Object> props = new EnumMap<Property,Object>(map);
+            return new InstanceProperties()
+            {
+                @Override
+                public Object getProperty(final Property prop)
+                {
+                    return props.get(prop);
+                }
+            };
+        }
+
+        public static Map<Property, Object> asMap(InstanceProperties props)
+        {
+            EnumMap<Property, Object> map = new EnumMap<Property,Object>(Property.class);
+
+            for(Property prop : Property.values())
+            {
+                Object value = props.getProperty(prop);
+                if(value != null)
+                {
+                    map.put(prop,value);
+                }
+            }
+
+            return map;
+        }
+    }
+}

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Wed Jan 22 21:43:46 2014
@@ -49,4 +49,5 @@ public interface ServerMessage<T extends
 
     public ByteBuffer getContent(int offset, int size);
 
+    Object getConnectionReference();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Wed Jan 22 21:43:46 2014
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSk
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 
@@ -76,7 +75,7 @@ public interface AMQSessionModel extends
 
     boolean getBlocking();
 
-    boolean onSameConnection(InboundMessage inbound);
+    Object getConnectionReference();
 
     int getUnacknowledgedMessageCount();
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Jan 22 21:43:46 2014
@@ -202,7 +202,7 @@ public class ConflationQueueList extends
             {
                 if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
                 {
-                    Object key = getMessageHeader().getHeader(_conflationKey);
+                    Object key = getMessage().getMessageHeader().getHeader(_conflationKey);
                     _latestValuesMap.remove(key,_latestValueReference);
                 }
                 return true;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Wed Jan 22 21:43:46 2014
@@ -21,10 +21,11 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 
-public interface QueueEntry extends Comparable<QueueEntry>, Filterable
+public interface QueueEntry extends Comparable<QueueEntry>
 {
 
 
@@ -250,4 +251,6 @@ public interface QueueEntry extends Comp
 
     void decrementDeliveryCount();
 
+    Filterable asFilterable();
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Jan 22 21:43:46 2014
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -174,7 +175,7 @@ public abstract class QueueEntryImpl imp
 
     private boolean acquire(final EntryState state)
     {
-        boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
+        boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
 
         if(acquired && _stateChangeListeners != null)
         {
@@ -246,18 +247,6 @@ public abstract class QueueEntryImpl imp
         _deliveryState |= REDELIVERED;
     }
 
-    public AMQMessageHeader getMessageHeader()
-    {
-        final ServerMessage message = getMessage();
-        return message == null ? null : message.getMessageHeader();
-    }
-
-    public boolean isPersistent()
-    {
-        final ServerMessage message = getMessage();
-        return message != null && message.isPersistent();
-    }
-
     public boolean isRedelivered()
     {
         return (_deliveryState & REDELIVERED) != 0;
@@ -366,12 +355,12 @@ public abstract class QueueEntryImpl imp
 
         if (alternateExchange != null)
         {
-            InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this);
-            List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter);
+            QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(this);
+            List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), props);
             final ServerMessage message = getMessage();
             if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
             {
-                queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter);
+                queues = alternateExchange.getAlternateExchange().route(getMessage(), props);
             }
 
 
@@ -507,6 +496,12 @@ public abstract class QueueEntryImpl imp
         _deliveryCountUpdater.decrementAndGet(this);
     }
 
+    @Override
+    public Filterable asFilterable()
+    {
+        return Filterable.Factory.newInstance(getMessage(), new QueueEntryInstanceProperties(this));
+    }
+
     public String toString()
     {
         return "QueueEntryImpl{" +

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java?rev=1560524&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java Wed Jan 22 21:43:46 2014
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.InstanceProperties;
+
+public class QueueEntryInstanceProperties implements InstanceProperties
+{
+    private final QueueEntry _entry;
+
+    public QueueEntryInstanceProperties(final QueueEntry entry)
+    {
+        _entry = entry;
+    }
+
+    @Override
+    public Object getProperty(final Property prop)
+    {
+        switch(prop)
+        {
+            case REDELIVERED:
+                return _entry.isRedelivered();
+            case MANDATORY:
+                return false;
+            case PERSISTENT:
+                return _entry.getMessage().isPersistent();
+            case IMMEDIATE:
+                return false;
+            case EXPIRATION:
+                return _entry.getMessage().getExpiration();
+        }
+        return null;
+    }
+}

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Jan 22 21:43:46 2014
@@ -1358,14 +1358,14 @@ public class SimpleAMQQueue implements A
             if(_alternateExchange != null)
             {
 
-                InboundMessageAdapter adapter = new InboundMessageAdapter();
                 for(final QueueEntry entry : entries)
                 {
-                    adapter.setEntry(entry);
-                    List<? extends BaseQueue> queues = _alternateExchange.route(adapter);
+
+                    QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(entry);
+                    List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), props);
                     if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
                     {
-                        queues = _alternateExchange.getAlternateExchange().route(adapter);
+                        queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(),props);
                     }
 
                     final ServerMessage message = entry.getMessage();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java Wed Jan 22 21:43:46 2014
@@ -25,7 +25,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.FilterSupport;
+import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Wed Jan 22 21:43:46 2014
@@ -40,7 +40,8 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityManager;
@@ -128,7 +129,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key",queue2, null);
 
 
-        List<? extends BaseQueue> result = _exchange.route(mockMessage(true));
+        List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -137,7 +138,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
 
 
-        result = _exchange.route(mockMessage(true));
+        result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -145,14 +146,14 @@ public class FanoutExchangeTest extends 
 
         _exchange.removeBinding("key",queue2,null);
 
-        result = _exchange.route(mockMessage(true));
+        result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
         assertTrue("Expected queue2 to be routed to", result.contains(queue2));
 
 
-        result = _exchange.route(mockMessage(false));
+        result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to queue1 only", 1, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -161,7 +162,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
 
 
-        result = _exchange.route(mockMessage(false));
+        result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
         assertTrue("Expected queue2 to be routed to", result.contains(queue2));
@@ -169,7 +170,7 @@ public class FanoutExchangeTest extends 
 
     }
 
-    private InboundMessage mockMessage(boolean val)
+    private ServerMessage mockMessage(boolean val)
     {
         final AMQMessageHeader header = mock(AMQMessageHeader.class);
         when(header.containsHeader("select")).thenReturn(true);
@@ -185,8 +186,8 @@ public class FanoutExchangeTest extends 
 
             }
         });
-        final InboundMessage inboundMessage = mock(InboundMessage.class);
-        when(inboundMessage.getMessageHeader()).thenReturn(header);
-        return inboundMessage;
+        final ServerMessage serverMessage = mock(ServerMessage.class);
+        when(serverMessage.getMessageHeader()).thenReturn(header);
+        return serverMessage;
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Wed Jan 22 21:43:46 2014
@@ -35,7 +35,8 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityManager;
@@ -71,9 +72,9 @@ public class HeadersExchangeTest extends
 
     }
 
-    protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception
+    protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception
     {
-        List<? extends BaseQueue> results = _exchange.route(msg);
+        List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY);
         List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
         unexpected.removeAll(Arrays.asList(expected));
         assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
@@ -209,7 +210,7 @@ public class HeadersExchangeTest extends
 
     }
 
-    private InboundMessage mockMessage(final Map<String, Object> headerValues)
+    private ServerMessage mockMessage(final Map<String, Object> headerValues)
     {
         final AMQMessageHeader header = mock(AMQMessageHeader.class);
         when(header.containsHeader(anyString())).then(new Answer<Boolean>()
@@ -239,9 +240,9 @@ public class HeadersExchangeTest extends
 
             }
         });
-        final InboundMessage inboundMessage = mock(InboundMessage.class);
-        when(inboundMessage.getMessageHeader()).thenReturn(header);
-        return inboundMessage;
+        final ServerMessage serverMessage = mock(ServerMessage.class);
+        when(serverMessage.getMessageHeader()).thenReturn(header);
+        return serverMessage;
     }
 
     public static junit.framework.Test suite()

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Wed Jan 22 21:43:46 2014
@@ -25,7 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -312,9 +312,9 @@ public class TopicExchangeTest extends Q
 
     private int routeMessage(String routingKey, long messageNumber) throws AMQException
     {
-        InboundMessage inboundMessage = mock(InboundMessage.class);
-        when(inboundMessage.getRoutingKey()).thenReturn(routingKey);
-        List<? extends BaseQueue> queues = _exchange.route(inboundMessage);
+        ServerMessage serverMessage = mock(ServerMessage.class);
+        when(serverMessage.getRoutingKey()).thenReturn(routingKey);
+        List<? extends BaseQueue> queues = _exchange.route(serverMessage, InstanceProperties.EMPTY);
         ServerMessage message = mock(ServerMessage.class);
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Wed Jan 22 21:43:46 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
@@ -201,7 +202,6 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-
     public int compareTo(QueueEntry o)
     {
 
@@ -249,5 +249,9 @@ public class MockQueueEntry implements Q
     {
     }
 
-
+    @Override
+    public Filterable asFilterable()
+    {
+        return Filterable.Factory.newInstance(_message, new QueueEntryInstanceProperties(this));
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Wed Jan 22 21:43:46 2014
@@ -95,6 +95,12 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
+        public Object getConnectionReference()
+        {
+            return null;
+        }
+
+        @Override
         public long getExpiration()
         {
             return 0;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Wed Jan 22 21:43:46 2014
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -352,9 +351,9 @@ public class MockSubscription implements
         }
 
         @Override
-        public boolean onSameConnection(InboundMessage inbound)
+        public Object getConnectionReference()
         {
-            return false;
+            return this;
         }
 
         @Override

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Wed Jan 22 21:43:46 2014
@@ -98,6 +98,12 @@ class MockServerMessage implements Serve
         throw new NotImplementedException();
     }
 
+    @Override
+    public Object getConnectionReference()
+    {
+        return null;
+    }
+
     public long getArrivalTime()
     {
         throw new NotImplementedException();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Wed Jan 22 21:43:46 2014
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.transport.DeliveryProperties;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Wed Jan 22 21:43:46 2014
@@ -22,15 +22,13 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.transport.Header;
 
 import java.nio.ByteBuffer;
 
 
-public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
 {
 
     public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Wed Jan 22 21:43:46 2014
@@ -53,9 +53,7 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -766,12 +764,12 @@ public class ServerSession extends Sessi
         }
     }
 
-    public boolean onSameConnection(InboundMessage inbound)
+    @Override
+    public Object getConnectionReference()
     {
-        return inbound.getConnectionReference() == getConnection().getReference();
+        return getConnection().getReference();
     }
 
-
     public String toLogString()
     {
         long connectionId = super.getConnection() instanceof ServerConnection

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Wed Jan 22 21:43:46 2014
@@ -33,7 +33,8 @@ import org.apache.qpid.server.exchange.H
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
@@ -290,9 +291,8 @@ public class ServerSessionDelegate exten
     {
         final Exchange exchange = getExchangeForMessage(ssn, xfr);
 
-        DeliveryProperties delvProps = null;
-        if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps
-                .hasExpiration())
+        final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
+        if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
         {
             delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
         }
@@ -312,13 +312,36 @@ public class ServerSessionDelegate exten
         final MessageStore store = getVirtualHost(ssn).getMessageStore();
         final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
         final ServerSession serverSession = (ServerSession) ssn;
-        MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+        final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
         MessageReference<MessageTransferMessage> reference = message.newReference();
-        List<? extends BaseQueue> queues = exchange.route(message);
+
+        final InstanceProperties instanceProperties = new InstanceProperties()
+        {
+            @Override
+            public Object getProperty(final Property prop)
+            {
+                switch(prop)
+                {
+                    case EXPIRATION:
+                        return message.getExpiration();
+                    case IMMEDIATE:
+                        return message.isImmediate();
+                    case MANDATORY:
+                        return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+                    case PERSISTENT:
+                        return message.isPersistent();
+                    case REDELIVERED:
+                        return delvProps.getRedelivered();
+                }
+                return null;
+            }
+        };
+
+        List<? extends BaseQueue> queues = exchange.route(message, instanceProperties);
         if(queues.isEmpty() && exchange.getAlternateExchange() != null)
         {
             final Exchange alternateExchange = exchange.getAlternateExchange();
-            queues = alternateExchange.route(message);
+            queues = alternateExchange.route(message, instanceProperties);
             if (!queues.isEmpty())
             {
                 exchangeInUse = alternateExchange;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java Wed Jan 22 21:43:46 2014
@@ -33,13 +33,11 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -230,7 +228,7 @@ public class Subscription_0_10 implement
 
     private boolean checkFilters(QueueEntry entry)
     {
-        return (_filters == null) || _filters.allAllow(entry);
+        return (_filters == null) || _filters.allAllow(entry.asFilterable());
     }
 
     public boolean isClosed()
@@ -583,9 +581,7 @@ public class Subscription_0_10 implement
         final ServerMessage msg = entry.getMessage();
         if (alternateExchange != null)
         {
-            final InboundMessage m = new InboundMessageAdapter(entry);
-
-            final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+            final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry));
 
             if (destinationQueues == null || destinationQueues.isEmpty())
             {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Jan 22 21:43:46 2014
@@ -65,7 +65,7 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -73,8 +73,8 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
@@ -331,7 +331,31 @@ public class AMQChannel implements AMQSe
                     }
                     else
                     {
-                        final List<? extends BaseQueue> destinationQueues = _currentMessage.getExchange().route(amqMessage);
+                        final InstanceProperties instanceProperties =
+                                new InstanceProperties()
+                                {
+                                    @Override
+                                    public Object getProperty(final Property prop)
+                                    {
+                                        switch(prop)
+                                        {
+                                            case EXPIRATION:
+                                                return amqMessage.getExpiration();
+                                            case IMMEDIATE:
+                                                return _currentMessage.getMessagePublishInfo().isImmediate();
+                                            case PERSISTENT:
+                                                return amqMessage.isPersistent();
+                                            case MANDATORY:
+                                                return _currentMessage.getMessagePublishInfo().isMandatory();
+                                            case REDELIVERED:
+                                                return false;
+                                        }
+                                        return null;
+                                    }
+                                };
+
+                        final List<? extends BaseQueue> destinationQueues =
+                            _currentMessage.getExchange().route(amqMessage, instanceProperties);
 
                         if(destinationQueues == null || destinationQueues.isEmpty())
                         {
@@ -1472,9 +1496,10 @@ public class AMQChannel implements AMQSe
         }
     }
 
-    public boolean onSameConnection(InboundMessage inbound)
+    @Override
+    public Object getConnectionReference()
     {
-        return getProtocolSession().getReference() == inbound.getConnectionReference();
+        return getProtocolSession().getReference();
     }
 
     public int getUnacknowledgedMessageCount()
@@ -1550,9 +1575,9 @@ public class AMQChannel implements AMQSe
                 return;
             }
 
-            final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
 
-            final List<? extends BaseQueue> destinationQueues = altExchange.route(m);
+            final List<? extends BaseQueue> destinationQueues =
+                    altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry));
 
             if (destinationQueues == null || destinationQueues.isEmpty())
             {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Wed Jan 22 21:43:46 2014
@@ -22,15 +22,11 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.StoredMessage;
 
 import java.nio.ByteBuffer;
@@ -38,7 +34,7 @@ import java.nio.ByteBuffer;
 /**
  * A deliverable message.
  */
-public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> implements InboundMessage
+public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData>
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -94,12 +90,6 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getMessageHeader();
     }
 
-    @Override
-    public boolean isRedelivered()
-    {
-        return false;
-    }
-
     public MessagePublishInfo getMessagePublishInfo()
     {
         return getMessageMetaData().getMessagePublishInfo();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java Wed Jan 22 21:43:46 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
@@ -519,7 +520,7 @@ public abstract class SubscriptionImpl i
 
     private boolean checkFilters(QueueEntry msg)
     {
-        return (_filters == null) || _filters.allAllow(msg);
+        return (_filters == null) || _filters.allAllow(msg.asFilterable());
     }
 
     public boolean isAutoClose()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Wed Jan 22 21:43:46 2014
@@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -54,14 +55,37 @@ public class ExchangeDestination impleme
 
     public Outcome send(final Message_1_0 message, ServerTransaction txn)
     {
-        List<? extends BaseQueue> queues = _exchange.route(message);
+        final InstanceProperties instanceProperties =
+            new InstanceProperties()
+            {
+
+                @Override
+                public Object getProperty(final Property prop)
+                {
+                    switch(prop)
+                    {
+                        case MANDATORY:
+                            return false;
+                        case REDELIVERED:
+                            return false;
+                        case PERSISTENT:
+                            return message.isPersistent();
+                        case IMMEDIATE:
+                            return false;
+                        case EXPIRATION:
+                            return message.getExpiration();
+                    }
+                    return null;
+                }};
+
+        List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
 
         if(queues == null || queues.isEmpty())
         {
             Exchange altExchange = _exchange.getAlternateExchange();
             if(altExchange != null)
             {
-                queues = altExchange.route(message);
+                queues = altExchange.route(message, instanceProperties);
             }
         }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Wed Jan 22 21:43:46 2014
@@ -25,10 +25,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.store.StoredMessage;
 
-public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
 {
 
     private List<ByteBuffer> _fragments;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Jan 22 21:43:46 2014
@@ -41,7 +41,6 @@ import org.apache.qpid.AMQSecurityExcept
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -539,9 +538,9 @@ public class Session_1_0 implements Sess
     }
 
     @Override
-    public boolean onSameConnection(InboundMessage inbound)
+    public Object getConnectionReference()
     {
-        return inbound.getConnectionReference() == getConnection().getReference();
+        return getConnection().getReference();
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Wed Jan 22 21:43:46 2014
@@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
 
-class Subscription_1_0 implements Subscription
+class
+        Subscription_1_0 implements Subscription
 {
     private SendingLink_1_0 _link;
 
@@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscr
 
     private boolean checkFilters(final QueueEntry entry)
     {
-        return (_filters == null) || _filters.allAllow(entry);
+        return (_filters == null) || _filters.allAllow(entry.asFilterable());
     }
 
     public boolean isClosed()

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1560524&r1=1560523&r2=1560524&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java Wed Jan 22 21:43:46 2014
@@ -40,6 +40,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
@@ -623,7 +624,7 @@ public class MessageStoreTest extends Qp
         storedMessage.flushToStore();
         final AMQMessage currentMessage = new AMQMessage(storedMessage);
 
-        final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage);
+        final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage, InstanceProperties.EMPTY);
 
 
         ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message