qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1560424 [1/2] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/mes...
Date Wed, 22 Jan 2014 17:16:45 GMT
Author: rgodfrey
Date: Wed Jan 22 17:16:44 2014
New Revision: 1560424

URL: http://svn.apache.org/r1560424
Log:
QPID-5504 : initial refactoring to move common code into shared classes, make transports work similarly with respect to message routing

Removed:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ContentHeaderBodyAdapter.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
    qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java Wed Jan 22 17:16:44 2014
@@ -663,9 +663,8 @@ public class UpgradeFrom4To5 extends Abs
             {
                 final MessagePublishInfo publishBody = readMessagePublishInfo(input);
                 final ContentHeaderBody contentHeaderBody = readContentHeaderBody(input);
-                final int contentChunkCount = input.readInt();
 
-                return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+                return new MessageMetaData(publishBody, contentHeaderBody);
             }
             catch (Exception e)
             {

Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Wed Jan 22 17:16:44 2014
@@ -94,7 +94,7 @@ public class BDBMessageStoreTest extends
 
         ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
 
-        MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+        MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
         StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
 
         long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
@@ -150,7 +150,7 @@ public class BDBMessageStoreTest extends
         assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight());
         assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize());
 
-        BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties();
+        BasicContentHeaderProperties returnedProperties_0_8 =   returnedHeaderBody_0_8.getProperties();
         assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
         assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
 
@@ -392,7 +392,7 @@ public class BDBMessageStoreTest extends
 
         ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
 
-        MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+        MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
         StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
 
         storedMessage_0_8.addContent(0, chunk1);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java Wed Jan 22 17:16:44 2014
@@ -47,10 +47,6 @@ public interface AMQMessageHeader
 
     String getReplyTo();
 
-    String getReplyToExchange();
-    String getReplyToRoutingKey();
-
-
     Object getHeader(String name);
 
     boolean containsHeaders(Set<String> names);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Wed Jan 22 17:16:44 2014
@@ -23,9 +23,11 @@ package org.apache.qpid.server.message;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
 
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T>
+public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
 {
 
     private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
@@ -33,10 +35,13 @@ public abstract class AbstractServerMess
 
     private volatile int _referenceCount = 0;
     private final StoredMessage<T> _handle;
+    private final Object _connectionReference;
 
-    public AbstractServerMessageImpl(StoredMessage<T> handle)
+
+    public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
     {
         _handle = handle;
+        _connectionReference = connectionReference;
     }
 
     public StoredMessage<T> getStoredMessage()
@@ -44,16 +49,11 @@ public abstract class AbstractServerMess
         return _handle;
     }
 
