qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1560770 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/filter/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/model/adapter/ broker-core/src/main/jav...
Date Thu, 23 Jan 2014 17:53:43 GMT
Author: rgodfrey
Date: Thu Jan 23 17:53:42 2014
New Revision: 1560770

URL: http://svn.apache.org/r1560770
Log:
QPID-5504 : simplify QueueEntry to remove discard/dispose/dequeue and only leave delete as the correct way to remove entries

Removed:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java
Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java Thu Jan 23 17:53:42 2014
@@ -23,8 +23,6 @@ package org.apache.qpid.server.filter;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
 
 public interface Filterable
 {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java Thu Jan 23 17:53:42 2014
@@ -50,14 +50,7 @@ public interface InstanceProperties
         public static InstanceProperties fromMap(Map<Property, Object> map)
         {
             final Map<Property,Object> props = new EnumMap<Property,Object>(map);
-            return new InstanceProperties()
-            {
-                @Override
-                public Object getProperty(final Property prop)
-                {
-                    return props.get(prop);
-                }
-            };
+            return new MapInstanceProperties(props);
         }
 
         public static Map<Property, Object> asMap(InstanceProperties props)
@@ -75,5 +68,29 @@ public interface InstanceProperties
 
             return map;
         }
+
+        public static InstanceProperties copy(InstanceProperties from)
+        {
+            final Map<Property,Object> props = asMap(from);
+
+            return new MapInstanceProperties(props);
+
+        }
+
+        private static class MapInstanceProperties implements InstanceProperties
+        {
+            private final Map<Property, Object> _props;
+
+            private MapInstanceProperties(final Map<Property, Object> props)
+            {
+                _props = props;
+            }
+
+            @Override
+            public Object getProperty(final Property prop)
+            {
+                return _props.get(prop);
+            }
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Thu Jan 23 17:53:42 2014
@@ -767,7 +767,7 @@ public final class VirtualHostAdapter ex
                     {
                         public void postCommit()
                         {
-                            entry.discard();
+                            entry.delete();
                         }
 
                         public void onRollback()
@@ -836,7 +836,7 @@ public final class VirtualHostAdapter ex
 
                                     public void postCommit()
                                     {
-                                        entry.discard();
+                                        entry.delete();
                                     }
 
                                     public void onRollback()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Thu Jan 23 17:53:42 2014
@@ -165,7 +165,7 @@ public class ConflationQueueList extends
                                     @Override
                                     public void postCommit()
                                     {
-                                        entry.discard();
+                                        entry.delete();
                                     }
 
                                     @Override
@@ -196,21 +196,14 @@ public class ConflationQueueList extends
         }
 
         @Override
-        public boolean delete()
+        protected void onDelete()
         {
-            if(super.delete())
+            if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
             {
-                if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
-                {
-                    Object key = getMessage().getMessageHeader().getHeader(_conflationKey);
-                    _latestValuesMap.remove(key,_latestValueReference);
-                }
-                return true;
-            }
-            else
-            {
-                return false;
+                Object key = getMessage().getMessageHeader().getHeader(_conflationKey);
+                _latestValuesMap.remove(key,_latestValueReference);
             }
+
         }
 
         public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Jan 23 17:53:42 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 
@@ -191,9 +192,6 @@ public interface QueueEntry extends Comp
     boolean acquire();
     boolean acquire(Subscription sub);
 
-    boolean delete();
-    boolean isDeleted();
-
     boolean acquiredBySubscription();
     boolean isAcquiredBy(Subscription subscription);
 
@@ -209,11 +207,14 @@ public interface QueueEntry extends Comp
 
     boolean isRejectedBy(long subscriptionId);
 
-    void dequeue();
-
-    void dispose();
+    void delete();
 
-    void discard();
+    /**
+     * Returns true if entry is in either the DEQUEUED or DELETED state.
+     *
+     * @return true if entry is in either the DEQUEUED or DELETED state
+     */
+    boolean isDeleted();
 
     void routeToAlternate();
 
@@ -226,19 +227,6 @@ public interface QueueEntry extends Comp
     void addStateChangeListener(StateChangeListener listener);
     boolean removeStateChangeListener(StateChangeListener listener);
 
-    /**
-     * Returns true if entry is in DEQUEUED state, otherwise returns false.
-     *
-     * @return true if entry is in DEQUEUED state, otherwise returns false
-     */
-    boolean isDequeued();
-
-    /**
-     * Returns true if entry is either DEQUED or DELETED state.
-     *
-     * @return true if entry is either DEQUED or DELETED state
-     */
-    boolean isDispensed();
 
     /**
      * Number of times this queue entry has been delivered.
@@ -253,4 +241,5 @@ public interface QueueEntry extends Comp
 
     Filterable asFilterable();
 
+    InstanceProperties getInstanceProperties();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Jan 23 17:53:42 2014
@@ -25,13 +25,14 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
+import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -77,16 +78,13 @@ public abstract class QueueEntryImpl imp
 
     private volatile long _entryId;
 
-    private static final int DELIVERED_TO_CONSUMER = 1;
-    private static final int REDELIVERED = 2;
-
-    private volatile int _deliveryState;
+    private final EntryInstanceProperties _instanceProperties = new EntryInstanceProperties();
 
     /** Number of times this message has been delivered */
     private volatile int _deliveryCount = 0;
     private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
                     .newUpdater(QueueEntryImpl.class, "_deliveryCount");
-
+    private boolean _deliveredToConsumer;
 
 
     public QueueEntryImpl(QueueEntryList<?> queueEntryList)
@@ -103,12 +101,28 @@ public abstract class QueueEntryImpl imp
         _message = message == null ? null : message.newReference();
 
         _entryIdUpdater.set(this, entryId);
+        populateInstanceProperties();
     }
 
     public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
     {
         _queueEntryList = queueEntryList;
         _message = message == null ? null :  message.newReference();
+        populateInstanceProperties();
+    }
+
+    private void populateInstanceProperties()
+    {
+        if(_message != null)
+        {
+            _instanceProperties.setProperty(InstanceProperties.Property.PERSISTENT, _message.getMessage().isPersistent());
+            _instanceProperties.setProperty(InstanceProperties.Property.EXPIRATION, _message.getMessage().getExpiration());
+        }
+    }
+
+    public InstanceProperties getInstanceProperties()
+    {
+        return _instanceProperties;
     }
 
     protected void setEntryId(long entryId)
