qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1560634 [4/7] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf2/examples/cpp/ qpid/cpp/bindings/qpid/dotnet/src/ qpid/cpp/bindings/qpid/dotnet/src/msvc10/ qpid/cpp/bindings/qpid/dotnet/src/msvc9/ q...
Date Thu, 23 Jan 2014 11:01:08 GMT
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Thu Jan 23 11:01:02 2014
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSk
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 
@@ -76,7 +75,7 @@ public interface AMQSessionModel extends
 
     boolean getBlocking();
 
-    boolean onSameConnection(InboundMessage inbound);
+    Object getConnectionReference();
 
     int getUnacknowledgedMessageCount();
 

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue:r1558037-1560619

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Thu Jan 23 11:01:02 2014
@@ -202,7 +202,7 @@ public class ConflationQueueList extends
             {
                 if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
                 {
-                    Object key = getMessageHeader().getHeader(_conflationKey);
+                    Object key = getMessage().getMessageHeader().getHeader(_conflationKey);
                     _latestValuesMap.remove(key,_latestValueReference);
                 }
                 return true;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Jan 23 11:01:02 2014
@@ -21,10 +21,11 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 
-public interface QueueEntry extends Comparable<QueueEntry>, Filterable
+public interface QueueEntry extends Comparable<QueueEntry>
 {
 
 
@@ -250,4 +251,6 @@ public interface QueueEntry extends Comp
 
     void decrementDeliveryCount();
 
+    Filterable asFilterable();
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Jan 23 11:01:02 2014
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -174,7 +175,7 @@ public abstract class QueueEntryImpl imp
 
     private boolean acquire(final EntryState state)
     {
-        boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
+        boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
 
         if(acquired && _stateChangeListeners != null)
         {
@@ -246,18 +247,6 @@ public abstract class QueueEntryImpl imp
         _deliveryState |= REDELIVERED;
     }
 
-    public AMQMessageHeader getMessageHeader()
-    {
-        final ServerMessage message = getMessage();
-        return message == null ? null : message.getMessageHeader();
-    }
-
-    public boolean isPersistent()
-    {
-        final ServerMessage message = getMessage();
-        return message != null && message.isPersistent();
-    }
-
     public boolean isRedelivered()
     {
         return (_deliveryState & REDELIVERED) != 0;
@@ -366,12 +355,12 @@ public abstract class QueueEntryImpl imp
 
         if (alternateExchange != null)
         {
-            InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this);
-            List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter);
+            QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(this);
+            List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), props);
             final ServerMessage message = getMessage();
             if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
             {
-                queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter);
+                queues = alternateExchange.getAlternateExchange().route(getMessage(), props);
             }
 
 
@@ -507,6 +496,12 @@ public abstract class QueueEntryImpl imp
         _deliveryCountUpdater.decrementAndGet(this);
     }
 
+    @Override
+    public Filterable asFilterable()
+    {
+        return Filterable.Factory.newInstance(getMessage(), new QueueEntryInstanceProperties(this));
+    }
+
     public String toString()
     {
         return "QueueEntryImpl{" +

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Jan 23 11:01:02 2014
@@ -1358,14 +1358,14 @@ public class SimpleAMQQueue implements A
             if(_alternateExchange != null)
             {
 
-                InboundMessageAdapter adapter = new InboundMessageAdapter();
                 for(final QueueEntry entry : entries)
                 {
-                    adapter.setEntry(entry);
-                    List<? extends BaseQueue> queues = _alternateExchange.route(adapter);
+
+                    QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(entry);
+                    List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), props);
                     if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
                     {
-                        queues = _alternateExchange.getAlternateExchange().route(adapter);
+                        queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(),props);
                     }
 
                     final ServerMessage message = entry.getMessage();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Jan 23 11:01:02 2014