-    public boolean incrementReference()
+    private boolean incrementReference()
     {
-        return incrementReference(1);
-    }
-
-    public boolean incrementReference(int count)
-    {
-        if(_refCountUpdater.addAndGet(this, count) <= 0)
+        if(_refCountUpdater.incrementAndGet(this) <= 0)
         {
-            _refCountUpdater.addAndGet(this, -count);
+            _refCountUpdater.decrementAndGet(this);
             return false;
         }
         else
@@ -67,7 +67,7 @@ public abstract class AbstractServerMess
      * message store.
      *
      */
-    public void decrementReference()
+    private void decrementReference()
     {
         int count = _refCountUpdater.decrementAndGet(this);
 
@@ -104,8 +104,72 @@ public abstract class AbstractServerMess
         return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")";
     }
 
-    protected int getReferenceCount()
+    private int getReferenceCount()
     {
         return _referenceCount;
     }
+
+    @Override
+    final public MessageReference<X> newReference()
+    {
+        return new Reference();
+    }
+
+    @Override
+    final public boolean isPersistent()
+    {
+        return _handle.getMetaData().isPersistent();
+    }
+
+    @Override
+    final public long getMessageNumber()
+    {
+        return getStoredMessage().getMessageNumber();
+    }
+
+    @Override
+    final public int getContent(ByteBuffer buf, int offset)
+    {
+        return getStoredMessage().getContent(offset, buf);
+    }
+
+    @Override
+    final public ByteBuffer getContent(int offset, int size)
+    {
+        return getStoredMessage().getContent(offset, size);
+    }
+
+    final public Object getConnectionReference()
+    {
+        return _connectionReference;
+    }public String toString()
+    {
+        return "Message[" + debugIdentity() + "]";
+    }
+
+    private final class Reference implements MessageReference<X>
+    {
+
+        private final AtomicBoolean _released = new AtomicBoolean(false);
+
+        private Reference()
+        {
+            incrementReference();
+        }
+
+        public X getMessage()
+        {
+            return (X) AbstractServerMessageImpl.this;
+        }
+
+        public void release()
+        {
+            if(!_released.getAndSet(true))
+            {
+                decrementReference();
+            }
+        }
+
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java Wed Jan 22 17:16:44 2014
@@ -34,4 +34,6 @@ public interface InboundMessage extends 
     boolean isRedelivered();
 
     long getSize();
+
+    Object getConnectionReference();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java Wed Jan 22 17:16:44 2014
@@ -20,39 +20,8 @@
  */
 package org.apache.qpid.server.message;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class MessageReference<M extends ServerMessage>
+public interface MessageReference<M extends ServerMessage>
 {
-
-    private final AtomicBoolean _released = new AtomicBoolean(false);
-
-    private volatile M _message;
-
-    public MessageReference(M message)
-    {
-        _message = message;
-        onReference(message);
-    }
-
-    abstract protected void onReference(M message);
-
-    abstract protected void onRelease(M message);
-
-    public M getMessage()
-    {
-        return _message;
-    }
-
-    public void release()
-    {
-        if(!_released.getAndSet(true))
-        {
-            if(_message != null)
-            {
-                onRelease(_message);
-            }
-        }
-    }
-
+    public M getMessage();
+    public void release();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Wed Jan 22 17:16:44 2014
@@ -37,8 +37,6 @@ public interface ServerMessage<T extends
 
     long getSize();
 
-    boolean isImmediate();
-
     long getExpiration();
 
     MessageReference newReference();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Wed Jan 22 17:16:44 2014
@@ -67,4 +67,10 @@ public class InboundMessageAdapter imple
     {
         return _entry.getSize();
     }
+
+    @Override
+    public Object getConnectionReference()
+    {
+        return (_entry.getMessage() instanceof InboundMessage) ? ((InboundMessage) _entry.getMessage()).getConnectionReference() : null;
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Wed Jan 22 17:16:44 2014
@@ -96,16 +96,6 @@ public class HeadersBindingTest extends 
             return null;
         }
 
-        public String getReplyToExchange()
-        {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-        public String getReplyToRoutingKey()
-        {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
         public Object getHeader(String name)
         {
             return _headers.get(name);

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Wed Jan 22 17:16:44 2014
@@ -130,11 +130,6 @@ public class TestMessageMetaDataType imp
             return _storedMsg;
         }
 
-        @Override
-        public boolean isImmediate()
-        {
-            return false;
-        }
 
         @Override
         public boolean isPersistent()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Wed Jan 22 17:16:44 2014
@@ -113,7 +113,6 @@ public class MessageConverter_v0_10 impl
 
 
         deliveryProps.setExpiration(serverMsg.getExpiration());
-        deliveryProps.setImmediate(serverMsg.isImmediate());
         deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
         deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
         deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Wed Jan 22 17:16:44 2014
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
+public class MessageMetaData_0_10 implements StorableMessageMetaData
 {
     private Header _header;
     private DeliveryProperties _deliveryProps;
@@ -53,8 +53,6 @@ public class MessageMetaData_0_10 implem
     private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10();
 
     private volatile ByteBuffer _encoded;
-    private Object _connectionReference;
-
 
     public MessageMetaData_0_10(MessageTransfer xfr)
     {
@@ -202,12 +200,6 @@ public class MessageMetaData_0_10 implem
         return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
     }
 
-    public boolean isRedelivered()
-    {
-        // The *Message* is never redelivered, only queue entries are...
-        return false;
-    }
-
     public long getArrivalTime()
     {
         return _arrivalTime;
@@ -218,16 +210,6 @@ public class MessageMetaData_0_10 implem
         return _header;
     }
 
-    public void setConnectionReference(Object connectionReference)
-    {
-        _connectionReference = connectionReference;
-    }
-
-    public Object getConnectionReference()
-    {
-        return _connectionReference;
-    }
-
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
     {
         public MessageMetaData_0_10 createMetaData(ByteBuffer buf)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Wed Jan 22 17:16:44 2014
@@ -30,15 +30,12 @@ import org.apache.qpid.transport.Header;
 import java.nio.ByteBuffer;
 
 
-public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> implements InboundMessage
 {
 
-    private Object _connectionRef;
-
     public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
     {
-        super(storeMessage);
-        _connectionRef = connectionRef;
+        super(storeMessage, connectionRef);
     }
 
     private MessageMetaData_0_10 getMetaData()
@@ -56,12 +53,6 @@ public class MessageTransferMessage exte
         return getMetaData().getMessageHeader();
     }
 
-    public boolean isPersistent()
-    {
-        return getMetaData().isPersistent();
-    }
-
-
     public boolean isRedelivered()
     {
         // The *Message* is never redelivered, only queue entries are... this is here so that filters
@@ -71,7 +62,6 @@ public class MessageTransferMessage exte
 
     public long getSize()
     {
-
         return getMetaData().getSize();
     }
 
@@ -85,32 +75,11 @@ public class MessageTransferMessage exte
         return getMetaData().getExpiration();
     }
 
-    public MessageReference newReference()
-    {
-        return new TransferMessageReference(this);
-    }
-
-    public long getMessageNumber()
-    {
-        return getStoredMessage().getMessageNumber();
-    }
-
     public long getArrivalTime()
     {
         return getMetaData().getArrivalTime();
     }
 
-    public int getContent(ByteBuffer buf, int offset)
-    {
-        return getStoredMessage().getContent(offset, buf);
-    }
-
-
-    public ByteBuffer getContent(int offset, int size)
-    {
-        return getStoredMessage().getContent(offset,size);
-    }
-
     public Header getHeader()
     {
         return getMetaData().getHeader();
@@ -118,13 +87,6 @@ public class MessageTransferMessage exte
 
     public ByteBuffer getBody()
     {
-
         return  getContent(0, (int)getSize());
     }
-
-    public Object getConnectionReference()
-    {
-        return _connectionRef;
-    }
-
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Wed Jan 22 17:16:44 2014
@@ -184,7 +184,7 @@ public class ServerSession extends Sessi
         return isCommandsFull(id);
     }
 
-    public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
+    public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues)
     {
         if(_outstandingCredit.get() != UNLIMITED_CREDIT
                 && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -768,10 +768,7 @@ public class ServerSession extends Sessi
 
     public boolean onSameConnection(InboundMessage inbound)
     {
-        return ((inbound instanceof MessageTransferMessage)
-                && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference())
-                || ((inbound instanceof MessageMetaData_0_10)
-                    && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference());
+        return inbound.getConnectionReference() == getConnection().getReference();
     }
 
 
@@ -852,31 +849,25 @@ public class ServerSession extends Sessi
     private class PostEnqueueAction implements ServerTransaction.Action
     {
 
-        private List<? extends BaseQueue> _queues;
-        private ServerMessage _message;
+        private final MessageReference<MessageTransferMessage> _reference;
+        private final List<? extends BaseQueue> _queues;
         private final boolean _transactional;
 
-        public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional)
+        public PostEnqueueAction(List<? extends BaseQueue> queues, MessageTransferMessage message, final boolean transactional)
         {
+            _reference = message.newReference();
             _transactional = transactional;
-            setState(queues, message);
-        }
-
-        public void setState(List<? extends BaseQueue> queues, ServerMessage message)
-        {
-            _message = message;
             _queues = queues;
         }
 
         public void postCommit()
         {
-            MessageReference<?> ref = _message.newReference();
             for(int i = 0; i < _queues.size(); i++)
             {
                 try
                 {
                     BaseQueue queue = _queues.get(i);
-                    queue.enqueue(_message, _transactional, null);
+                    queue.enqueue(_reference.getMessage(), _transactional, null);
                     if(queue instanceof AMQQueue)
                     {
                         ((AMQQueue)queue).checkCapacity(ServerSession.this);
@@ -889,12 +880,13 @@ public class ServerSession extends Sessi
                     throw new RuntimeException(e);
                 }
             }
-            ref.release();
+            _reference.release();
         }
 
         public void onRollback()
         {
             // NO-OP
+            _reference.release();
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Wed Jan 22 17:16:44 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.H
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
@@ -297,7 +298,6 @@ public class ServerSessionDelegate exten
         }
 
         final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
-        messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
 
         if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
         {
@@ -309,11 +309,16 @@ public class ServerSessionDelegate exten
         }
 
         final Exchange exchangeInUse;
-        List<? extends BaseQueue> queues = exchange.route(messageMetaData);
+        final MessageStore store = getVirtualHost(ssn).getMessageStore();
+        final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
+        final ServerSession serverSession = (ServerSession) ssn;
+        MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+        MessageReference<MessageTransferMessage> reference = message.newReference();
+        List<? extends BaseQueue> queues = exchange.route(message);
         if(queues.isEmpty() && exchange.getAlternateExchange() != null)
         {
             final Exchange alternateExchange = exchange.getAlternateExchange();
-            queues = alternateExchange.route(messageMetaData);
+            queues = alternateExchange.route(message);
             if (!queues.isEmpty())
             {
                 exchangeInUse = alternateExchange;
@@ -328,12 +333,8 @@ public class ServerSessionDelegate exten
             exchangeInUse = exchange;
         }
 
-        final ServerSession serverSession = (ServerSession) ssn;
         if(!queues.isEmpty())
         {
-            final MessageStore store = getVirtualHost(ssn).getMessageStore();
-            final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
-            MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
             serverSession.enqueue(message, queues);
             storeMessage.flushToStore();
         }
@@ -352,7 +353,6 @@ public class ServerSessionDelegate exten
             }
         }
 
-
         if(serverSession.isTransactional())
         {
             serverSession.processed(xfr);
@@ -361,6 +361,7 @@ public class ServerSessionDelegate exten
         {
             serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
         }
+        reference.release();
     }
 
     private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Jan 22 17:16:44 2014
@@ -49,7 +49,6 @@ import org.apache.qpid.framing.ContentBo
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
@@ -271,7 +270,7 @@ public class AMQChannel implements AMQSe
         {
             throw new AMQSecurityException("Permission denied: " + e.getName());
         }
-        _currentMessage = new IncomingMessage(info, getProtocolSession().getReference());
+        _currentMessage = new IncomingMessage(info);
         _currentMessage.setExchange(e);
     }
 
@@ -291,12 +290,6 @@ public class AMQChannel implements AMQSe
 
             _currentMessage.setContentHeaderBody(contentHeaderBody);
 
-            _currentMessage.setExpiration();
-
-            _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
-
-            _currentMessage.route();
-
             deliverCurrentMessageIfComplete();
         }
     }
@@ -309,56 +302,62 @@ public class AMQChannel implements AMQSe
         {
             try
             {
-                final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
 
-                if(!checkMessageUserId(_currentMessage.getContentHeader()))
-                {
-                    _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage));
-                }
-                else
+                final MessageMetaData messageMetaData =
+                        new MessageMetaData(_currentMessage.getMessagePublishInfo(),
+                                            _currentMessage.getContentHeader(),
+                                            getProtocolSession().getLastReceivedTime());
+
+                final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+                final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
+                MessageReference reference = amqMessage.newReference();
+                try
                 {
-                    if(destinationQueues == null || destinationQueues.isEmpty())
+                    int bodyCount = _currentMessage.getBodyCount();
+                    if(bodyCount > 0)
                     {
-                        handleUnroutableMessage();
+                        long bodyLengthReceived = 0;
+                        for(int i = 0 ; i < bodyCount ; i++)
+                        {
+                            ContentBody contentChunk = _currentMessage.getContentChunk(i);
+                            handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload()));
+                            bodyLengthReceived += contentChunk.getSize();
+                        }
+                    }
+
+                    if(!checkMessageUserId(_currentMessage.getContentHeader()))
+                    {
+                        _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
                     }
                     else
                     {
-                        final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData());
-                        _currentMessage.setStoredMessage(handle);
-                        int bodyCount = _currentMessage.getBodyCount();
-                        if(bodyCount > 0)
+                        final List<? extends BaseQueue> destinationQueues = _currentMessage.getExchange().route(amqMessage);
+
+                        if(destinationQueues == null || destinationQueues.isEmpty())
                         {
-                            long bodyLengthReceived = 0;
-                            for(int i = 0 ; i < bodyCount ; i++)
-                            {
-                                ContentChunk contentChunk = _currentMessage.getContentChunk(i);
-                                handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
-                                bodyLengthReceived += contentChunk.getSize();
-                            }
+                            handleUnroutableMessage(amqMessage);
                         }
-
-                        _transaction.addPostTransactionAction(new ServerTransaction.Action()
+                        else
                         {
-                            public void postCommit()
-                            {
-                            }
-
-                            public void onRollback()
-                            {
-                                handle.remove();
-                            }
-                        });
-
-                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
-                        incrementOutstandingTxnsIfNecessary();
-                        _currentMessage.getStoredMessage().flushToStore();
+                            _transaction.enqueue(destinationQueues,
+                                                 amqMessage,
+                                                 new MessageDeliveryAction(amqMessage, destinationQueues));
+                            incrementOutstandingTxnsIfNecessary();
+                            handle.flushToStore();
+
+                        }
                     }
                 }
+                finally
+                {
+                    reference.release();
+                }
+
             }
             finally
             {
                 long bodySize = _currentMessage.getSize();
-                long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp();
+                long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
                 _session.registerMessageReceived(bodySize, timestamp);
                 _currentMessage = null;
             }