@@ -138,7 +152,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean getDeliveredToConsumer()
     {
-        return (_deliveryState & DELIVERED_TO_CONSUMER) != 0;
+        return _deliveredToConsumer;
     }
 
     public boolean expired() throws AMQException
@@ -190,7 +204,7 @@ public abstract class QueueEntryImpl imp
         final boolean acquired = acquire(sub.getOwningState());
         if(acquired)
         {
-            _deliveryState |= DELIVERED_TO_CONSUMER;
+            _deliveredToConsumer = true;
         }
         return acquired;
     }
@@ -244,12 +258,12 @@ public abstract class QueueEntryImpl imp
 
     public void setRedelivered()
     {
-        _deliveryState |= REDELIVERED;
+        _instanceProperties.setProperty(InstanceProperties.Property.REDELIVERED, Boolean.TRUE);
     }
 
     public boolean isRedelivered()
     {
-        return (_deliveryState & REDELIVERED) != 0;
+        return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
     }
 
     public Subscription getDeliveredSubscription()
@@ -297,7 +311,7 @@ public abstract class QueueEntryImpl imp
         }
     }
 
-    public void dequeue()
+    private void dequeue()
     {
         EntryState state = _state;
 
@@ -329,21 +343,27 @@ public abstract class QueueEntryImpl imp
         }
     }
 
-    public void dispose()
+    private boolean dispose()
     {
-        if(delete())
+        EntryState state = _state;
+
+        if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
+            _queueEntryList.entryDeleted(this);
+            onDelete();
             _message.release();
+
+            return true;
+        }
+        else
+        {
+            return false;
         }
     }
 
-    public void discard()
+    public void delete()
     {
-        //if the queue is null then the message is waiting to be acked, but has been removed.
-        if (getQueue() != null)
-        {
-            dequeue();
-        }
+        dequeue();
 
         dispose();
     }
@@ -355,12 +375,11 @@ public abstract class QueueEntryImpl imp
 
         if (alternateExchange != null)
         {
-            QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(this);
-            List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), props);
+            List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties());
             final ServerMessage message = getMessage();
             if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
             {
-                queues = alternateExchange.getAlternateExchange().route(getMessage(), props);
+                queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties());
             }
 
 
@@ -397,7 +416,7 @@ public abstract class QueueEntryImpl imp
                 {
                     public void postCommit()
                     {
-                        discard();
+                        delete();
                     }
 
                     public void onRollback()
@@ -446,24 +465,8 @@ public abstract class QueueEntryImpl imp
         return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
     }
 
-    public boolean isDeleted()
+    protected void onDelete()
     {
-        return _state == DELETED_STATE;
-    }
-
-    public boolean delete()
-    {
-        EntryState state = _state;
-
-        if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
-        {
-            _queueEntryList.entryDeleted(this);
-            return true;
-        }
-        else
-        {
-            return false;
-        }
     }
 
     public QueueEntryList getQueueEntryList()
@@ -471,12 +474,7 @@ public abstract class QueueEntryImpl imp
         return _queueEntryList;
     }
 
-    public boolean isDequeued()
-    {
-        return _state == DEQUEUED_STATE;
-    }
-
-    public boolean isDispensed()
+    public boolean isDeleted()
     {
         return _state.isDispensed();
     }
@@ -499,7 +497,7 @@ public abstract class QueueEntryImpl imp
     @Override
     public Filterable asFilterable()
     {
-        return Filterable.Factory.newInstance(getMessage(), new QueueEntryInstanceProperties(this));
+        return Filterable.Factory.newInstance(getMessage(), getInstanceProperties());
     }
 
     public String toString()
@@ -509,4 +507,21 @@ public abstract class QueueEntryImpl imp
                 ", _state=" + _state +
                 '}';
     }