@@ -54,6 +54,7 @@ import org.apache.qpid.server.stats.Stat
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.util.SystemUtils;
 
 
 /**
@@ -339,9 +340,9 @@ public class ApplicationRegistry impleme
 
         logActor.message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"),
                                                  System.getProperty("java.runtime.version", System.getProperty("java.version")),
-                                                 System.getProperty("os.name"),
-                                                 System.getProperty("os.version"),
-                                                 System.getProperty("os.arch")));
+                                                 SystemUtils.getOSName(),
+                                                 SystemUtils.getOSVersion(),
+                                                 SystemUtils.getOSArch()));
 
         logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory()));
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java Thu Jan 23 11:01:02 2014
@@ -139,26 +139,32 @@ public class JsonFileConfigStore impleme
         return file.exists();
     }
 
-    private void getFileLock() throws IOException, AMQStoreException
+    private void getFileLock() throws AMQStoreException
     {
         File lockFile = new File(_directoryName, _name + ".lck");
-        lockFile.createNewFile();
-
-        FileOutputStream out = new FileOutputStream(lockFile);
-        FileChannel channel = out.getChannel();
         try
         {
+            lockFile.createNewFile();
+            lockFile.deleteOnExit();
+
+            @SuppressWarnings("resource")
+            FileOutputStream out = new FileOutputStream(lockFile);
+            FileChannel channel = out.getChannel();
             _fileLock = channel.tryLock();
         }
+        catch (IOException ioe)
+        {
+            throw new AMQStoreException("Cannot create the lock file " + lockFile.getName(), ioe);
+        }
         catch(OverlappingFileLockException e)
         {
             _fileLock = null;
         }
+
         if(_fileLock == null)
         {
-            throw new AMQStoreException("Cannot get lock on file " + lockFile.getAbsolutePath() + " is another instance running?");
+            throw new AMQStoreException("Cannot get lock on file " + lockFile.getAbsolutePath() + ". Is another instance running?");
         }
-        lockFile.deleteOnExit();
     }
 
     private void checkDirectoryIsWritable(String directoryName) throws AMQStoreException
@@ -190,7 +196,7 @@ public class JsonFileConfigStore impleme
         Map data = _objectMapper.readValue(new File(_directoryName,_configFileName),Map.class);
         Collection<Class<? extends ConfiguredObject>> childClasses =
                 MODEL.getChildTypes(VirtualHost.class);
-        String modelVersion = (String) data.remove("modelVersion");
+        data.remove("modelVersion");
         Object configVersion;
         if((configVersion = data.remove("configVersion")) instanceof Integer)
         {

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost:r1558037-1560619

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java Thu Jan 23 11:01:02 2014
@@ -25,7 +25,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.FilterSupport;
+import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/plugins/AbstractConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/plugins/AbstractConfigurationTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/plugins/AbstractConfigurationTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/plugins/AbstractConfigurationTest.java Thu Jan 23 11:01:02 2014
@@ -24,6 +24,7 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 import java.util.List;
@@ -34,6 +35,8 @@ import java.util.List;
  */
 public class AbstractConfigurationTest extends QpidTestCase
 {
+    private static final Logger _logger = Logger.getLogger(AbstractConfigurationTest.class);
+
     private static final double DOUBLE = 3.14;
     private static final long POSITIVE_LONG = 1000;
     private static final long NEGATIVE_LONG = -1000;
@@ -101,7 +104,7 @@ public class AbstractConfigurationTest e
         }
         catch (ConfigurationException e)
         {
-            e.printStackTrace();
+            _logger.error("Error setting up plugin configuration", e);
             fail(e.toString());
         }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Thu Jan 23 11:01:02 2014
@@ -40,7 +40,8 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityManager;
@@ -128,7 +129,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key",queue2, null);
 
 
-        List<? extends BaseQueue> result = _exchange.route(mockMessage(true));
+        List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -137,7 +138,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
 
 
-        result = _exchange.route(mockMessage(true));
+        result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -145,14 +146,14 @@ public class FanoutExchangeTest extends 
 
         _exchange.removeBinding("key",queue2,null);
 
-        result = _exchange.route(mockMessage(true));
+        result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
         assertTrue("Expected queue2 to be routed to", result.contains(queue2));
 
 
-        result = _exchange.route(mockMessage(false));
+        result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to queue1 only", 1, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -161,7 +162,7 @@ public class FanoutExchangeTest extends 
         _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
 
 
-        result = _exchange.route(mockMessage(false));
+        result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
         assertEquals("Expected message to be routed to both queues", 2, result.size());
         assertTrue("Expected queue1 to be routed to", result.contains(queue1));
         assertTrue("Expected queue2 to be routed to", result.contains(queue2));
@@ -169,7 +170,7 @@ public class FanoutExchangeTest extends 
 
     }
 
-    private InboundMessage mockMessage(boolean val)
+    private ServerMessage mockMessage(boolean val)
     {
         final AMQMessageHeader header = mock(AMQMessageHeader.class);
         when(header.containsHeader("select")).thenReturn(true);
@@ -185,8 +186,8 @@ public class FanoutExchangeTest extends 
 
             }
         });
-        final InboundMessage inboundMessage = mock(InboundMessage.class);
-        when(inboundMessage.getMessageHeader()).thenReturn(header);
-        return inboundMessage;
+        final ServerMessage serverMessage = mock(ServerMessage.class);
+        when(serverMessage.getMessageHeader()).thenReturn(header);
+        return serverMessage;
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Thu Jan 23 11:01:02 2014
@@ -96,16 +96,6 @@ public class HeadersBindingTest extends 
             return null;
         }
 
-        public String getReplyToExchange()
-        {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-        public String getReplyToRoutingKey()
-        {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
         public Object getHeader(String name)
         {
             return _headers.get(name);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Thu Jan 23 11:01:02 2014
@@ -35,7 +35,8 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityManager;
@@ -71,9 +72,9 @@ public class HeadersExchangeTest extends
 
     }
 
-    protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception
+    protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception
     {
-        List<? extends BaseQueue> results = _exchange.route(msg);
+        List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY);
         List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
         unexpected.removeAll(Arrays.asList(expected));
         assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
@@ -209,7 +210,7 @@ public class HeadersExchangeTest extends
 
     }
 
-    private InboundMessage mockMessage(final Map<String, Object> headerValues)
+    private ServerMessage mockMessage(final Map<String, Object> headerValues)
     {
         final AMQMessageHeader header = mock(AMQMessageHeader.class);
         when(header.containsHeader(anyString())).then(new Answer<Boolean>()
@@ -239,9 +240,9 @@ public class HeadersExchangeTest extends
 
             }
         });