@@ -374,9 +373,9 @@ public class AMQChannel implements AMQSe
      * @throws AMQConnectionException if the message is mandatoryclose-on-no-route
      * @see AMQProtocolSession#isCloseWhenNoRoute()
      */
-    private void handleUnroutableMessage() throws AMQConnectionException
+    private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
     {
-        boolean mandatory = _currentMessage.isMandatory();
+        boolean mandatory = message.isMandatory();
         String description = currentMessageDescription();
         boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
 
@@ -398,13 +397,18 @@ public class AMQChannel implements AMQSe
                     (Throwable) null);
         }
 
-        if (mandatory || _currentMessage.isImmediate())
+        if (mandatory || message.isImmediate())
         {
-            _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), _currentMessage));
+            _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message));
         }
         else
         {
-            _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
+            _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(),
+                                                       _currentMessage.getMessagePublishInfo().getRoutingKey() == null
+                                                               ? null
+                                                               : _currentMessage.getMessagePublishInfo()
+                                                                       .getRoutingKey()
+                                                                       .toString()));
         }
     }
 
@@ -417,15 +421,17 @@ public class AMQChannel implements AMQSe
 
         return String.format(
                 "[Exchange: %s, Routing key: %s]",
-                _currentMessage.getExchange(),
-                _currentMessage.getRoutingKey());
+                _currentMessage.getExchangeName(),
+                _currentMessage.getMessagePublishInfo().getRoutingKey() == null
+                        ? null
+                        : _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
     }
 
     public void publishContentBody(ContentBody contentBody) throws AMQException
     {
         if (_currentMessage == null)
         {
-            throw new AMQException("Received content body without previously receiving a JmsPublishBody");
+            throw new AMQException("Received content body without previously receiving a Content Header");
         }
 
         if (_logger.isDebugEnabled())
@@ -435,10 +441,7 @@ public class AMQChannel implements AMQSe
 
         try
         {
-            final ContentChunk contentChunk =
-                    _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
-
-            _currentMessage.addContentBodyFrame(contentChunk);
+            _currentMessage.addContentBodyFrame(contentBody);
 
             deliverCurrentMessageIfComplete();
         }
@@ -1157,24 +1160,23 @@ public class AMQChannel implements AMQSe
     }
 
 