+
+    private static class EntryInstanceProperties implements InstanceProperties
+    {
+        private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class);
+
+        @Override
+        public Object getProperty(final Property prop)
+        {
+            return _properties.get(prop);
+        }
+
+        private void setProperty(Property prop, Object value)
+        {
+            _properties.put(prop, value);
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Jan 23 17:53:42 2014
@@ -1030,7 +1030,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (node != null && !node.isDispensed())
+            if (node != null && !node.isDeleted())
             {
                 entryList.add(node);
             }
@@ -1153,7 +1153,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance() && !filter.filterComplete())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDispensed() && filter.accept(node))
+            if (!node.isDeleted() && filter.accept(node))
             {
                 entryList.add(node);
             }
@@ -1170,7 +1170,7 @@ public class SimpleAMQQueue implements A
         {
             QueueEntry node = queueListIterator.getNode();
 
-            if(!node.isDispensed())
+            if(!node.isDeleted())
             {
                 if(visitor.visit(node))
                 {
@@ -1290,7 +1290,7 @@ public class SimpleAMQQueue implements A
 
                         public void postCommit()
                         {
-                            node.discard();
+                            node.delete();
                         }
 
                         public void onRollback()
@@ -1361,11 +1361,10 @@ public class SimpleAMQQueue implements A
                 for(final QueueEntry entry : entries)
                 {
 
-                    QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(entry);
-                    List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), props);
+                    List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
                     if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
                     {
-                        queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(),props);
+                        queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties());
                     }
 
                     final ServerMessage message = entry.getMessage();
@@ -1403,7 +1402,7 @@ public class SimpleAMQQueue implements A
 
                                         public void postCommit()
                                         {
-                                            entry.discard();
+                                            entry.delete();
                                         }
 
                                         public void onRollback()
@@ -1431,7 +1430,7 @@ public class SimpleAMQQueue implements A
 
                                         public void postCommit()
                                         {
-                                            entry.discard();
+                                            entry.delete();
                                         }
 
                                         public void onRollback()