-        final InboundMessage inboundMessage = mock(InboundMessage.class);
-        when(inboundMessage.getMessageHeader()).thenReturn(header);
-        return inboundMessage;
+        final ServerMessage serverMessage = mock(ServerMessage.class);
+        when(serverMessage.getMessageHeader()).thenReturn(header);
+        return serverMessage;
     }
 
     public static junit.framework.Test suite()

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Thu Jan 23 11:01:02 2014
@@ -25,7 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -312,9 +312,9 @@ public class TopicExchangeTest extends Q
 
     private int routeMessage(String routingKey, long messageNumber) throws AMQException
     {
-        InboundMessage inboundMessage = mock(InboundMessage.class);
-        when(inboundMessage.getRoutingKey()).thenReturn(routingKey);
-        List<? extends BaseQueue> queues = _exchange.route(inboundMessage);
+        ServerMessage serverMessage = mock(ServerMessage.class);
+        when(serverMessage.getRoutingKey()).thenReturn(routingKey);
+        List<? extends BaseQueue> queues = _exchange.route(serverMessage, InstanceProperties.EMPTY);
         ServerMessage message = mock(ServerMessage.class);
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/log4j/LoggingManagementFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/log4j/LoggingManagementFacadeTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/log4j/LoggingManagementFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/log4j/LoggingManagementFacadeTest.java Thu Jan 23 11:01:02 2014
@@ -19,14 +19,11 @@
  */
 package org.apache.qpid.server.logging.log4j;
 
-import java.io.File;
-import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Level;
 import org.apache.qpid.test.utils.TestFileUtils;
-import org.apache.qpid.util.FileUtils;
 
 import junit.framework.TestCase;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java Thu Jan 23 11:01:02 2014
@@ -382,7 +382,6 @@ public class PortFactoryTest extends Qpi
         }
         catch(IllegalConfigurationException e)
         {
-            e.printStackTrace();
             // pass
         }
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Thu Jan 23 11:01:02 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
@@ -201,7 +202,6 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-
     public int compareTo(QueueEntry o)
     {
 
@@ -249,5 +249,9 @@ public class MockQueueEntry implements Q
     {
     }
 
-
+    @Override
+    public Filterable asFilterable()
+    {
+        return Filterable.Factory.newInstance(_message, new QueueEntryInstanceProperties(this));
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Thu Jan 23 11:01:02 2014
@@ -30,6 +30,8 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
 import java.util.Map;
+
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
@@ -55,6 +57,7 @@ import java.util.concurrent.TimeUnit;
 
 public class SimpleAMQQueueTest extends QpidTestCase
 {
+    private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTest.class);
 
     private SimpleAMQQueue _queue;
     private VirtualHost _virtualHost;
@@ -1146,7 +1149,7 @@ public class SimpleAMQQueueTest extends 
         }
         catch (InterruptedException e)
         {
-            e.printStackTrace();
+            _logger.error("Thread interrupted", e);
         }
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Thu Jan 23 11:01:02 2014
@@ -95,6 +95,12 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
+        public Object getConnectionReference()
+        {
+            return null;
+        }
+
+        @Override
         public long getExpiration()
         {
             return 0;
@@ -130,11 +136,6 @@ public class TestMessageMetaDataType imp
             return _storedMsg;
         }
 
-        @Override
-        public boolean isImmediate()
-        {
-            return false;
-        }
 
         @Override
         public boolean isPersistent()

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Thu Jan 23 11:01:02 2014
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -352,9 +351,9 @@ public class MockSubscription implements
         }
 
         @Override
-        public boolean onSameConnection(InboundMessage inbound)
+        public Object getConnectionReference()
         {
-            return false;
+            return this;
         }
 
         @Override

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Thu Jan 23 11:01:02 2014
@@ -98,6 +98,12 @@ class MockServerMessage implements Serve
         throw new NotImplementedException();
     }
 