-    private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
+    private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
             throws AMQException
     {
 
-        AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage());
+        AMQMessage message = new AMQMessage(handle, _session.getReference());
 
-        message.setExpiration(incomingMessage.getExpiration());
-        message.setConnectionIdentifier(_session.getReference());
+        final BasicContentHeaderProperties properties =
+                  incomingMessage.getContentHeader().getProperties();
+
+        long expiration = properties.getExpiration();
+        message.setExpiration(expiration);
         return message;
     }
 
     private boolean checkMessageUserId(ContentHeaderBody header)
     {
-        AMQShortString userID =
-                header.getProperties() instanceof BasicContentHeaderProperties
-                    ? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
-                    : null;
-
+        AMQShortString userID = header.getProperties().getUserId();
         return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
 
     }
@@ -1208,13 +1210,13 @@ public class AMQChannel implements AMQSe
 
     private class MessageDeliveryAction implements ServerTransaction.Action
     {
-        private IncomingMessage _incommingMessage;
+        private final MessageReference<AMQMessage> _reference;
         private List<? extends BaseQueue> _destinationQueues;
 
-        public MessageDeliveryAction(IncomingMessage currentMessage,
+        public MessageDeliveryAction(AMQMessage currentMessage,
                                      List<? extends BaseQueue> destinationQueues)
         {
-            _incommingMessage = currentMessage;
+            _reference = currentMessage.newReference();
             _destinationQueues = destinationQueues;
         }
 
@@ -1222,10 +1224,8 @@ public class AMQChannel implements AMQSe
         {
             try
             {
-                final boolean immediate = _incommingMessage.isImmediate();
-
-                final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
-                MessageReference ref = amqMessage.newReference();
+                AMQMessage message = _reference.getMessage();
+                final boolean immediate = message.isImmediate();
 
                 for(int i = 0; i < _destinationQueues.size(); i++)
                 {
@@ -1242,7 +1242,7 @@ public class AMQChannel implements AMQSe
                         action = null;
                     }
 
-                    queue.enqueue(amqMessage, isTransactional(), action);
+                    queue.enqueue(message, isTransactional(), action);
 
                     if(queue instanceof AMQQueue)
                     {
@@ -1251,8 +1251,8 @@ public class AMQChannel implements AMQSe
 
                 }
 
-                _incommingMessage.getStoredMessage().flushToStore();
-                ref.release();
+                message.getStoredMessage().flushToStore();
+                _reference.release();
             }
             catch (AMQException e)
             {
@@ -1265,6 +1265,7 @@ public class AMQChannel implements AMQSe
         {
             // Maybe keep track of entries that were created and then delete them here in case of failure
             // to in memory enqueue
+            _reference.release();
         }
 
         private class ImmediateAction implements BaseQueue.PostEnqueueAction
@@ -1375,28 +1376,30 @@ public class AMQChannel implements AMQSe
     private class WriteReturnAction implements ServerTransaction.Action
     {
         private final AMQConstant _errorCode;
-        private final IncomingMessage _message;
         private final String _description;
+        private final MessageReference<AMQMessage> _reference;
 
         public WriteReturnAction(AMQConstant errorCode,
                                  String description,
-                                 IncomingMessage message)
+                                 AMQMessage message)
         {
             _errorCode = errorCode;
-            _message = message;
             _description = description;
+            _reference = message.newReference();
         }
 
         public void postCommit()
         {
             try
             {
-                _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(),
-                                                              _message.getContentHeader(),
-                                                              _message,
+                AMQMessage message = _reference.getMessage();
+                _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+                                                              message.getContentHeaderBody(),
+                                                              message,
                                                               _channelId,
                                                               _errorCode.getCode(),
                                                               AMQShortString.validValueOf(_description));
+                _reference.release();
             }
             catch (AMQException e)
             {
@@ -1408,6 +1411,7 @@ public class AMQChannel implements AMQSe
 
         public void onRollback()
         {
+            _reference.release();
         }
     }
 
@@ -1470,12 +1474,7 @@ public class AMQChannel implements AMQSe
 
     public boolean onSameConnection(InboundMessage inbound)
     {
-        if(inbound instanceof IncomingMessage)
-        {
-            IncomingMessage incoming = (IncomingMessage) inbound;
-            return getProtocolSession().getReference() == incoming.getConnectionReference();
-        }
-        return false;
+        return getProtocolSession().getReference() == inbound.getConnectionReference();
     }
 
     public int getUnacknowledgedMessageCount()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Wed Jan 22 17:16:44 2014
@@ -28,66 +28,41 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.StoredMessage;
 
-import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 
 /**
  * A deliverable message.
  */
-public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
+public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> implements InboundMessage
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
     /** Flag to indicate that this message requires 'immediate' delivery. */
 
-    private static final byte IMMEDIATE = 0x01;
-
-    /**
-     * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
-     * for messages published with the 'immediate' flag.
-     */
-
-    private static final byte DELIVERED_TO_CONSUMER = 0x02;
-
-    private byte _flags = 0;
-
     private long _expiration;
 
     private final long _size;
 
-    private Object _connectionIdentifier;
-    private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
     public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
         this(handle, null);
     }
 
-    public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
+    public AMQMessage(StoredMessage<MessageMetaData> handle, Object connectionReference)
     {
-        super(handle);
-
-
-        final MessageMetaData metaData = handle.getMetaData();
-        _size = metaData.getContentSize();
-        final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
-
-        if(messagePublishInfo.isImmediate())
-        {
-            _flags |= IMMEDIATE;
-        }
+        super(handle, connectionReference);
+        _size = handle.getMetaData().getContentSize();
     }
 
     public void setExpiration(final long expiration)
     {
-
         _expiration = expiration;
-
     }
 
     public MessageMetaData getMessageMetaData()
@@ -100,21 +75,6 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getContentHeaderBody();
     }
 
-    public Long getMessageId()
-    {
-        return getStoredMessage().getMessageNumber();
-    }
-
-    /**
-     * Called selectors to determin if the message has already been sent
-     *
-     * @return _deliveredToConsumer
-     */
-    public boolean getDeliveredToConsumer()
-    {
-        return (_flags & DELIVERED_TO_CONSUMER) != 0;
-    }
-
     public String getRoutingKey()
     {
         MessageMetaData messageMetaData = getMessageMetaData();
@@ -134,22 +94,10 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getMessageHeader();
     }
 
-    public boolean isPersistent()
-    {
-        return getMessageMetaData().isPersistent();
-    }
-
-    /**
-     * Called to enforce the 'immediate' flag.
-     *
-     * @returns  true if the message is marked for immediate delivery but has not been marked as delivered
-     *                              to a consumer
-     */
-    public boolean immediateAndNotDelivered()
+    @Override
+    public boolean isRedelivered()
     {
-
-        return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
+        return false;
     }
 
     public MessagePublishInfo getMessagePublishInfo()
@@ -162,90 +110,27 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getArrivalTime();
     }
 
-    /**
-     * Checks to see if the message has expired. If it has the message is dequeued.
-     *
-     * @param queue The queue to check the expiration against. (Currently not used)
-     *
-     * @return true if the message has expire
-     *
-     * @throws AMQException
-     */
-    public boolean expired(AMQQueue queue) throws AMQException
-    {
-
-        if (_expiration != 0L)
-        {
-            long now = System.currentTimeMillis();
-
-            return (now > _expiration);
-        }
-
-        return false;
-    }
-
-    /**
-     * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
-     * And for selector efficiency.
-     */
-    public void setDeliveredToConsumer()
-    {
-        _flags |= DELIVERED_TO_CONSUMER;
-    }
-
     public long getSize()
     {
         return _size;
-
     }
 
     public boolean isImmediate()
     {
-        return (_flags & IMMEDIATE) == IMMEDIATE;
-    }
-
-    public long getExpiration()
-    {
-        return _expiration;
-    }
-
-    public MessageReference newReference()
-    {
-        return new AMQMessageReference(this);
+        return getMessagePublishInfo().isImmediate();
     }
 
-    public long getMessageNumber()
-    {
-        return getStoredMessage().getMessageNumber();
-    }
-
-
-    public Object getConnectionIdentifier()
-    {
-        return _connectionIdentifier;
-
-    }
 
-    public void setConnectionIdentifier(final Object connectionIdentifier)
+    public boolean isMandatory()
     {
-        _connectionIdentifier = connectionIdentifier;
+        return getMessagePublishInfo().isMandatory();
     }
 
 
-    public String toString()
-    {
-        return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount();
-    }
-
-    public int getContent(ByteBuffer buf, int offset)
+    public long getExpiration()
     {
-        return getStoredMessage().getContent(offset, buf);
+        return _expiration;
     }
 
 
-    public ByteBuffer getContent(int offset, int size)
-    {
-        return getStoredMessage().getContent(offset, size);
-    }
-
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Wed Jan 22 17:16:44 2014
@@ -24,96 +24,44 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.Filterable;
 import org.apache.qpid.server.store.StoredMessage;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
+public class IncomingMessage
 {
 
-    /** Used for debugging purposes. */
-    private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
-
     private final MessagePublishInfo _messagePublishInfo;
     private ContentHeaderBody _contentHeaderBody;
-
+    private Exchange _exchange;
 
     /**
      * Keeps a track of how many bytes we have received in body frames
      */
     private long _bodyLengthReceived = 0;
+    private List<ContentBody> _contentChunks = new ArrayList<ContentBody>();
 
-    /**
-     * This is stored during routing, to know the queues to which this message should immediately be
-     * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
-     * by the message handle.
-     */
-    private List<? extends BaseQueue> _destinationQueues;
-
-    private long _expiration;
-
-    private Exchange _exchange;
-
-    private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
-
-    // we keep both the original meta data object and the store reference to it just in case the
-    // store would otherwise flow it to disk
-
-    private MessageMetaData _messageMetaData;
-
-    private StoredMessage<MessageMetaData> _storedMessageHandle;
-    private Object _connectionReference;
-
-
-    public IncomingMessage(
-            final MessagePublishInfo info
-    )
-    {
-        this(info, null);
-    }
-
-    public IncomingMessage(MessagePublishInfo info, Object reference)
+    public IncomingMessage(MessagePublishInfo info)
     {
         _messagePublishInfo = info;
-        _connectionReference = reference;
     }
 
-    public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
+    public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody)
     {
         _contentHeaderBody = contentHeaderBody;
     }
 
-    public void setExpiration()
-    {
-        _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
-    }
-
-    public MessageMetaData headersReceived(long currentTime)
-    {
-        _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
-        return _messageMetaData;
-    }
-
-
-    public List<? extends BaseQueue> getDestinationQueues()
+    public MessagePublishInfo getMessagePublishInfo()
     {
-        return _destinationQueues;
+        return _messagePublishInfo;
     }
 
-    public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
+    public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException
     {
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
@@ -124,31 +72,14 @@ public class IncomingMessage implements 
         return (_bodyLengthReceived == getContentHeader().getBodySize());
     }
 
-    public AMQShortString getExchange()
+    public AMQShortString getExchangeName()
     {
         return _messagePublishInfo.getExchange();
     }
 
-    public String getRoutingKey()
-    {
-        return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
-    }
-
-    public String getBinding()
-    {
-        return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
-    }
-
-
-    public boolean isMandatory()
-    {
-        return _messagePublishInfo.isMandatory();
-    }
-
-
-    public boolean isImmediate()
+    public Exchange getExchange()
     {
-        return _messagePublishInfo.isImmediate();
+        return _exchange;
     }
 
     public ContentHeaderBody getContentHeader()
@@ -156,129 +87,24 @@ public class IncomingMessage implements 
         return _contentHeaderBody;
     }
 