@@ -1908,7 +1907,7 @@ public class SimpleAMQQueue implements A
         {
             QueueEntry node = queueListIterator.getNode();
             // Only process nodes that are not currently deleted and not dequeued
-            if (!node.isDispensed())
+            if (!node.isDeleted())
             {
                 // If the node has exired then acquire it
                 if (node.expired() && node.acquire())

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java Thu Jan 23 17:53:42 2014
@@ -57,7 +57,7 @@ public class SimpleQueueEntryImpl extend
     {
 
         SimpleQueueEntryImpl next = getNextNode();
-        while(next != null && next.isDispensed())
+        while(next != null && next.isDeleted())
         {
 
             final SimpleQueueEntryImpl newNext = next.getNextNode();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Thu Jan 23 17:53:42 2014
@@ -283,14 +283,14 @@ public class SortedQueueEntryList implem
     {
         synchronized(_lock)
         {
-            if(node.isDispensed() && _head != node)
+            if(node.isDeleted() && _head != node)
             {
                 SortedQueueEntryImpl current = _head;
                 SortedQueueEntryImpl next;
                 while(current != null)
                 {
                     next = current.getNextValidEntry();
-                    if(current.compareTo(node)>0 && !current.isDispensed())
+                    if(current.compareTo(node)>0 && !current.isDeleted())
                     {
                         break;
                     }
@@ -642,7 +642,7 @@ public class SortedQueueEntryList implem
             if(!atTail())
             {
                 SortedQueueEntryImpl nextNode = next(_lastNode);
-                while(nextNode.isDispensed() && next(nextNode) != null)
+                while(nextNode.isDeleted() && next(nextNode) != null)
                 {
                     nextNode = next(nextNode);
                 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java Thu Jan 23 17:53:42 2014
@@ -21,9 +21,11 @@
 package org.apache.qpid.server.subscription;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 
 public interface ClientDeliveryMethod
 {
-    void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException;
+    void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props,
+                         final long deliveryTag) throws AMQException;
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Thu Jan 23 17:53:42 2014
@@ -199,7 +199,7 @@ public class VirtualHostConfigRecoveryHa
 
                         public void postCommit()
                         {
-                            entry.discard();
+                            entry.delete();
                         }
 
                         public void onRollback()

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java Thu Jan 23 17:53:42 2014
@@ -66,7 +66,7 @@ public class ConflationQueueListTest ext
         ServerMessage message = createTestServerMessage(null);
 
         QueueEntry addedEntry = _list.add(message);
-        addedEntry.discard();
+        addedEntry.delete();
 
         int numberOfEntries = countEntries(_list);
         assertEquals(0, numberOfEntries);
@@ -86,7 +86,7 @@ public class ConflationQueueListTest ext
         ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
 
         QueueEntry addedEntry = _list.add(message);
-        addedEntry.discard();
+        addedEntry.delete();
 
         int numberOfEntries = countEntries(_list);
         assertEquals(0, numberOfEntries);
@@ -146,7 +146,7 @@ public class ConflationQueueListTest ext
         assertEquals(1, countEntries(_list));
         assertEquals(1, _list.getLatestValuesMap().size());
 
-        addedEntry.discard();
+        addedEntry.delete();
 
         assertEquals(0, countEntries(_list));
         assertEquals(0, _list.getLatestValuesMap().size());
@@ -166,8 +166,8 @@ public class ConflationQueueListTest ext
         assertEquals(2, countEntries(_list));
         assertEquals(2, _list.getLatestValuesMap().size());
 
-        addedEntry1.discard();
-        addedEntry2.discard();
+        addedEntry1.delete();
+        addedEntry2.delete();
 
         assertEquals(0, countEntries(_list));
         assertEquals(0, _list.getLatestValuesMap().size());

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Thu Jan 23 17:53:42 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 
@@ -56,17 +57,7 @@ public class MockQueueEntry implements Q
 
     }
 
-    public boolean delete()
-    {
-        return false;
-    }
-
-    public void dequeue()
-    {
-
-    }
-
-    public void discard()
+    public void delete()
     {
 
     }
@@ -76,11 +67,6 @@ public class MockQueueEntry implements Q
 
     }
 
-    public void dispose()
-    {
-
-    }
-
     public boolean expired() throws AMQException
     {
         return false;
@@ -121,11 +107,6 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-    public boolean isDeleted()
-    {
-        return false;
-    }
-
 
     public boolean isQueueDeleted()
     {
@@ -161,26 +142,6 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-
-    public void requeue()
-    {
-
-
-    }
-
-    public void requeue(Subscription subscription)
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-
-    public void setDeliveredToSubscription()
-    {
-
-
-    }
-
-
     public void setRedelivered()
     {
 
@@ -213,12 +174,7 @@ public class MockQueueEntry implements Q
         _message = msg;
     }
 
-    public boolean isDequeued()
-    {
-        return false;
-    }
-
-    public boolean isDispensed()
+    public boolean isDeleted()
     {
         return false;
     }
@@ -252,6 +208,12 @@ public class MockQueueEntry implements Q
     @Override
     public Filterable asFilterable()
     {
-        return Filterable.Factory.newInstance(_message, new QueueEntryInstanceProperties(this));
+        return Filterable.Factory.newInstance(_message, getInstanceProperties());
+    }
+
+    @Override
+    public InstanceProperties getInstanceProperties()
+    {
+        return InstanceProperties.EMPTY;
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Thu Jan 23 17:53:42 2014
@@ -21,6 +21,7 @@ package org.apache.qpid.server.queue;
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.QueueEntry.EntryState;
 import org.apache.qpid.server.subscription.MockSubscription;
@@ -63,11 +64,6 @@ public abstract class QueueEntryImplTest
         acquire();
     }
 
-    public void testDequeue()
-    {
-        dequeue();
-    }
-
     public void testDelete()
     {
         delete();
@@ -79,27 +75,12 @@ public abstract class QueueEntryImplTest
      * Entry in state ACQUIRED should be released and its status should be
      * changed to AVAILABLE.
      */
-    public void testReleaseAquired()
+    public void testReleaseAcquired()
     {
         acquire();
         _queueEntry.release();
         assertTrue("Queue entry should be in AVAILABLE state after invoking of release method",
-                _queueEntry.isAvailable());
-    }
-
-    /**
-     * Tests release method for entry in dequeued state.
-     * <p>
-     * Invoking release on dequeued entry should not have any effect on its
-     * state.
-     */
-    public void testReleaseDequeued()
-    {
-        dequeue();
-        _queueEntry.release();
-        EntryState state = getState();
-        assertEquals("Invoking of release on entry in DEQUEUED state should not have any effect",
-                QueueEntry.DEQUEUED_STATE, state);
+                   _queueEntry.isAvailable());
     }
 
     /**
@@ -126,17 +107,6 @@ public abstract class QueueEntryImplTest
                 _queueEntry.isDeleted());
     }
 
-    /**
-     * A helper method to put tested entry into dequeue state and assert the sate
-     */
-    private void dequeue()
-    {
-        acquire();
-        _queueEntry.dequeue();
-        EntryState state = getState();
-        assertEquals("Queue entry should be in DEQUEUED state after invoking of dequeue method",
-                QueueEntry.DEQUEUED_STATE, state);
-    }
 
     /**
      * A helper method to put tested entry into acquired state and assert the sate
@@ -216,6 +186,9 @@ public abstract class QueueEntryImplTest
         {
             ServerMessage message = mock(ServerMessage.class);
             when(message.getMessageNumber()).thenReturn((long)i);
+            final MessageReference reference = mock(MessageReference.class);
+            when(reference.getMessage()).thenReturn(message);
+            when(message.newReference()).thenReturn(reference);
             QueueEntryImpl entry = queueEntryList.add(message);
             entries[i] = entry;
         }
@@ -235,13 +208,13 @@ public abstract class QueueEntryImplTest
             }
         }
 
-        // delete second
+        // discard second
         entries[1].acquire();
         entries[1].delete();
 
-        // dequeue third
+        // discard third
         entries[2].acquire();
-        entries[2].dequeue();
+        entries[2].delete();
 
         QueueEntry next = entries[0].getNextValidEntry();
         assertEquals("expected forth entry",entries[3], next);

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Thu Jan 23 17:53:42 2014
@@ -156,7 +156,7 @@ public abstract class QueueEntryListTest
             if(counter++ % 2 == 0)
             {
                 queueEntry.acquire();
-                queueEntry.dequeue();
+                queueEntry.delete();
             }
         }
 
@@ -225,7 +225,8 @@ public abstract class QueueEntryListTest
         assertNull(list.next(queueEntry2));
 
         //'delete' the 2nd QueueEntry
-        assertTrue("Deleting node should have succeeded", queueEntry2.delete());
+        queueEntry2.delete();
+        assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted());
 
         QueueEntryIterator<QueueEntry> iter = list.iterator();
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Thu Jan 23 17:53:42 2014
@@ -936,12 +936,8 @@ public class SimpleAMQQueueTest extends 
                             {
                                 return new SimpleQueueEntryImpl(this, message)
                                 {
-                                    public boolean isDequeued()
-                                    {
-                                        return (message.getMessageNumber() % 2 == 0);
-                                    }
 
-                                    public boolean isDispensed()
+                                    public boolean isDeleted()
                                     {
                                         return (message.getMessageNumber() % 2 == 0);
                                     }
@@ -1166,8 +1162,8 @@ public class SimpleAMQQueueTest extends 
         List<QueueEntry> entries = queue.getMessagesOnTheQueue();
         QueueEntry entry = entries.get(dequeueMessageIndex);
         entry.acquire();
-        entry.dequeue();
-        assertTrue(entry.isDequeued());
+        entry.delete();
+        assertTrue(entry.isDeleted());
         return entry;
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Thu Jan 23 17:53:42 2014
@@ -21,18 +21,24 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase {
+public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
+{
 
     private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
 
-    public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException {
+    public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
+    {
         ServerMessage message = mock(ServerMessage.class);
         when(message.getMessageNumber()).thenReturn((long)msgId);
+        final MessageReference reference = mock(MessageReference.class);
+        when(reference.getMessage()).thenReturn(message);
+        when(message.newReference()).thenReturn(reference);
         return queueEntryList.add(message);
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java Thu Jan 23 17:53:42 2014
@@ -132,37 +132,45 @@ public class SimpleQueueEntryListTest ex
         //requiring a scavenge once the requested threshold of 9 deletes is passed
 
         //Delete the 2nd message only
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(2).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,2));
         verifyDeletedButPresentBeforeScavenge(head, 2);
 
         //Delete messages 12 to 14
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(12).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,12));
         verifyDeletedButPresentBeforeScavenge(head, 12);
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(13).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,13));
         verifyDeletedButPresentBeforeScavenge(head, 13);
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,14));
         verifyDeletedButPresentBeforeScavenge(head, 14);
 
         //Delete message 20 only
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,20));
         verifyDeletedButPresentBeforeScavenge(head, 20);
 
         //Delete messages 81 to 84
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,81));
         verifyDeletedButPresentBeforeScavenge(head, 81);
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(82).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,82));
         verifyDeletedButPresentBeforeScavenge(head, 82);
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(83).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,83));
         verifyDeletedButPresentBeforeScavenge(head, 83);
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,84));
         verifyDeletedButPresentBeforeScavenge(head, 84);
 
         //Delete message 99 - this is the 10th message deleted that is after the queue head
         //and so will invoke the scavenge() which is set to go after 9 previous deletions
-        assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete());
+        assertTrue("Failed to delete QueueEntry", remove(entriesMap,99));
 
         verifyAllDeletedMessagedNotPresent(head, entriesMap);
     }
+    
+    private boolean remove(Map<Integer,QueueEntry> entriesMap, int pos)
+    {
+        QueueEntry entry = entriesMap.remove(pos);
+        boolean wasDeleted = entry.isDeleted();
+        entry.delete();
+        return entry.isDeleted() && !wasDeleted;
+    }
 
     private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId)
     {
@@ -211,6 +219,9 @@ public class SimpleQueueEntryListTest ex
         {
             ServerMessage message =  mock(ServerMessage.class);
             when(message.getMessageNumber()).thenReturn((long)i);
+            final MessageReference reference = mock(MessageReference.class);
+            when(reference.getMessage()).thenReturn(message);
+            when(message.newReference()).thenReturn(reference);
             entries[i] = queueEntryList.add(message);
         }
 
@@ -235,7 +246,7 @@ public class SimpleQueueEntryListTest ex
 
         // dequeue third
         entries[2].acquire();
-        entries[2].dequeue();
+        entries[2].delete();
 
         SimpleQueueEntryImpl next = entries[2].getNextValidEntry();
         assertEquals("expected forth entry", entries[3], next);

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java Thu Jan 23 17:53:42 2014
@@ -22,19 +22,22 @@ package org.apache.qpid.server.queue;
 import java.util.Collections;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class SortedQueueEntryImplTest extends QueueEntryImplTestBase {
+public class SortedQueueEntryImplTest extends QueueEntryImplTestBase
+{
 
     public final static String keys[] = { "CCC", "AAA", "BBB" };
 
     private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY");
 
-    public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException {
+    public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
+    {
         final ServerMessage message = mock(ServerMessage.class);
         AMQMessageHeader hdr = mock(AMQMessageHeader.class);
         when(message.getMessageHeader()).thenReturn(hdr);
@@ -42,6 +45,9 @@ public class SortedQueueEntryImplTest ex
         when(hdr.containsHeader(eq("KEY"))).thenReturn(true);
         when(hdr.getHeaderNames()).thenReturn(Collections.singleton("KEY"));
 
+        final MessageReference reference = mock(MessageReference.class);
+        when(reference.getMessage()).thenReturn(message);
+        when(message.newReference()).thenReturn(reference);
         return queueEntryList.add(message);
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java Thu Jan 23 17:53:42 2014
@@ -37,7 +37,6 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -532,7 +531,7 @@ public class Subscription_0_10 implement
                                    {
                                        restoreCredit(entry);
                                    }
-                                   entry.discard();
+                                   entry.delete();
                                }
 
                                public void onRollback()
@@ -548,7 +547,7 @@ public class Subscription_0_10 implement
         entry.routeToAlternate();
         if(entry.isAcquiredBy(this))
         {
-            entry.discard();
+            entry.delete();
         }
     }
 
@@ -581,11 +580,11 @@ public class Subscription_0_10 implement
         final ServerMessage msg = entry.getMessage();
         if (alternateExchange != null)
         {
-            final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry));
+            final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
 
             if (destinationQueues == null || destinationQueues.isEmpty())
             {
-                entry.discard();
+                entry.delete();
 
                 logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
             }
@@ -602,7 +601,7 @@ public class Subscription_0_10 implement
         }
         else
         {
-            entry.discard();
+            entry.delete();
             logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
         }
     }
@@ -787,7 +786,7 @@ public class Subscription_0_10 implement
         {
             _unacknowledgedBytes.addAndGet(-entry.getSize());
             _unacknowledgedCount.decrementAndGet();
-            entry.discard();
+            entry.delete();
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Jan 23 17:53:42 2014
@@ -74,7 +74,6 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
@@ -736,7 +735,7 @@ public class AMQChannel implements AMQSe
             }
             else
             {
-                unacked.discard();
+                unacked.delete();
             }
         }
 
@@ -771,7 +770,7 @@ public class AMQChannel implements AMQSe
                 _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
                           + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
 
-                unacked.discard();
+                unacked.delete();
             }
         }
         else
@@ -1362,7 +1361,7 @@ public class AMQChannel implements AMQSe
             {
                 for(QueueEntry entry : _ackedMessages)
                 {
-                    entry.discard();
+                    entry.delete();
                 }
             }
             finally
@@ -1571,19 +1570,19 @@ public class AMQChannel implements AMQSe
             {
                 _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
                 _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
-                rejectedQueueEntry.discard();
+                rejectedQueueEntry.delete();
                 return;
             }
 
 
             final List<? extends BaseQueue> destinationQueues =
-                    altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry));
+                    altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties());
 
             if (destinationQueues == null || destinationQueues.isEmpty())
             {
                 _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
                 _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
-                rejectedQueueEntry.discard();
+                rejectedQueueEntry.delete();
                 return;
             }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Thu Jan 23 17:53:42 2014
@@ -73,6 +73,8 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
@@ -88,7 +90,6 @@ import org.apache.qpid.server.model.Port
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -348,7 +349,7 @@ public class AMQProtocolEngine implement
      * Process the data block.
      * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}.
      *
-     * @throws an AMQConnectionException if unable to process the data block. In this case,
+     * @throws AMQConnectionException if unable to process the data block. In this case,
      * the connection is already closed by the time the exception is thrown. If any other
      * type of exception is thrown, the connection is not already closed.
      */
@@ -376,7 +377,7 @@ public class AMQProtocolEngine implement
      * Handle the supplied frame.
      * Adds this frame's channel to {@link #_channelsForCurrentMessage}.
      *
-     * @throws an AMQConnectionException if unable to process the data block. In this case,
+     * @throws AMQConnectionException if unable to process the data block. In this case,
      * the connection is already closed by the time the exception is thrown. If any other
      * type of exception is thrown, the connection is not already closed.
      */
@@ -1667,12 +1668,17 @@ public class AMQProtocolEngine implement
             _channelId = channelId;
         }
 
-        public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+        @Override
+        public void deliverToClient(final Subscription sub, final ServerMessage message,
+                                    final InstanceProperties props, final long deliveryTag)
                 throws AMQException
         {
-            registerMessageDelivered(entry.getMessage().getSize());
-            _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
-            entry.incrementDeliveryCount();
+            registerMessageDelivered(message.getSize());
+            _protocolOutputConverter.writeDeliver(message,
+                                                  props,
+                                                  _channelId,
+                                                  deliveryTag,
+                                                  ((SubscriptionImpl)sub).getConsumerTag());
         }
 
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Thu Jan 23 17:53:42 2014
@@ -115,7 +115,7 @@ public class ExtractResendAndRequeue imp
 
                         public void postCommit()
                         {
-                            node.discard();
+                            node.delete();
                         }
 
                         public void onRollback()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java Thu Jan 23 17:53:42 2014
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
@@ -36,6 +35,9 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.SubscriptionActor;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -132,7 +134,7 @@ public abstract class SubscriptionImpl i
             synchronized (getChannel())
             {
                 long deliveryTag = getChannel().getNextDeliveryTag();
-                sendToClient(entry, deliveryTag);
+                sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
             }
 
         }
@@ -147,7 +149,7 @@ public abstract class SubscriptionImpl i
 
     public static class NoAckSubscription extends SubscriptionImpl
     {
-        private volatile AutoCommitTransaction _txn;
+        private final AutoCommitTransaction _txn;
 
         public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
                                  AMQShortString consumerTag, FieldTable filters,
@@ -157,6 +159,7 @@ public abstract class SubscriptionImpl i
             throws AMQException
         {
             super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+            _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore());
         }
 
 
@@ -192,23 +195,22 @@ public abstract class SubscriptionImpl i
 
             // The send may of course still fail, in which case, as
             // the message is unacked, it will be lost.
-            if(_txn == null)
-            {
-                _txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
-            }
             _txn.dequeue(getQueue(), entry.getMessage(), NOOP);
 
-            entry.dequeue();
+            ServerMessage message = entry.getMessage();
+            MessageReference ref = message.newReference();
+            InstanceProperties props = entry.getInstanceProperties();
+            entry.delete();
 
             synchronized (getChannel())
             {
                 getChannel().getProtocolSession().setDeferFlush(batch);
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
-                sendToClient(entry, deliveryTag);
+                sendToClient(message, props, deliveryTag);
 
             }
-            entry.dispose();
+            ref.release();
 
 
         }
@@ -301,8 +303,8 @@ public abstract class SubscriptionImpl i
 
                 addUnacknowledgedMessage(entry);
                 recordMessageDelivery(entry, deliveryTag);
-                sendToClient(entry, deliveryTag);
-
+                sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+                entry.incrementDeliveryCount();
 
             }
         }
@@ -688,12 +690,12 @@ public abstract class SubscriptionImpl i
     }
 
 
-    protected void sendToClient(final QueueEntry entry, final long deliveryTag)
+    protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
             throws AMQException
     {
-        _deliveryMethod.deliverToClient(this,entry,deliveryTag);
+        _deliveryMethod.deliverToClient(this, message, props, deliveryTag);
         _deliveredCount.incrementAndGet();
-        _deliveredBytes.addAndGet(entry.getSize());
+        _deliveredBytes.addAndGet(message.getSize());
     }
 
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Thu Jan 23 17:53:42 2014
@@ -28,10 +28,11 @@ import org.apache.qpid.framing.BasicGetB
 import org.apache.qpid.framing.BasicGetEmptyBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -126,21 +127,18 @@ public class BasicGetMethodHandler imple
         final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
         {
 
-            public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            @Override
+            public void deliverToClient(final Subscription sub, final ServerMessage message, final
+                                        InstanceProperties props, final long deliveryTag)
             throws AMQException
             {
-                singleMessageCredit.useCreditForMessage(entry.getMessage().getSize());
-                if(entry.getMessage() instanceof AMQMessage)
-                {
-                    session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
-                                                                            deliveryTag, queue.getMessageCount());
-                    entry.incrementDeliveryCount();
-                }
-                else
-                {
-                    //TODO Convert AMQP 0-10 message
-                    throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Not implemented conversion of 0-10 message", null);
-                }
+                singleMessageCredit.useCreditForMessage(message.getSize());
+                session.getProtocolOutputConverter().writeGetOk(message,
+                                                                props,
+                                                                channel.getChannelId(),
+                                                                deliveryTag,
+                                                                queue.getMessageCount());
+
 
             }
         };

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Thu Jan 23 17:53:42 2014
@@ -79,7 +79,7 @@ public class BasicRejectMethodHandler im
                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
                 if(message != null)
                 {
-                    message.discard();
+                    message.delete();
                 }
                 return;
             }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java Thu Jan 23 17:53:42 2014
@@ -31,7 +31,9 @@ import org.apache.qpid.framing.AMQDataBl
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueEntry;
 
@@ -44,10 +46,17 @@ public interface ProtocolOutputConverter
         ProtocolOutputConverter newInstance(AMQProtocolSession session);
     }
 
-    void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+    void writeDeliver(final ServerMessage msg,
+                      final InstanceProperties props, int channelId,
+                      long deliveryTag,
+                      AMQShortString consumerTag)
             throws AMQException;
 
-    void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+    void writeGetOk(final ServerMessage msg,
+                    final InstanceProperties props,
+                    int channelId,
+                    long deliveryTag,
+                    int queueSize) throws AMQException;
 
     byte getProtocolMinorVersion();
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java Thu Jan 23 17:53:42 2014
@@ -33,13 +33,13 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueEntry;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -64,24 +64,27 @@ class ProtocolOutputConverterImpl implem
         return _protocolSession;
     }
 