+    @Override
+    public Object getConnectionReference()
+    {
+        return null;
+    }
+
     public long getArrivalTime()
     {
         throw new NotImplementedException();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/access-control/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/access-control/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/access-control/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/access-control/pom.xml Thu Jan 23 11:01:02 2014
@@ -38,6 +38,13 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.qpid</groupId>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/pom.xml Thu Jan 23 11:01:02 2014
@@ -34,6 +34,20 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.qpid</groupId>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Thu Jan 23 11:01:02 2014
@@ -113,7 +113,6 @@ public class MessageConverter_v0_10 impl
 
 
         deliveryProps.setExpiration(serverMsg.getExpiration());
-        deliveryProps.setImmediate(serverMsg.isImmediate());
         deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
         deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
         deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Thu Jan 23 11:01:02 2014
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.transport.DeliveryProperties;
@@ -37,7 +36,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
+public class MessageMetaData_0_10 implements StorableMessageMetaData
 {
     private Header _header;
     private DeliveryProperties _deliveryProps;
@@ -53,8 +52,6 @@ public class MessageMetaData_0_10 implem
     private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10();
 
     private volatile ByteBuffer _encoded;
-    private Object _connectionReference;
-
 
     public MessageMetaData_0_10(MessageTransfer xfr)
     {
@@ -202,12 +199,6 @@ public class MessageMetaData_0_10 implem
         return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
     }
 
-    public boolean isRedelivered()
-    {
-        // The *Message* is never redelivered, only queue entries are...
-        return false;
-    }
-
     public long getArrivalTime()
     {
         return _arrivalTime;
@@ -218,16 +209,6 @@ public class MessageMetaData_0_10 implem
         return _header;
     }
 
-    public void setConnectionReference(Object connectionReference)
-    {
-        _connectionReference = connectionReference;
-    }
-
-    public Object getConnectionReference()
-    {
-        return _connectionReference;
-    }
-
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
     {
         public MessageMetaData_0_10 createMetaData(ByteBuffer buf)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Thu Jan 23 11:01:02 2014
@@ -22,23 +22,18 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.transport.Header;
 
 import java.nio.ByteBuffer;
 
 
-public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
 {
 
-    private Object _connectionRef;
-
     public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
     {
-        super(storeMessage);
-        _connectionRef = connectionRef;
+        super(storeMessage, connectionRef);
     }
 
     private MessageMetaData_0_10 getMetaData()
@@ -56,12 +51,6 @@ public class MessageTransferMessage exte
         return getMetaData().getMessageHeader();
     }
 
-    public boolean isPersistent()
-    {
-        return getMetaData().isPersistent();
-    }
-
-
     public boolean isRedelivered()
     {
         // The *Message* is never redelivered, only queue entries are... this is here so that filters
@@ -71,7 +60,6 @@ public class MessageTransferMessage exte
 
     public long getSize()
     {
-
         return getMetaData().getSize();
     }
 
@@ -85,32 +73,11 @@ public class MessageTransferMessage exte
         return getMetaData().getExpiration();
     }
 
-    public MessageReference newReference()
-    {
-        return new TransferMessageReference(this);
-    }
-
-    public long getMessageNumber()
-    {
-        return getStoredMessage().getMessageNumber();
-    }
-
     public long getArrivalTime()
     {
         return getMetaData().getArrivalTime();
     }
 
-    public int getContent(ByteBuffer buf, int offset)
-    {
-        return getStoredMessage().getContent(offset, buf);
-    }
-
-
-    public ByteBuffer getContent(int offset, int size)
-    {
-        return getStoredMessage().getContent(offset,size);
-    }
-
     public Header getHeader()
     {
         return getMetaData().getHeader();
@@ -118,13 +85,6 @@ public class MessageTransferMessage exte
 
     public ByteBuffer getBody()
     {
-
         return  getContent(0, (int)getSize());
     }
-
-    public Object getConnectionReference()
-    {
-        return _connectionRef;
-    }
-
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Thu Jan 23 11:01:02 2014
@@ -24,7 +24,6 @@ import org.apache.qpid.protocol.ServerPr
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
@@ -156,7 +155,8 @@ public class ProtocolEngine_0_10  extend
 
     public void readerIdle()
     {
-        //Todo
+        _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE());
+        _network.close();
     }
 
     public String getAddress()

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Thu Jan 23 11:01:02 2014
@@ -73,7 +73,6 @@ public class ServerConnection extends Co
     private Port _port;
     private AtomicLong _lastIoTime = new AtomicLong();
     private boolean _blocking;
-    private NetworkConnection _networkConnection;
     private Transport _transport;
     private volatile boolean _stopped;
 
@@ -528,7 +527,7 @@ public class ServerConnection extends Co
 
     public Principal getPeerPrincipal()
     {
-        return _networkConnection.getPeerPrincipal();
+        return getNetworkConnection().getPeerPrincipal();
     }
 
     @Override
@@ -543,16 +542,6 @@ public class ServerConnection extends Co
         super.setLocalAddress(localAddress);
     }
 
-    public void setNetworkConnection(NetworkConnection network)
-    {
-        _networkConnection = network;
-    }
-
-    public NetworkConnection getNetworkConnection()
-    {
-        return _networkConnection;
-    }
-
     public void doHeartbeat()
     {
         super.doHeartBeat();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Thu Jan 23 11:01:02 2014
@@ -236,7 +236,6 @@ public class ServerConnectionDelegate ex
         }
 
         final NetworkConnection networkConnection = sconn.getNetworkConnection();
-
         if(ok.hasHeartbeat())
         {
             int heartbeat = ok.getHeartbeat();
@@ -352,4 +351,11 @@ public class ServerConnectionDelegate ex
     {
         return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.PRODUCT);
     }
+
+    @Override
+    protected int getHeartbeatMax()
+    {
+        int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY);
+        return delay == 0 ? super.getHeartbeatMax() : delay;
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Jan 23 11:01:02 2014
@@ -53,9 +53,7 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -184,7 +182,7 @@ public class ServerSession extends Sessi
         return isCommandsFull(id);
     }
 
-    public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
+    public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues)
     {
         if(_outstandingCredit.get() != UNLIMITED_CREDIT
                 && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -766,15 +764,12 @@ public class ServerSession extends Sessi
         }
     }
 
-    public boolean onSameConnection(InboundMessage inbound)
+    @Override
+    public Object getConnectionReference()
     {
-        return ((inbound instanceof MessageTransferMessage)
-                && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference())
-                || ((inbound instanceof MessageMetaData_0_10)
-                    && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference());
+        return getConnection().getReference();
     }
 