-
-    public AMQMessageHeader getMessageHeader()
-    {
-        return _messageMetaData.getMessageHeader();
-    }
-
-    public boolean isPersistent()
-    {
-        return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
-                                                             BasicContentHeaderProperties.PERSISTENT;
-    }
-
-    public boolean isRedelivered()
-    {
-        return false;
-    }
-
-
     public long getSize()
     {
         return getContentHeader().getBodySize();
     }
 
-    public long getMessageNumber()
-    {
-        return _storedMessageHandle.getMessageNumber();
-    }
-
     public void setExchange(final Exchange e)
     {
         _exchange = e;
     }
 
-    public void route()
-    {
-        enqueue(_exchange.route(this));
-
-    }
-
-    public void enqueue(final List<? extends BaseQueue> queues)
-    {
-        _destinationQueues = queues;
-    }
-
-    public MessagePublishInfo getMessagePublishInfo()
-    {
-        return _messagePublishInfo;
-    }
-
-    public long getExpiration()
-    {
-        return _expiration;
-    }
-
     public int getBodyCount() throws AMQException
     {
         return _contentChunks.size();
     }
 
-    public ContentChunk getContentChunk(int index)
+    public ContentBody getContentChunk(int index)
     {
         return _contentChunks.get(index);
     }
 