-    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+    public void writeDeliver(final ServerMessage m,
+                             final InstanceProperties props, int channelId,
+                             long deliveryTag,
+                             AMQShortString consumerTag)
             throws AMQException
     {
-        AMQMessage msg = convertToAMQMessage(entry);
-        AMQBody deliverBody = createEncodedDeliverBody(msg, entry.isRedelivered(), deliveryTag, consumerTag);
+        final AMQMessage msg = convertToAMQMessage(m);
+        final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
+        AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag);
         writeMessageDelivery(msg, channelId, deliverBody);
     }
 
-    private AMQMessage convertToAMQMessage(QueueEntry entry)
+    private AMQMessage convertToAMQMessage(ServerMessage serverMessage)
     {
-        ServerMessage serverMessage = entry.getMessage();
         if(serverMessage instanceof AMQMessage)
         {
             return (AMQMessage) serverMessage;
         }
         else
         {
-            return getMessageConverter(serverMessage).convert(serverMessage, entry.getQueue().getVirtualHost());
+            return getMessageConverter(serverMessage).convert(serverMessage, _protocolSession.getVirtualHost());
         }
     }
 
@@ -186,10 +189,14 @@ class ProtocolOutputConverterImpl implem
         }
     }
 
-    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+    public void writeGetOk(final ServerMessage msg,
+                           final InstanceProperties props,
+                           int channelId,
+                           long deliveryTag,
+                           int queueSize) throws AMQException
     {
-        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
-        writeMessageDelivery(convertToAMQMessage(entry), channelId, deliver);
+        AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize);
+        writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver);
     }
 
 