-
     public String toLogString()
     {
         long connectionId = super.getConnection() instanceof ServerConnection
@@ -852,31 +847,25 @@ public class ServerSession extends Sessi
     private class PostEnqueueAction implements ServerTransaction.Action
     {
 
-        private List<? extends BaseQueue> _queues;
-        private ServerMessage _message;
+        private final MessageReference<MessageTransferMessage> _reference;
+        private final List<? extends BaseQueue> _queues;
         private final boolean _transactional;
 
-        public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional)
+        public PostEnqueueAction(List<? extends BaseQueue> queues, MessageTransferMessage message, final boolean transactional)
         {
+            _reference = message.newReference();
             _transactional = transactional;
-            setState(queues, message);
-        }
-
-        public void setState(List<? extends BaseQueue> queues, ServerMessage message)
-        {
-            _message = message;
             _queues = queues;
         }
 
         public void postCommit()
         {
-            MessageReference<?> ref = _message.newReference();
             for(int i = 0; i < _queues.size(); i++)
             {
                 try
                 {
                     BaseQueue queue = _queues.get(i);
-                    queue.enqueue(_message, _transactional, null);
+                    queue.enqueue(_reference.getMessage(), _transactional, null);
                     if(queue instanceof AMQQueue)
                     {
                         ((AMQQueue)queue).checkCapacity(ServerSession.this);
@@ -889,12 +878,13 @@ public class ServerSession extends Sessi
                     throw new RuntimeException(e);
                 }
             }
-            ref.release();
+            _reference.release();
         }
 
         public void onRollback()
         {
             // NO-OP
+            _reference.release();
         }
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Thu Jan 23 11:01:02 2014
@@ -33,6 +33,8 @@ import org.apache.qpid.server.exchange.H
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
@@ -289,15 +291,13 @@ public class ServerSessionDelegate exten
     {
         final Exchange exchange = getExchangeForMessage(ssn, xfr);
 
-        DeliveryProperties delvProps = null;
-        if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps
-                .hasExpiration())
+        final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
+        if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
         {
             delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
         }
 
         final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
-        messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
 
         if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
         {
@@ -309,11 +309,39 @@ public class ServerSessionDelegate exten
         }
 
         final Exchange exchangeInUse;
-        List<? extends BaseQueue> queues = exchange.route(messageMetaData);
+        final MessageStore store = getVirtualHost(ssn).getMessageStore();
+        final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
+        final ServerSession serverSession = (ServerSession) ssn;
+        final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+        MessageReference<MessageTransferMessage> reference = message.newReference();
+
+        final InstanceProperties instanceProperties = new InstanceProperties()
+        {
+            @Override
+            public Object getProperty(final Property prop)
+            {
+                switch(prop)
+                {
+                    case EXPIRATION:
+                        return message.getExpiration();
+                    case IMMEDIATE:
+                        return message.isImmediate();
+                    case MANDATORY:
+                        return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+                    case PERSISTENT:
+                        return message.isPersistent();
+                    case REDELIVERED:
+                        return delvProps.getRedelivered();
+                }
+                return null;
+            }
+        };
+
+        List<? extends BaseQueue> queues = exchange.route(message, instanceProperties);
         if(queues.isEmpty() && exchange.getAlternateExchange() != null)
         {
             final Exchange alternateExchange = exchange.getAlternateExchange();
-            queues = alternateExchange.route(messageMetaData);
+            queues = alternateExchange.route(message, instanceProperties);
             if (!queues.isEmpty())
             {
                 exchangeInUse = alternateExchange;
@@ -328,12 +356,8 @@ public class ServerSessionDelegate exten
             exchangeInUse = exchange;
         }
 
-        final ServerSession serverSession = (ServerSession) ssn;
         if(!queues.isEmpty())
         {
-            final MessageStore store = getVirtualHost(ssn).getMessageStore();
-            final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
-            MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
             serverSession.enqueue(message, queues);
             storeMessage.flushToStore();
         }
@@ -352,7 +376,6 @@ public class ServerSessionDelegate exten
             }
         }
 
-
         if(serverSession.isTransactional())
         {
             serverSession.processed(xfr);
@@ -361,6 +384,7 @@ public class ServerSessionDelegate exten
         {
             serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
         }
+        reference.release();
     }
 
     private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java Thu Jan 23 11:01:02 2014
@@ -33,13 +33,11 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -230,7 +228,7 @@ public class Subscription_0_10 implement
 
     private boolean checkFilters(QueueEntry entry)
     {
-        return (_filters == null) || _filters.allAllow(entry);
+        return (_filters == null) || _filters.allAllow(entry.asFilterable());
     }
 
     public boolean isClosed()
@@ -583,9 +581,7 @@ public class Subscription_0_10 implement
         final ServerMessage msg = entry.getMessage();
         if (alternateExchange != null)
         {
-            final InboundMessage m = new InboundMessageAdapter(entry);
-
-            final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+            final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry));
 
             if (destinationQueues == null || destinationQueues.isEmpty())
             {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml Thu Jan 23 11:01:02 2014
@@ -34,6 +34,13 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.qpid</groupId>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Jan 23 11:01:02 2014
@@ -49,7 +49,6 @@ import org.apache.qpid.framing.ContentBo
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
@@ -66,7 +65,7 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -74,8 +73,8 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
@@ -271,7 +270,7 @@ public class AMQChannel implements AMQSe
         {
             throw new AMQSecurityException("Permission denied: " + e.getName());
         }
-        _currentMessage = new IncomingMessage(info, getProtocolSession().getReference());
+        _currentMessage = new IncomingMessage(info);
         _currentMessage.setExchange(e);
     }
 