-
-    public int getContent(ByteBuffer buf, int offset)
-    {
-        int pos = 0;
-        int written = 0;
-        for(ContentChunk cb : _contentChunks)
-        {
-            ByteBuffer data = ByteBuffer.wrap(cb.getData());
-            if(offset+written >= pos && offset < pos + data.limit())
-            {
-                ByteBuffer src = data.duplicate();
-                src.position(offset+written - pos);
-                src = src.slice();
-
-                if(buf.remaining() < src.limit())
-                {
-                    src.limit(buf.remaining());
-                }
-                int count = src.limit();
-                buf.put(src);
-                written += count;
-                if(buf.remaining() == 0)
-                {
-                    break;
-                }
-            }
-            pos+=data.limit();
-        }
-        return written;
-
-    }
-
-
-    public ByteBuffer getContent(int offset, int size)
-    {
-        ByteBuffer buf = ByteBuffer.allocate(size);
-        getContent(buf,offset);
-        buf.flip();
-        return buf;
-    }
-
-    public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
-    {
-        _storedMessageHandle = storedMessageHandle;
-    }
-
-    public StoredMessage<MessageMetaData> getStoredMessage()
-    {
-        return _storedMessageHandle;
-    }
-
-    public Object getConnectionReference()
-    {
-        return _connectionReference;
-    }
-
-    public MessageMetaData getMessageMetaData()
-    {
-        return _messageMetaData;
-    }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Wed Jan 22 17:16:44 2014
@@ -46,11 +46,10 @@ import java.util.Set;
  */
 public class MessageMetaData implements StorableMessageMetaData
 {
-    private MessagePublishInfo _messagePublishInfo;
+    private final MessagePublishInfo _messagePublishInfo;
 
-    private ContentHeaderBody _contentHeaderBody;
+    private final ContentHeaderBody _contentHeaderBody;
 
-    private int _contentChunkCount;
 
     private long _arrivalTime;
     private static final byte MANDATORY_FLAG = 1;
@@ -58,59 +57,36 @@ public class MessageMetaData implements 
     public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
     private static final MessageMetaDataType_0_8 TYPE = new MessageMetaDataType_0_8();
 
-    public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+    public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody)
     {
-        this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
+        this(publishBody,contentHeaderBody, System.currentTimeMillis());
     }
 
