qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1295627 [4/12] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbsto...
Date Thu, 01 Mar 2012 15:42:53 GMT
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Thu Mar  1 15:42:44 2012
@@ -22,6 +22,9 @@ package org.apache.qpid.server.logging.s
 
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.transport.ServerSession;
+
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 
 public class ChannelLogSubject extends AbstractLogSubject
@@ -52,5 +55,33 @@ public class ChannelLogSubject extends A
                                session.getVirtualHost().getName(),
                                channel.getChannelId());
     }
-    
+
+    public ChannelLogSubject(ServerSession session)
+    {
+        /**
+         * LOG FORMAT used by the AMQPConnectorActor follows
+         * ChannelLogSubject.CHANNEL_FORMAT :
+         * con:{0}({1}@{2}/{3})/ch:{4}
+         *
+         * Uses a MessageFormat call to insert the required values according to
+         * these indices:
+         *
+         * 0 - Connection ID
+         * 1 - User ID
+         * 2 - IP
+         * 3 - Virtualhost
+         * 4 - Channel ID
+         */
+        if(session.getConnection() instanceof ServerConnection)
+        {
+            ServerConnection connection = (ServerConnection) session.getConnection();
+            setLogStringWithFormat(CHANNEL_FORMAT,
+                                   connection == null ? -1L : connection.getConnectionId(),
+                                   session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
+                                   (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(),
+                                   session.getVirtualHost().getName(),
+                                   session.getChannel());
+        }
+    }
+
 }

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:42:44 2012
@@ -4,4 +4,4 @@
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Thu Mar  1 15:42:44 2012
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
 /**
  * A deliverable message.
  */
-public class AMQMessage extends AbstractServerMessageImpl
+public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -62,9 +62,7 @@ public class AMQMessage extends Abstract
     private Object _sessionIdentifier;
     private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
 
-    private final StoredMessage<MessageMetaData> _handle;
-
-    WeakReference<AMQChannel> _channelRef;
+    private WeakReference<AMQChannel> _channelRef;
 
     public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