@@ -291,12 +290,6 @@ public class AMQChannel implements AMQSe
 
             _currentMessage.setContentHeaderBody(contentHeaderBody);
 
-            _currentMessage.setExpiration();
-
-            _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
-
-            _currentMessage.route();
-
             deliverCurrentMessageIfComplete();
         }
     }
@@ -309,56 +302,86 @@ public class AMQChannel implements AMQSe
         {
             try
             {
-                final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
 
-                if(!checkMessageUserId(_currentMessage.getContentHeader()))
-                {
-                    _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage));
-                }
-                else
+                final MessageMetaData messageMetaData =
+                        new MessageMetaData(_currentMessage.getMessagePublishInfo(),
+                                            _currentMessage.getContentHeader(),
+                                            getProtocolSession().getLastReceivedTime());
+
+                final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+                final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
+                MessageReference reference = amqMessage.newReference();
+                try
                 {
-                    if(destinationQueues == null || destinationQueues.isEmpty())
+                    int bodyCount = _currentMessage.getBodyCount();
+                    if(bodyCount > 0)
                     {
-                        handleUnroutableMessage();
+                        long bodyLengthReceived = 0;
+                        for(int i = 0 ; i < bodyCount ; i++)
+                        {
+                            ContentBody contentChunk = _currentMessage.getContentChunk(i);
+                            handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload()));
+                            bodyLengthReceived += contentChunk.getSize();
+                        }
+                    }
+
+                    if(!checkMessageUserId(_currentMessage.getContentHeader()))
+                    {
+                        _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
                     }
                     else
                     {
-                        final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData());
-                        _currentMessage.setStoredMessage(handle);
-                        int bodyCount = _currentMessage.getBodyCount();
-                        if(bodyCount > 0)
+                        final InstanceProperties instanceProperties =
+                                new InstanceProperties()
+                                {
+                                    @Override
+                                    public Object getProperty(final Property prop)
+                                    {
+                                        switch(prop)
+                                        {
+                                            case EXPIRATION:
+                                                return amqMessage.getExpiration();
+                                            case IMMEDIATE:
+                                                return _currentMessage.getMessagePublishInfo().isImmediate();
+                                            case PERSISTENT:
+                                                return amqMessage.isPersistent();
+                                            case MANDATORY:
+                                                return _currentMessage.getMessagePublishInfo().isMandatory();
+                                            case REDELIVERED:
+                                                return false;
+                                        }
+                                        return null;
+                                    }
+                                };
+
+                        final List<? extends BaseQueue> destinationQueues =
+                            _currentMessage.getExchange().route(amqMessage, instanceProperties);
+
+                        if(destinationQueues == null || destinationQueues.isEmpty())
                         {
-                            long bodyLengthReceived = 0;
-                            for(int i = 0 ; i < bodyCount ; i++)
-                            {
-                                ContentChunk contentChunk = _currentMessage.getContentChunk(i);
-                                handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
-                                bodyLengthReceived += contentChunk.getSize();
-                            }
+                            handleUnroutableMessage(amqMessage);
                         }
-
-                        _transaction.addPostTransactionAction(new ServerTransaction.Action()
+                        else
                         {
-                            public void postCommit()
-                            {
-                            }
-
-                            public void onRollback()
-                            {
-                                handle.remove();
-                            }
-                        });
-
-                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
-                        incrementOutstandingTxnsIfNecessary();
-                        _currentMessage.getStoredMessage().flushToStore();
+                            _transaction.enqueue(destinationQueues,
+                                                 amqMessage,
+                                                 new MessageDeliveryAction(amqMessage, destinationQueues));
+                            incrementOutstandingTxnsIfNecessary();
+                            handle.flushToStore();
+
+                        }
                     }
                 }
+                finally
+                {
+                    reference.release();
+                }
+
             }
             finally
             {
                 long bodySize = _currentMessage.getSize();
-                long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp();
+                long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
                 _session.registerMessageReceived(bodySize, timestamp);
                 _currentMessage = null;
             }
@@ -374,9 +397,9 @@ public class AMQChannel implements AMQSe
      * @throws AMQConnectionException if the message is mandatoryclose-on-no-route
      * @see AMQProtocolSession#isCloseWhenNoRoute()
      */
-    private void handleUnroutableMessage() throws AMQConnectionException
+    private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
     {
-        boolean mandatory = _currentMessage.isMandatory();
+        boolean mandatory = message.isMandatory();
         String description = currentMessageDescription();
         boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
 
@@ -398,13 +421,18 @@ public class AMQChannel implements AMQSe
                     (Throwable) null);
         }
 
-        if (mandatory || _currentMessage.isImmediate())
+        if (mandatory || message.isImmediate())
         {
-            _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), _currentMessage));
+            _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message));
         }
         else
         {
-            _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
+            _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(),
+                                                       _currentMessage.getMessagePublishInfo().getRoutingKey() == null
+                                                               ? null
+                                                               : _currentMessage.getMessagePublishInfo()
+                                                                       .getRoutingKey()
+                                                                       .toString()));
         }
     }
 