-    public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+    public MessageMetaData(MessagePublishInfo publishBody,
+                           ContentHeaderBody contentHeaderBody,
+                           long arrivalTime)
     {
         _contentHeaderBody = contentHeaderBody;
         _messagePublishInfo = publishBody;
-        _contentChunkCount = contentChunkCount;
         _arrivalTime = arrivalTime;
     }
 
-    public int getContentChunkCount()
-    {
-        return _contentChunkCount;
-    }
-
-    public void setContentChunkCount(int contentChunkCount)
-    {
-        _contentChunkCount = contentChunkCount;
-    }
 
     public ContentHeaderBody getContentHeaderBody()
     {
         return _contentHeaderBody;
     }
 
-    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
-    {
-        _contentHeaderBody = contentHeaderBody;
-    }
-
     public MessagePublishInfo getMessagePublishInfo()
     {
         return _messagePublishInfo;
     }
 
-    public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
-    {
-        _messagePublishInfo = messagePublishInfo;
-    }
-
     public long getArrivalTime()
     {
         return _arrivalTime;
     }
 
-    public void setArrivalTime(long arrivalTime)
-    {
-        _arrivalTime = arrivalTime;
-    }
-
     public MessageMetaDataType getType()
     {
         return TYPE;
@@ -169,8 +145,7 @@ public class MessageMetaData implements 
 
     public boolean isPersistent()
     {
-        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties());
-        return properties.getDeliveryMode() ==  BasicContentHeaderProperties.PERSISTENT;
+        return _contentHeaderBody.getProperties().getDeliveryMode() ==  BasicContentHeaderProperties.PERSISTENT;
     }
 
     private static class MetaDataFactory implements MessageMetaDataType.Factory
@@ -219,7 +194,7 @@ public class MessageMetaData implements 
                                 return routingKey;
                             }
                         };
-                return new MessageMetaData(publishBody, chb, 0, arrivalTime);
+                return new MessageMetaData(publishBody, chb, arrivalTime);
             }
             catch (AMQException e)
             {
@@ -242,7 +217,7 @@ public class MessageMetaData implements 
     {
         private BasicContentHeaderProperties getProperties()
         {
-            return (BasicContentHeaderProperties) getContentHeaderBody().getProperties();
+            return getContentHeaderBody().getProperties();
         }
 
         public String getUserId()
@@ -300,18 +275,6 @@ public class MessageMetaData implements 
             return getProperties().getReplyToAsString();
         }
 
-        public String getReplyToExchange()
-        {
-            // TODO
-            return getReplyTo();
-        }
-
-        public String getReplyToRoutingKey()
-        {
-            // TODO
-            return getReplyTo();
-        }
-
         public Object getHeader(String name)
         {
             FieldTable ft = getProperties().getHeaders();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java Wed Jan 22 17:16:44 2014
@@ -488,7 +488,7 @@ public abstract class SubscriptionImpl i
             {
                 AMQMessage message = (AMQMessage) entry.getMessage();
 
-                final Object publisherReference = message.getConnectionIdentifier();
+                final Object publisherReference = message.getConnectionReference();
 
                 // We don't want local messages so check to see if message is one we sent
                 Object localReference = getProtocolSession().getReference();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Wed Jan 22 17:16:44 2014
@@ -119,8 +119,6 @@ public class AckTest extends QpidTestCas
                     return new AMQShortString("rk");
                 }
             };
-            final IncomingMessage msg = new IncomingMessage(publishBody);
-            //IncomingMessage msg2 = null;
             BasicContentHeaderProperties b = new BasicContentHeaderProperties();
             ContentHeaderBody cb = new ContentHeaderBody();
             cb.setProperties(b);
@@ -131,42 +129,35 @@ public class AckTest extends QpidTestCas
                 b.setDeliveryMode((byte) 2);
             }
 