@@ -274,18 +281,18 @@ class ProtocolOutputConverterImpl implem
         }
     }
 
-    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+    private AMQBody createEncodedGetOkBody(ServerMessage msg, InstanceProperties props, long deliveryTag, int queueSize)
             throws AMQException
     {
         final AMQShortString exchangeName;
         final AMQShortString routingKey;
 
-        final AMQMessage message = convertToAMQMessage(entry);
+        final AMQMessage message = convertToAMQMessage(msg);
         final MessagePublishInfo pb = message.getMessagePublishInfo();
         exchangeName = pb.getExchange();
         routingKey = pb.getRoutingKey();
 
-        final boolean isRedelivered = entry.isRedelivered();
+        final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
 
         BasicGetOkBody getOkBody =
                 _methodRegistry.createBasicGetOkBody(deliveryTag,

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Thu Jan 23 17:53:42 2014
@@ -41,7 +41,9 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -133,11 +135,6 @@ public class InternalTestProtocolSession
         }
     }
 
-    // *** ProtocolOutputConverter Implementation
-    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
-    {
-    }
-
     public ClientDeliveryMethod createDeliveryMethod(int channelId)
     {
         return new InternalWriteDeliverMethod(channelId);
@@ -147,7 +144,10 @@ public class InternalTestProtocolSession
     {
     }
 
-    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+    public void writeDeliver(final ServerMessage msg,
+                             final InstanceProperties props, int channelId,
+                             long deliveryTag,
+                             AMQShortString consumerTag) throws AMQException
     {
         _deliveryCount.incrementAndGet();
 
@@ -169,11 +169,15 @@ public class InternalTestProtocolSession
                 consumers.put(consumerTag, consumerDelivers);
             }
 
-            consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+            consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
         }
     }
 