@@ -417,15 +445,17 @@ public class AMQChannel implements AMQSe
 
         return String.format(
                 "[Exchange: %s, Routing key: %s]",
-                _currentMessage.getExchange(),
-                _currentMessage.getRoutingKey());
+                _currentMessage.getExchangeName(),
+                _currentMessage.getMessagePublishInfo().getRoutingKey() == null
+                        ? null
+                        : _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
     }
 
     public void publishContentBody(ContentBody contentBody) throws AMQException
     {
         if (_currentMessage == null)
         {
-            throw new AMQException("Received content body without previously receiving a JmsPublishBody");
+            throw new AMQException("Received content body without previously receiving a Content Header");
         }
 
         if (_logger.isDebugEnabled())
@@ -435,10 +465,7 @@ public class AMQChannel implements AMQSe
 
         try
         {
-            final ContentChunk contentChunk =
-                    _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
-
-            _currentMessage.addContentBodyFrame(contentChunk);
+            _currentMessage.addContentBodyFrame(contentBody);
 
             deliverCurrentMessageIfComplete();
         }
@@ -1157,24 +1184,23 @@ public class AMQChannel implements AMQSe
     }
 
 
-    private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
+    private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
             throws AMQException
     {
 
-        AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage());
+        AMQMessage message = new AMQMessage(handle, _session.getReference());
+
+        final BasicContentHeaderProperties properties =
+                  incomingMessage.getContentHeader().getProperties();
 
-        message.setExpiration(incomingMessage.getExpiration());
-        message.setConnectionIdentifier(_session.getReference());
+        long expiration = properties.getExpiration();
+        message.setExpiration(expiration);
         return message;
     }
 
     private boolean checkMessageUserId(ContentHeaderBody header)
     {
-        AMQShortString userID =
-                header.getProperties() instanceof BasicContentHeaderProperties
-                    ? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
-                    : null;
-
+        AMQShortString userID = header.getProperties().getUserId();
         return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
 
     }
@@ -1208,13 +1234,13 @@ public class AMQChannel implements AMQSe
 
     private class MessageDeliveryAction implements ServerTransaction.Action
     {
-        private IncomingMessage _incommingMessage;
+        private final MessageReference<AMQMessage> _reference;
         private List<? extends BaseQueue> _destinationQueues;
 
-        public MessageDeliveryAction(IncomingMessage currentMessage,
+        public MessageDeliveryAction(AMQMessage currentMessage,
                                      List<? extends BaseQueue> destinationQueues)
         {
-            _incommingMessage = currentMessage;
+            _reference = currentMessage.newReference();
             _destinationQueues = destinationQueues;
         }
 
@@ -1222,10 +1248,8 @@ public class AMQChannel implements AMQSe
         {
             try
             {
-                final boolean immediate = _incommingMessage.isImmediate();
-
-                final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
-                MessageReference ref = amqMessage.newReference();
+                AMQMessage message = _reference.getMessage();
+                final boolean immediate = message.isImmediate();
 
                 for(int i = 0; i < _destinationQueues.size(); i++)
                 {
@@ -1242,7 +1266,7 @@ public class AMQChannel implements AMQSe
                         action = null;
                     }
 
-                    queue.enqueue(amqMessage, isTransactional(), action);
+                    queue.enqueue(message, isTransactional(), action);
 
                     if(queue instanceof AMQQueue)
                     {
@@ -1251,8 +1275,8 @@ public class AMQChannel implements AMQSe
 
                 }
 
-                _incommingMessage.getStoredMessage().flushToStore();
-                ref.release();
+                message.getStoredMessage().flushToStore();
+                _reference.release();
             }
             catch (AMQException e)
             {
@@ -1265,6 +1289,7 @@ public class AMQChannel implements AMQSe
         {
             // Maybe keep track of entries that were created and then delete them here in case of failure
             // to in memory enqueue
+            _reference.release();
         }
 
         private class ImmediateAction implements BaseQueue.PostEnqueueAction
@@ -1375,28 +1400,30 @@ public class AMQChannel implements AMQSe
     private class WriteReturnAction implements ServerTransaction.Action
     {
         private final AMQConstant _errorCode;
-        private final IncomingMessage _message;
         private final String _description;
+        private final MessageReference<AMQMessage> _reference;
 
         public WriteReturnAction(AMQConstant errorCode,
                                  String description,
-                                 IncomingMessage message)
+                                 AMQMessage message)
         {
             _errorCode = errorCode;
-            _message = message;
             _description = description;
+            _reference = message.newReference();
         }
 
         public void postCommit()
         {
             try
             {
-                _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(),
-                                                              _message.getContentHeader(),
-                                                              _message,
+                AMQMessage message = _reference.getMessage();
+                _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+                                                              message.getContentHeaderBody(),
+                                                              message,
                                                               _channelId,
                                                               _errorCode.getCode(),
                                                               AMQShortString.validValueOf(_description));
+                _reference.release();
             }
             catch (AMQException e)
             {
@@ -1408,6 +1435,7 @@ public class AMQChannel implements AMQSe
 
         public void onRollback()
         {
+            _reference.release();
         }
     }
 
@@ -1468,14 +1496,10 @@ public class AMQChannel implements AMQSe
         }
     }
 
-    public boolean onSameConnection(InboundMessage inbound)
+    @Override
+    public Object getConnectionReference()
     {
-        if(inbound instanceof IncomingMessage)
-        {
-            IncomingMessage incoming = (IncomingMessage) inbound;
-            return getProtocolSession().getReference() == incoming.getConnectionReference();
-        }
-        return false;
+        return getProtocolSession().getReference();
     }
 
     public int getUnacknowledgedMessageCount()
@@ -1551,9 +1575,9 @@ public class AMQChannel implements AMQSe
                 return;
             }
 
-            final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
 
-            final List<? extends BaseQueue> destinationQueues = altExchange.route(m);
+            final List<? extends BaseQueue> destinationQueues =
+                    altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry));
 
             if (destinationQueues == null || destinationQueues.isEmpty())
             {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Thu Jan 23 11:01:02 2014
@@ -22,72 +22,43 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.StoredMessage;
 
-import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 
 /**
  * A deliverable message.
  */
-public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
+public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData>
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
     /** Flag to indicate that this message requires 'immediate' delivery. */
 
-    private static final byte IMMEDIATE = 0x01;
-
-    /**
-     * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
-     * for messages published with the 'immediate' flag.
-     */
-
-    private static final byte DELIVERED_TO_CONSUMER = 0x02;
-
-    private byte _flags = 0;
-
     private long _expiration;
 
     private final long _size;
 
-    private Object _connectionIdentifier;
-    private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
     public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
         this(handle, null);
     }
 