-            msg.setContentHeaderBody(cb);
-
             // we increment the reference here since we are not delivering the messaging to any queues, which is where
             // the reference is normally incremented. The test is easier to construct if we have direct access to the
             // subscription
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
-            msg.enqueue(qs);
-            MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis());
+            MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
             final StoredMessage storedMessage = _messageStore.addMessage(mmd);
-            msg.setStoredMessage(storedMessage);
             final AMQMessage message = new AMQMessage(storedMessage);
-            if(msg.allContentReceived())
-            {
-                ServerTransaction txn = new AutoCommitTransaction(_messageStore);
-                txn.enqueue(_queue, message, new ServerTransaction.Action() {
-                    public void postCommit()
+            ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+            txn.enqueue(_queue, message, new ServerTransaction.Action() {
+                public void postCommit()
+                {
+                    try
                     {
-                        try
-                        {
 
-                            _queue.enqueue(message);
-                        }
-                        catch (AMQException e)
-                        {
-                             throw new RuntimeException(e);
-                        }
+                        _queue.enqueue(message);
                     }
-
-                    public void onRollback()
+                    catch (AMQException e)
                     {
-                        //To change body of implemented methods use File | Settings | File Templates.
+                         throw new RuntimeException(e);
                     }
-                });
+                }
+
+                public void onRollback()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+            });
 
-            }
             // we manually send the message to the subscription
             //_subscription.send(new QueueEntry(_queue,msg), _queue);
         }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Wed Jan 22 17:16:44 2014
@@ -57,9 +57,9 @@ public class MockStoredMessage implement
         {
             FieldTable headers = new FieldTable();
             headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue));
-            ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers);
+            ( chb.getProperties()).setHeaders(headers);
         }
-        _metaData = new MessageMetaData(info, chb, 0);
+        _metaData = new MessageMetaData(info, chb);
         _content = ByteBuffer.allocate(_metaData.getContentSize());
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java Wed Jan 22 17:16:44 2014
@@ -81,7 +81,7 @@ public class ReferenceCountingTest exten
 
 
 
-        MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+        MessageMetaData mmd = new MessageMetaData(info, chb);
         StoredMessage storedMessage = _store.addMessage(mmd);
 
 
@@ -139,7 +139,7 @@ public class ReferenceCountingTest exten
 
         final ContentHeaderBody chb = createPersistentContentHeader();
 
-        MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+        MessageMetaData mmd = new MessageMetaData(info, chb);
         StoredMessage storedMessage = _store.addMessage(mmd);
 
         AMQMessage message = new AMQMessage(storedMessage);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Wed Jan 22 17:16:44 2014
@@ -51,6 +51,8 @@ public class Connection_1_0 implements C
     private final ConnectionEndpoint _conn;
     private final long _connectionId;
     private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
+    private final Object _reference = new Object();
+
 
 
     public static interface Task
@@ -79,6 +81,11 @@ public class Connection_1_0 implements C
 
     }
 
+    public Object getReference()
+    {
+        return _reference;
+    }
+
     public void remoteSessionCreation(SessionEndpoint endpoint)
     {
         Session_1_0 session = new Session_1_0(_vhost, this);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Wed Jan 22 17:16:44 2014
@@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.txn.ServerTransaction;
 
@@ -53,34 +54,48 @@ public class ExchangeDestination impleme
 
     public Outcome send(final Message_1_0 message, ServerTransaction txn)
     {
-        final List<? extends BaseQueue> queues = _exchange.route(message);
+        List<? extends BaseQueue> queues = _exchange.route(message);
 
-        txn.enqueue(queues,message, new ServerTransaction.Action()
+        if(queues == null || queues.isEmpty())
         {
+            Exchange altExchange = _exchange.getAlternateExchange();
+            if(altExchange != null)
+            {
+                queues = altExchange.route(message);
+            }
+        }
 
-            BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
+        if(queues != null && !queues.isEmpty())
+        {
+            final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
 
-            public void postCommit()
+            txn.enqueue(queues,message, new ServerTransaction.Action()
             {
-                for(int i = 0; i < _queues.length; i++)
+                MessageReference _reference = message.newReference();
+
+                public void postCommit()
                 {
-                    try
-                    {
-                        _queues[i].enqueue(message);
-                    }
-                    catch (AMQException e)
+                    for(int i = 0; i < baseQueues.length; i++)
                     {
-                        // TODO
-                        throw new RuntimeException(e);
+                        try
+                        {
+                            baseQueues[i].enqueue(message);
+                        }
+                        catch (AMQException e)
+                        {
+                            // TODO
+                            throw new RuntimeException(e);
+                        }
                     }
+                    _reference.release();
                 }
-            }
 
-            public void onRollback()
-            {
-                // NO-OP
-            }
-        });
+                public void onRollback()
+                {
+                    _reference.release();
+                }
+            });
+        }
 
         return ACCEPTED;
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Wed Jan 22 17:16:44 2014
@@ -272,7 +272,7 @@ public abstract class MessageConverter_t
                 @Override
                 public void remove()
                 {
-                    serverMessage.getStoredMessage().remove();
+                    throw new UnsupportedOperationException();
                 }
             };
         }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Wed Jan 22 17:16:44 2014
@@ -504,16 +504,6 @@ public class MessageMetaData_1_0 impleme
             }
         }
 
-        public String getReplyToExchange()
-        {
-            return null;  //TODO
-        }
-
-        public String getReplyToRoutingKey()
-        {
-            return null;  //TODO
-        }
-
         public String getAppId()
         {
             //TODO



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message