-    public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    public void writeGetOk(final ServerMessage msg,
+                           final InstanceProperties props,
+                           int channelId,
+                           long deliveryTag,
+                           int queueSize) throws AMQException
     {
     }
 
@@ -195,15 +199,15 @@ public class InternalTestProtocolSession
     public class DeliveryPair
     {
         private long _deliveryTag;
-        private AMQMessage _message;
+        private ServerMessage _message;
 
-        public DeliveryPair(long deliveryTag, AMQMessage message)
+        public DeliveryPair(long deliveryTag, ServerMessage message)
         {
             _deliveryTag = deliveryTag;
             _message = message;
         }
 
-        public AMQMessage getMessage()
+        public ServerMessage getMessage()
         {
             return _message;
         }
@@ -242,7 +246,9 @@ public class InternalTestProtocolSession
         }
 
 
-        public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException
+        @Override
+        public void deliverToClient(Subscription sub, ServerMessage message,
+                                    InstanceProperties props, long deliveryTag) throws AMQException
         {
             _deliveryCount.incrementAndGet();
 
@@ -264,7 +270,7 @@ public class InternalTestProtocolSession
                     consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
                 }
 
-                consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+                consumerDelivers.add(new DeliveryPair(deliveryTag, message));
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Jan 23 17:53:42 2014
@@ -631,7 +631,7 @@ public class SendingLink_1_0 implements 
                                 {
                                     public void postCommit()
                                     {
-                                        queueEntry.discard();
+                                        queueEntry.delete();
                                     }
 
                                     public void onRollback()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1560770&r1=1560769&r2=1560770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Thu Jan 23 17:53:42 2014
@@ -148,14 +148,12 @@ class
 
     public boolean hasInterest(final QueueEntry entry)
     {
-        if(entry.getMessage() instanceof Message_1_0)
+        if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference())
         {
-            if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference())
-            {
-                return false;
-            }
+            return false;
         }
-        else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
+        else if(!(entry.getMessage() instanceof Message_1_0)
+                && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
         {
             return false;
         }
@@ -537,7 +535,7 @@ class
                             {
                                 if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
                                 {
-                                    _queueEntry.discard();
+                                    _queueEntry.delete();
                                 }
                             }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message