-    public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
+    public AMQMessage(StoredMessage<MessageMetaData> handle, Object connectionReference)
     {
-        super(handle);
-
-
-        final MessageMetaData metaData = handle.getMetaData();
-        _size = metaData.getContentSize();
-        final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
-
-        if(messagePublishInfo.isImmediate())
-        {
-            _flags |= IMMEDIATE;
-        }
+        super(handle, connectionReference);
+        _size = handle.getMetaData().getContentSize();
     }
 
     public void setExpiration(final long expiration)
     {
-
         _expiration = expiration;
-
     }
 
     public MessageMetaData getMessageMetaData()
@@ -100,21 +71,6 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getContentHeaderBody();
     }
 
-    public Long getMessageId()
-    {
-        return getStoredMessage().getMessageNumber();
-    }
-
-    /**
-     * Called selectors to determin if the message has already been sent
-     *
-     * @return _deliveredToConsumer
-     */
-    public boolean getDeliveredToConsumer()
-    {
-        return (_flags & DELIVERED_TO_CONSUMER) != 0;
-    }
-
     public String getRoutingKey()
     {
         MessageMetaData messageMetaData = getMessageMetaData();
@@ -134,24 +90,6 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getMessageHeader();
     }
 
-    public boolean isPersistent()
-    {
-        return getMessageMetaData().isPersistent();
-    }
-
-    /**
-     * Called to enforce the 'immediate' flag.
-     *
-     * @returns  true if the message is marked for immediate delivery but has not been marked as delivered
-     *                              to a consumer
-     */
-    public boolean immediateAndNotDelivered()
-    {
-
-        return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
-    }
-
     public MessagePublishInfo getMessagePublishInfo()
     {
         return getMessageMetaData().getMessagePublishInfo();
@@ -162,90 +100,27 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getArrivalTime();
     }
 
-    /**
-     * Checks to see if the message has expired. If it has the message is dequeued.
-     *
-     * @param queue The queue to check the expiration against. (Currently not used)
-     *
-     * @return true if the message has expire
-     *
-     * @throws AMQException
-     */
-    public boolean expired(AMQQueue queue) throws AMQException
-    {
-
-        if (_expiration != 0L)
-        {
-            long now = System.currentTimeMillis();
-
-            return (now > _expiration);
-        }
-
-        return false;
-    }
-
-    /**
-     * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
-     * And for selector efficiency.
-     */
-    public void setDeliveredToConsumer()
-    {
-        _flags |= DELIVERED_TO_CONSUMER;
-    }
-
     public long getSize()
     {
         return _size;
-
     }
 
     public boolean isImmediate()
     {
-        return (_flags & IMMEDIATE) == IMMEDIATE;
-    }
-
-    public long getExpiration()
-    {
-        return _expiration;
-    }
-
-    public MessageReference newReference()
-    {
-        return new AMQMessageReference(this);
+        return getMessagePublishInfo().isImmediate();
     }
 
-    public long getMessageNumber()
-    {
-        return getStoredMessage().getMessageNumber();
-    }
-
-
-    public Object getConnectionIdentifier()
-    {
-        return _connectionIdentifier;
-
-    }
 
-    public void setConnectionIdentifier(final Object connectionIdentifier)
+    public boolean isMandatory()
     {
-        _connectionIdentifier = connectionIdentifier;
+        return getMessagePublishInfo().isMandatory();
     }
 
 
-    public String toString()
-    {
-        return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount();
-    }
-
-    public int getContent(ByteBuffer buf, int offset)
+    public long getExpiration()
     {
-        return getStoredMessage().getContent(offset, buf);
+        return _expiration;
     }
 
 
-    public ByteBuffer getContent(int offset, int size)
-    {
-        return getStoredMessage().getContent(offset, size);
-    }
-
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message