@@ -75,7 +73,7 @@ public class AMQMessage extends Abstract
     {
         super(handle);
 
-        _handle = handle;
+
         final MessageMetaData metaData = handle.getMetaData();
         _size = metaData.getContentSize();
         final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
@@ -97,7 +95,7 @@ public class AMQMessage extends Abstract
 
     public MessageMetaData getMessageMetaData()
     {
-        return _handle.getMetaData();
+        return getStoredMessage().getMetaData();
     }
 
     public ContentHeaderBody getContentHeaderBody() throws AMQException
@@ -107,7 +105,7 @@ public class AMQMessage extends Abstract
 
     public Long getMessageId()
     {
-        return _handle.getMessageNumber();
+        return getStoredMessage().getMessageNumber();
     }
 
     /**
@@ -219,9 +217,9 @@ public class AMQMessage extends Abstract
         return new AMQMessageReference(this);
     }
 
-    public Long getMessageNumber()
+    public long getMessageNumber()
     {
-        return getMessageId();
+        return getStoredMessage().getMessageNumber();
     }
 
 
@@ -248,16 +246,13 @@ public class AMQMessage extends Abstract
 
     public int getContent(ByteBuffer buf, int offset)
     {
-        return _handle.getContent(offset, buf);
+        return getStoredMessage().getContent(offset, buf);
     }
 
-    public StoredMessage<MessageMetaData> getStoredMessage()
+
+    public ByteBuffer getContent(int offset, int size)
     {
-        return _handle;
+        return getStoredMessage().getContent(offset, size);
     }
 
-    public SessionConfig getSessionConfig()
-    {
-        return _channelRef == null ? null : ((SessionConfig) _channelRef.get());
-   }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Thu Mar  1 15:42:44 2012
@@ -21,19 +21,30 @@
 package org.apache.qpid.server.message;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
 
-public abstract class AbstractServerMessageImpl implements ServerMessage
+public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T>
 {
-    private final AtomicInteger _referenceCount = new AtomicInteger(0);
-    private final StoredMessage<?> _handle;
 
-    public AbstractServerMessageImpl(StoredMessage<?> handle)
+    private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+
+    private volatile int _referenceCount = 0;
+    private final StoredMessage<T> _handle;
+
+    public AbstractServerMessageImpl(StoredMessage<T> handle)
     {
         _handle = handle;
     }
 
+    public StoredMessage<T> getStoredMessage()
+    {
+        return _handle;
+    }
+
     public boolean incrementReference()
     {
         return incrementReference(1);
@@ -41,9 +52,9 @@ public abstract class AbstractServerMess
 
     public boolean incrementReference(int count)
     {
-        if(_referenceCount.addAndGet(count) <= 0)
+        if(_refCountUpdater.addAndGet(this, count) <= 0)
         {
-            _referenceCount.addAndGet(-count);
+            _refCountUpdater.addAndGet(this, -count);
             return false;
         }
         else
@@ -62,7 +73,7 @@ public abstract class AbstractServerMess
      */
     public void decrementReference()
     {
-        int count = _referenceCount.decrementAndGet();
+        int count = _refCountUpdater.decrementAndGet(this);
 
         // note that the operation of decrementing the reference count and then removing the message does not
         // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -73,7 +84,7 @@ public abstract class AbstractServerMess
             // set the reference count way below 0 so that we can detect that the message has been deleted
             // this is to guard against the message being spontaneously recreated (from the mgmt console)
             // by copying from other queues at the same time as it is being removed.
-            _referenceCount.set(Integer.MIN_VALUE/2);
+            _refCountUpdater.set(this,Integer.MIN_VALUE/2);
 
             // must check if the handle is null since there may be cases where we decide to throw away a message
             // and the handle has not yet been constructed
@@ -99,6 +110,6 @@ public abstract class AbstractServerMess
 
     protected int getReferenceCount()
     {
-        return _referenceCount.get();
+        return _referenceCount;
     }
 }
\ No newline at end of file

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java Thu Mar  1 15:42:44 2012
@@ -20,8 +20,11 @@
 */
 package org.apache.qpid.server.message;
 
+import org.apache.qpid.server.store.StoredMessage;
+
 public interface EnqueableMessage
 {
-    Long getMessageNumber();
+    long getMessageNumber();
     boolean isPersistent();
+    StoredMessage getStoredMessage();
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java Thu Mar  1 15:42:44 2012
@@ -22,10 +22,12 @@ package org.apache.qpid.server.message;
 
 
 import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.framing.AMQShortString;
 
 public interface InboundMessage extends Filterable
 {
     String getRoutingKey();
+    AMQShortString getRoutingKeyShortString();
 
     AMQMessageHeader getMessageHeader();
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Thu Mar  1 15:42:44 2012
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 public interface MessageContentSource
 {
     public int getContent(ByteBuffer buf, int offset);
+    public ByteBuffer getContent(int offset, int size);
 
     long getSize();
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java Thu Mar  1 15:42:44 2012
@@ -29,8 +29,8 @@ import org.apache.qpid.framing.abstracti
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.MessageMetaDataType;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.util.ByteBufferInputStream;
 import org.apache.qpid.server.util.ByteBufferOutputStream;
+import org.apache.qpid.util.ByteBufferInputStream;
 
 import java.io.*;
 import java.nio.ByteBuffer;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Thu Mar  1 15:42:44 2012
@@ -30,9 +30,12 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.codec.BBEncoder;
 import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.framing.AMQShortString;
 
 import java.nio.ByteBuffer;
 import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.List;
 
 public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
 {
@@ -42,7 +45,6 @@ public class MessageMetaData_0_10 implem
     private MessageTransferHeader _messageHeader;
     private long _arrivalTime;
     private int _bodySize;
-    private volatile SoftReference<ByteBuffer> _body;
 
     private static final int ENCODER_SIZE = 1 << 10;
 
@@ -53,21 +55,16 @@ public class MessageMetaData_0_10 implem
 
     public MessageMetaData_0_10(MessageTransfer xfr)
     {
-        this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis());
+        this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis());
     }
 
     private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
     {
-        this(header, bodySize, null, arrivalTime);
-    }
-
-    private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime)
-    {
         _header = header;
         if(_header != null)
         {
-            _deliveryProps = _header.get(DeliveryProperties.class);
-            _messageProps = _header.get(MessageProperties.class);
+            _deliveryProps = _header.getDeliveryProperties();
+            _messageProps = _header.getMessageProperties();
         }
         else
         {
@@ -78,21 +75,6 @@ public class MessageMetaData_0_10 implem
         _arrivalTime = arrivalTime;
         _bodySize = bodySize;
 
-
-
-        if(xfrBody == null)
-        {
-            _body = null;
-        }
-        else
-        {
-            ByteBuffer body = ByteBuffer.allocate(_bodySize);
-            body.put(xfrBody);
-            body.flip();
-            _body = new SoftReference<ByteBuffer>(body);
-        }
-
-
     }
 
 
@@ -122,16 +104,39 @@ public class MessageMetaData_0_10 implem
 
         encoder.writeInt64(_arrivalTime);
         encoder.writeInt32(_bodySize);
-        Struct[] headers = _header == null ? new Struct[0] : _header.getStructs();
-        encoder.writeInt32(headers.length);
+        int headersLength = 0;
+        if(_header.getDeliveryProperties() != null)
+        {
+            headersLength++;
+        }
+        if(_header.getMessageProperties() != null)
+        {
+            headersLength++;
+        }
+        if(_header.getNonStandardProperties() != null)
+        {
+            headersLength += _header.getNonStandardProperties().size();
+        }
 
+        encoder.writeInt32(headersLength);
 
-        for(Struct header : headers)
+        if(_header.getDeliveryProperties() != null)
         {
-            encoder.writeStruct32(header);
-
+            encoder.writeStruct32(_header.getDeliveryProperties());
+        }
+        if(_header.getMessageProperties() != null)
+        {
+            encoder.writeStruct32(_header.getMessageProperties());
         }
+        if(_header.getNonStandardProperties() != null)
+        {
 
+            for(Struct header : _header.getNonStandardProperties())
+            {
+                encoder.writeStruct32(header);
+            }
+
+        }
         ByteBuffer buf = encoder.buffer();
         return buf;
     }
@@ -173,6 +178,11 @@ public class MessageMetaData_0_10 implem
         return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
     }
 
+    public AMQShortString getRoutingKeyShortString()
+    {
+        return AMQShortString.valueOf(getRoutingKey());
+    }
+
     public AMQMessageHeader getMessageHeader()
     {
         return _messageHeader;
@@ -210,17 +220,6 @@ public class MessageMetaData_0_10 implem
         return _header;
     }
 
-    public ByteBuffer getBody()
-    {
-        ByteBuffer body = _body == null ? null : _body.get();
-        return body;
-    }
-
-    public void setBody(ByteBuffer body)
-    {
-        _body = new SoftReference<ByteBuffer>(body);
-    }
-
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
     {
         public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
@@ -232,14 +231,32 @@ public class MessageMetaData_0_10 implem
             int bodySize = decoder.readInt32();
             int headerCount = decoder.readInt32();
 
-            Struct[] headers = new Struct[headerCount];
+            DeliveryProperties deliveryProperties = null;
+            MessageProperties messageProperties = null;
+            List<Struct> otherProps = null;
 
             for(int i = 0 ; i < headerCount; i++)
             {
-                headers[i] = decoder.readStruct32();
+                Struct struct = decoder.readStruct32();
+                if(struct instanceof DeliveryProperties && deliveryProperties == null)
+                {
+                    deliveryProperties = (DeliveryProperties) struct;
+                }
+                else if(struct instanceof MessageProperties && messageProperties == null)
+                {
+                    messageProperties = (MessageProperties) struct;
+                }
+                else
+                {
+                    if(otherProps == null)
+                    {
+                        otherProps = new ArrayList<Struct>();
+
+                    }
+                    otherProps.add(struct);
+                }
             }
-
-            Header header = new Header(headers);
+            Header header = new Header(deliveryProperties,messageProperties,otherProps);
 
             return new MessageMetaData_0_10(header, bodySize, arrivalTime);
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java Thu Mar  1 15:42:44 2012
@@ -20,25 +20,30 @@
 */
 package org.apache.qpid.server.message;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.qpid.amqp_1_0.codec.ValueHandler;
 import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
 import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-
-
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.amqp_1_0.type.messaging.Footer;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
 import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.qpid.configuration.Validator;
 import org.apache.qpid.server.store.MessageMetaDataType;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 
-import java.nio.ByteBuffer;
-import java.util.*;
-
 public class MessageMetaData_1_0 implements StorableMessageMetaData
 {
     // TODO move to somewhere more useful
@@ -312,6 +317,18 @@ public class MessageMetaData_1_0 impleme
         return buf.limit();
     }
 
+    public int getContentSize()
+    {
+        ByteBuffer buf = _encoded;
+
+        if(buf == null)
+        {
+            buf = encodeAsBuffer();
+            _encoded = buf;
+        }
+        return buf.remaining();
+    }
+
     public boolean isPersistent()
     {
         return _header != null && Boolean.TRUE.equals(_header.getDurable());

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java Thu Mar  1 15:42:44 2012
@@ -24,32 +24,35 @@ import org.apache.qpid.transport.*;
 import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.framing.AMQShortString;
 
 import java.nio.ByteBuffer;
-import java.lang.ref.WeakReference;
 
 
-public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage
 {
-    private StoredMessage<MessageMetaData_0_10> _storeMessage;
-    private WeakReference<Session> _sessionRef;
 
-    public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
+    private Object _connectionRef;
+
+    public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
     {
         super(storeMessage);
-        _storeMessage = storeMessage;
-        _sessionRef = sessionRef;
+        _connectionRef = connectionRef;
     }
 
     private MessageMetaData_0_10 getMetaData()
     {
-        return _storeMessage.getMetaData();
+        return getStoredMessage().getMetaData();
     }
 
     public String getRoutingKey()
     {
         return getMetaData().getRoutingKey();
+    }
 
+    public AMQShortString getRoutingKeyShortString()
+    {
+        return AMQShortString.valueOf(getRoutingKey());
     }
 
     public AMQMessageHeader getMessageHeader()
@@ -91,9 +94,9 @@ public class MessageTransferMessage exte
         return new TransferMessageReference(this);
     }
 
-    public Long getMessageNumber()
+    public long getMessageNumber()
     {
-        return _storeMessage.getMessageNumber();
+        return getStoredMessage().getMessageNumber();
     }
 
     public long getArrivalTime()
@@ -103,7 +106,13 @@ public class MessageTransferMessage exte
 
     public int getContent(ByteBuffer buf, int offset)
     {
-        return _storeMessage.getContent(offset, buf);
+        return getStoredMessage().getContent(offset, buf);
+    }
+
+
+    public ByteBuffer getContent(int offset, int size)
+    {
+        return getStoredMessage().getContent(offset,size);
     }
 
     public Header getHeader()
@@ -113,32 +122,13 @@ public class MessageTransferMessage exte
 
     public ByteBuffer getBody()
     {
-        ByteBuffer body = getMetaData().getBody();
-        if(body == null && getSize() != 0l)
-        {
-            final int size = (int) getSize();
-            int pos = 0;
-            body = ByteBuffer.allocate(size);
-
-            while(pos < size)
-            {
-                pos += getContent(body, pos);
-            }
-
-            body.flip();
 
-            getMetaData().setBody(body.duplicate());
-        }
-        return body;
+        return  getContent(0, (int)getSize());
     }
 
-    public Session getSession()
+    public Object getConnectionReference()
     {
-        return _sessionRef == null ? null : _sessionRef.get();
+        return _connectionRef;
     }
 
-    public SessionConfig getSessionConfig()
-    {
-        return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
-    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java Thu Mar  1 15:42:44 2012
@@ -22,14 +22,17 @@ package org.apache.qpid.server.message;
 
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
 
-public interface ServerMessage<T extends ServerMessage> extends EnqueableMessage, MessageContentSource
+public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueableMessage, MessageContentSource
 {
     String getRoutingKey();
 
     AMQMessageHeader getMessageHeader();
 
+    public StoredMessage<T> getStoredMessage();
+
     boolean isPersistent();
 
     long getSize();
@@ -38,13 +41,14 @@ public interface ServerMessage<T extends
 
     long getExpiration();
 
-    MessageReference<T> newReference();
+    MessageReference newReference();
 
-    Long getMessageNumber();
+    long getMessageNumber();
 
     long getArrivalTime();
 
     public int getContent(ByteBuffer buf, int offset);
 
-    SessionConfig getSessionConfig();
+    public ByteBuffer getContent(int offset, int size);
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java Thu Mar  1 15:42:44 2012
@@ -40,8 +40,8 @@ public class HeaderPropertiesConverter
         BasicContentHeaderProperties props = new BasicContentHeaderProperties();
 
         Header header = messageTransferMessage.getHeader();
-        DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
-        MessageProperties messageProps = header.get(MessageProperties.class);
+        DeliveryProperties deliveryProps = header.getDeliveryProperties();
+        MessageProperties messageProps = header.getMessageProperties();
 
         if(deliveryProps != null)
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Thu Mar  1 15:42:44 2012
@@ -1,420 +1,420 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- *   8-0
- */
-package org.apache.qpid.server.output.amqp0_8;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-
-    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-
-    public static Factory getInstanceFactory()
-    {
-        return new Factory()
-        {
-
-            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
-            {
-                return new ProtocolOutputConverterImpl(session);
-            }
-        };
-    }
-
-
-    private final AMQProtocolSession _protocolSession;
-
-    private ProtocolOutputConverterImpl(AMQProtocolSession session)
-    {
-        _protocolSession = session;
-    }
-
-
-    public AMQProtocolSession getProtocolSession()
-    {
-        return _protocolSession;
-    }
-
-    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
-        writeMessageDelivery(entry, channelId, deliverBody);
-    }
-
-
-    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
-            throws AMQException
-    {
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
-        }
-        else
-        {
-            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
-            ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
-            chb.bodySize = message.getSize();
-            return chb;
-        }
-    }
-
-
-    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
-            throws AMQException
-    {
-        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
-    }
-
-    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
-            throws AMQException
-    {
-
-
-        int bodySize = (int) message.getSize();
-
-        if(bodySize == 0)
-        {
-            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
-                                                                             contentHeaderBody);
-
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
-            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
-            int writtenSize = capacity;
-
-            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
-            CompositeAMQBodyBlock
-                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
-
-            while(writtenSize < bodySize)
-            {
-                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
-                writtenSize += capacity;
-
-                writeFrame(new AMQFrame(channelId, body));
-            }
-        }
-    }
-
-    private class MessageContentSourceBody implements AMQBody
-    {
-        public static final byte TYPE = 3;
-        private int _length;
-        private MessageContentSource _message;
-        private int _offset;
-
-        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
-        {
-            _message = message;
-            _offset = offset;
-            _length = length;
-        }
-
-        public byte getFrameType()
-        {
-            return TYPE;
-        }
-
-        public int getSize()
-        {
-            return _length;
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            byte[] data = new byte[_length];
-
-            _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
-            buffer.write(data);
-        }
-
-        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
-    {
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      contentHeaderBody);
-        return contentHeader;
-    }
-
-
-    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
-        writeMessageDelivery(entry, channelId, deliver);
-    }
-
-
-    private AMQBody createEncodedDeliverBody(QueueEntry entry,
-                                              final long deliveryTag,
-                                              final AMQShortString consumerTag)
-            throws AMQException
-    {
-
-        final AMQShortString exchangeName;
-        final AMQShortString routingKey;
-
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            final AMQMessage message = (AMQMessage) entry.getMessage();
-            final MessagePublishInfo pb = message.getMessagePublishInfo();
-            exchangeName = pb.getExchange();
-            routingKey = pb.getRoutingKey();
-        }
-        else
-        {
-            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
-            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
-            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
-        }
-
-        final boolean isRedelivered = entry.isRedelivered();
-
-        final AMQBody returnBlock = new AMQBody()
-        {
-
-            public AMQBody _underlyingBody;
-
-            public AMQBody createAMQBody()
-            {
-                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
-                                                              deliveryTag,
-                                                              isRedelivered,
-                                                              exchangeName,
-                                                              routingKey);
-
-
-
-
-
-            }
-
-            public byte getFrameType()
-            {
-                return AMQMethodBody.TYPE;
-            }
-
-            public int getSize()
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                return _underlyingBody.getSize();
-            }
-
-            public void writePayload(DataOutputStream buffer) throws IOException
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                _underlyingBody.writePayload(buffer);
-            }
-
-            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
-                throws AMQException
-            {
-                throw new AMQException("This block should never be dispatched!");
-            }
-        };
-        return returnBlock;
-    }
-
-    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
-            throws AMQException
-    {
-        final AMQShortString exchangeName;
-        final AMQShortString routingKey;
-
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            final AMQMessage message = (AMQMessage) entry.getMessage();
-            final MessagePublishInfo pb = message.getMessagePublishInfo();
-            exchangeName = pb.getExchange();
-            routingKey = pb.getRoutingKey();
-        }
-        else
-        {
-            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
-            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
-            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
-        }
-
-        final boolean isRedelivered = entry.isRedelivered();
-
-        BasicGetOkBody getOkBody =
-                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
-                                                    isRedelivered,
-                                                    exchangeName,
-                                                    routingKey,
-                                                    queueSize);
-
-        return getOkBody;
-    }
-
-    public byte getProtocolMinorVersion()
-    {
-        return getProtocolSession().getProtocolMinorVersion();
-    }
-
-    public byte getProtocolMajorVersion()
-    {
-        return getProtocolSession().getProtocolMajorVersion();
-    }
-
-    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
-                                             int replyCode,
-                                             AMQShortString replyText) throws AMQException
-    {
-
-        BasicReturnBody basicReturnBody =
-                METHOD_REGISTRY.createBasicReturnBody(replyCode,
-                        replyText,
-                        messagePublishInfo.getExchange(),
-                        messagePublishInfo.getRoutingKey());
-
-
-        return basicReturnBody;
-    }
-
-    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
-            throws AMQException
-    {
-
-        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
-        writeMessageDelivery(message, header, channelId, returnFrame);
-    }
-
-
-    public void writeFrame(AMQDataBlock block)
-    {
-        getProtocolSession().writeFrame(block);
-    }
-
-
-    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
-    {
-
-        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
-        writeFrame(basicCancelOkBody.generateFrame(channelId));
-
-    }
-
-
-    public static final class CompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final AMQBody _contentBody;
-        private final int _channel;
-
-
-        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-            _contentBody = contentBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
-        }
-    }
-
-    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final int _channel;
-
-
-        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ *   8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+
+    public static Factory getInstanceFactory()
+    {
+        return new Factory()
+        {
+
+            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+            {
+                return new ProtocolOutputConverterImpl(session);
+            }
+        };
+    }
+
+
+    private final AMQProtocolSession _protocolSession;
+
+    private ProtocolOutputConverterImpl(AMQProtocolSession session)
+    {
+        _protocolSession = session;
+    }
+
+
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+        writeMessageDelivery(entry, channelId, deliverBody);
+    }
+
+
+    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+            throws AMQException
+    {
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+        }
+        else
+        {
+            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+            ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize();
+            return chb;
+        }
+    }
+
+
+    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+    }
+
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+
+
+        int bodySize = (int) message.getSize();
+
+        if(bodySize == 0)
+        {
+            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+                                                                             contentHeaderBody);
+
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+            int writtenSize = capacity;
+
+            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+            CompositeAMQBodyBlock
+                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+            writeFrame(compositeBlock);
+
+            while(writtenSize < bodySize)
+            {
+                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+                writtenSize += capacity;
+
+                writeFrame(new AMQFrame(channelId, body));
+            }
+        }
+    }
+
+    private class MessageContentSourceBody implements AMQBody
+    {
+        public static final byte TYPE = 3;
+        private int _length;
+        private MessageContentSource _message;
+        private int _offset;
+
+        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+        {
+            _message = message;
+            _offset = offset;
+            _length = length;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE;
+        }
+
+        public int getSize()
+        {
+            return _length;
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            byte[] data = new byte[_length];
+
+            _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+            buffer.write(data);
+        }
+
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+    {
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      contentHeaderBody);
+        return contentHeader;
+    }
+
+
+    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+        writeMessageDelivery(entry, channelId, deliver);
+    }
+
+
+    private AMQBody createEncodedDeliverBody(QueueEntry entry,
+                                              final long deliveryTag,
+                                              final AMQShortString consumerTag)
+            throws AMQException
+    {
+
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
+
+        final AMQBody returnBlock = new AMQBody()
+        {
+
+            public AMQBody _underlyingBody;
+
+            public AMQBody createAMQBody()
+            {
+                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+                                                              deliveryTag,
+                                                              isRedelivered,
+                                                              exchangeName,
+                                                              routingKey);
+
+
+
+
+
+            }
+
+            public byte getFrameType()
+            {
+                return AMQMethodBody.TYPE;
+            }
+
+            public int getSize()
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                return _underlyingBody.getSize();
+            }
+
+            public void writePayload(DataOutput buffer) throws IOException
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                _underlyingBody.writePayload(buffer);
+            }
+
+            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+                throws AMQException
+            {
+                throw new AMQException("This block should never be dispatched!");
+            }
+        };
+        return returnBlock;
+    }
+
+    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+            throws AMQException
+    {
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
+
+        BasicGetOkBody getOkBody =
+                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+                                                    isRedelivered,
+                                                    exchangeName,
+                                                    routingKey,
+                                                    queueSize);
+
+        return getOkBody;
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return getProtocolSession().getProtocolMinorVersion();
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return getProtocolSession().getProtocolMajorVersion();
+    }
+
+    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+                                             int replyCode,
+                                             AMQShortString replyText) throws AMQException
+    {
+
+        BasicReturnBody basicReturnBody =
+                METHOD_REGISTRY.createBasicReturnBody(replyCode,
+                        replyText,
+                        messagePublishInfo.getExchange(),
+                        messagePublishInfo.getRoutingKey());
+
+
+        return basicReturnBody;
+    }
+
+    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+            throws AMQException
+    {
+
+        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+        writeMessageDelivery(message, header, channelId, returnFrame);
+    }
+
+
+    public void writeFrame(AMQDataBlock block)
+    {
+        getProtocolSession().writeFrame(block);
+    }
+
+
+    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+    {
+
+        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+    }
+
+
+    public static final class CompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final AMQBody _contentBody;
+        private final int _channel;
+
+
+        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+            _contentBody = contentBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+        }
+    }
+
+    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final int _channel;
+
+
+        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+        }
+    }
+}

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message