qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject [2/2] qpid-broker-j git commit: QPID-7791: Recover metadata into direct memory
Date Tue, 20 Jun 2017 13:20:51 GMT
QPID-7791: Recover metadata into direct memory


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/85abb468
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/85abb468
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/85abb468

Branch: refs/heads/master
Commit: 85abb468e02de9a0e38d23e056997da4730ed271
Parents: 973c1f8
Author: Keith Wall <kwall@apache.org>
Authored: Tue Jun 20 10:02:08 2017 +0100
Committer: Keith Wall <kwall@apache.org>
Committed: Tue Jun 20 13:38:25 2017 +0100

----------------------------------------------------------------------
 .../tuple/MessageMetaDataBinding.java           |  41 ++-
 .../store/berkeleydb/BDBMessageStoreTest.java   |   2 +-
 .../qpid/server/bytebuffer/QpidByteBuffer.java  |  16 ++
 .../server/bytebuffer/QpidByteBufferUtils.java  | 271 ++++++++++++++++++
 .../internal/InternalMessageMetaDataType.java   |   8 +-
 .../qpid/server/plugin/MessageMetaDataType.java |   6 +-
 .../v1/MessageStoreSerializer_v1.java           |   3 +-
 .../server/bytebuffer/QpidByteBufferTest.java   |  16 ++
 .../qpid/server/store/MessageStoreTestCase.java |  18 +-
 .../store/TestMessageMetaDataFactory.java       |   9 +-
 .../server/store/TestMessageMetaDataType.java   |   7 +-
 .../v0_10/MessageMetaDataType_0_10.java         |   8 +-
 .../protocol/v0_10/MessageMetaData_0_10.java    |   9 +-
 .../server/protocol/v0_8/MessageMetaData.java   |  38 ++-
 .../protocol/v0_8/MessageMetaDataType_0_8.java  |   8 +-
 .../v0_8/MessageMetaDataFactoryTest.java        | 135 +++++++++
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |   2 +-
 .../protocol/v1_0/MessageMetaDataType_1_0.java  |   8 +-
 .../protocol/v1_0/MessageMetaData_1_0.java      |  27 +-
 .../qpid/server/protocol/v1_0/Message_1_0.java  |   2 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java  |   2 +-
 .../v1_0/codec/ArrayTypeConstructor.java        |   1 +
 .../v1_0/codec/BinaryTypeConstructor.java       |   1 +
 .../protocol/v1_0/codec/BooleanConstructor.java |   1 +
 .../v1_0/codec/ByteTypeConstructor.java         |   1 +
 .../v1_0/codec/CharTypeConstructor.java         |   3 +-
 .../v1_0/codec/CompoundTypeConstructor.java     |   1 +
 .../protocol/v1_0/codec/DecimalConstructor.java |   1 +
 .../v1_0/codec/DoubleTypeConstructor.java       |   1 +
 .../v1_0/codec/FloatTypeConstructor.java        |   1 +
 .../server/protocol/v1_0/codec/FrameWriter.java |   1 +
 .../protocol/v1_0/codec/IntTypeConstructor.java |   1 +
 .../v1_0/codec/LongTypeConstructor.java         |   1 +
 .../v1_0/codec/QpidByteBufferUtils.java         | 274 -------------------
 .../v1_0/codec/ShortTypeConstructor.java        |   1 +
 .../v1_0/codec/SmallIntConstructor.java         |   1 +
 .../v1_0/codec/SmallLongConstructor.java        |   1 +
 .../v1_0/codec/SmallUIntConstructor.java        |   1 +
 .../v1_0/codec/SmallULongConstructor.java       |   1 +
 .../v1_0/codec/StringTypeConstructor.java       |   1 +
 .../v1_0/codec/SymbolTypeConstructor.java       |   1 +
 .../v1_0/codec/TimestampTypeConstructor.java    |   1 +
 .../v1_0/codec/UByteTypeConstructor.java        |   1 +
 .../v1_0/codec/UIntTypeConstructor.java         |   1 +
 .../v1_0/codec/ULongTypeConstructor.java        |   1 +
 .../v1_0/codec/UShortTypeConstructor.java       |   1 +
 .../v1_0/codec/UUIDTypeConstructor.java         |   1 +
 .../protocol/v1_0/codec/ValueHandler.java       |   1 +
 .../v1_0/messaging/SectionDecoderImpl.java      |   2 +-
 .../v1_0/type/messaging/AbstractSection.java    |   2 +-
 .../codec/AmqpValueSectionConstructor.java      |   2 +-
 .../messaging/codec/DataSectionConstructor.java |   2 +-
 .../codec/DescribedListSectionConstructor.java  |   2 +-
 .../codec/DescribedMapSectionConstructor.java   |   2 +-
 .../store/jdbc/AbstractJDBCMessageStore.java    |  49 ++--
 .../server/store/jdbc/JDBCMessageStoreTest.java |  14 +
 56 files changed, 658 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
index 81806d5..798b097 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
@@ -20,23 +20,25 @@
  */
 package org.apache.qpid.server.store.berkeleydb.tuple;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+
 import com.sleepycat.bind.EntryBinding;
 import com.sleepycat.je.DatabaseEntry;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.store.MessageMetaDataTypeRegistry;
 import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
 
 /**
  * Handles the mapping to and from message meta data
  */
 public class MessageMetaDataBinding implements EntryBinding<StorableMessageMetaData>
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(MessageMetaDataBinding.class);
 
     private static final MessageMetaDataBinding INSTANCE = new MessageMetaDataBinding();
 
@@ -51,15 +53,28 @@ public class MessageMetaDataBinding implements EntryBinding<StorableMessageMetaD
     @Override
     public StorableMessageMetaData entryToObject(DatabaseEntry entry)
     {
-        QpidByteBuffer buf = QpidByteBuffer.wrap(entry.getData(), entry.getOffset(), entry.getSize());
-        final int bodySize = buf.getInt() ^ 0x80000000;
-        final int metaDataType = buf.get() & 0xff;
-        buf = buf.slice();
-        buf.limit(bodySize-1);
-        MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(metaDataType);
-        final StorableMessageMetaData metaData = type.createMetaData(buf);
-        buf.dispose();
-        return metaData;
+        try(DataInputStream stream = new DataInputStream(new ByteArrayInputStream(entry.getData(),
+                                                                                  entry.getOffset(),
+                                                                                  entry.getSize())))
+        {
+            final int bodySize = stream.readInt() ^ 0x80000000; // read only to advance past the size prefix; the value is not used below
+            final int metaDataType = stream.readByte() & 0xff; // unsigned ordinal of the metadata type
+            MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(metaDataType);
+
+            List<QpidByteBuffer> bufs = QpidByteBuffer.asQpidByteBuffers(stream); // the rest of the entry, copied into buffers
+
+            final StorableMessageMetaData metaData = type.createMetaData(bufs); // NOTE(review): bufs are not disposed if this throws
+
+            for (final QpidByteBuffer buf : bufs)
+            {
+                buf.dispose();
+            }
+            return metaData;
+        }
+        catch (IOException e)
+        {
+            throw new StoreException(String.format("Unable to convert entry %s to metadata", entry), e); // keep the IOException as the cause
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index aa1c45d..da45856 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -66,7 +66,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
     private String _storeLocation;
 
     @Override
-    protected void tearDown() throws Exception
+    public void tearDown() throws Exception
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index b335adc..346e203 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -676,6 +676,22 @@ public class QpidByteBuffer implements AutoCloseable
         }
     }
 
+    public static List<QpidByteBuffer> asQpidByteBuffers(final InputStream stream) throws IOException
+    {   // Drains the stream into a list of buffers, one buffer per successful read; the stream is not closed here.
+        List<QpidByteBuffer> bufs = new ArrayList<>();
+        byte[] transferBuf = new byte[QpidByteBuffer.getPooledBufferSize()]; // so no chunk exceeds the pooled buffer size
+        int read = stream.read(transferBuf);
+        while(read > 0) // a non-positive count (e.g. -1 at end of stream) ends the loop
+        {
+            QpidByteBuffer chunk = QpidByteBuffer.allocateDirect(read); // sized to exactly the bytes just read
+            chunk.put(transferBuf, 0, read);
+            chunk.flip(); // presumably as java.nio.ByteBuffer.flip(): make the written bytes readable - confirm
+            bufs.add(chunk);
+            read = stream.read(transferBuf);
+        }
+        return bufs; // empty when the first read yields no bytes
+    }
+
     public static SSLEngineResult encryptSSL(SSLEngine engine,
                                              final Collection<QpidByteBuffer> buffers,
                                              QpidByteBuffer dest) throws SSLException

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
new file mode 100644
index 0000000..57552a8
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.bytebuffer;
+
+import java.nio.BufferUnderflowException;
+import java.util.List;
+
+public class QpidByteBufferUtils
+{
+    public static boolean hasRemaining(List<QpidByteBuffer> in)
+    {   // True when at least one buffer in the list still has unread bytes.
+        if (in.isEmpty())
+        {
+            return false;
+        }
+        for (int i = 0; i < in.size(); i++)
+        {
+            if (in.get(i).hasRemaining())
+            {
+                return true;
+            }
+        }
+        return false; // every buffer is exhausted
+    }
+
+    public static long remaining(List<QpidByteBuffer> in)
+    {   // Sums remaining() over all buffers in the list.
+        long remaining = 0L; // long, so the total of many int-sized buffers cannot overflow an int
+        for (int i = 0; i < in.size(); i++)
+        {
+            remaining += in.get(i).remaining();
+        }
+        return remaining;
+    }
+
+    public static byte get(List<QpidByteBuffer> in)
+    {   // Reads one byte from the first buffer in list order that still has data.
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            if (buffer.hasRemaining())
+            {
+                return buffer.get();
+            }
+        }
+        throw new BufferUnderflowException(); // no buffer had a byte left
+    }
+
+    public static boolean hasRemaining(final List<QpidByteBuffer> in, int len)
+    {   // True when the buffers together hold at least len unread bytes; stops counting as soon as len is covered.
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            int remaining = buffer.remaining();
+            if (remaining >= len)
+            {
+                return true;
+            }
+            len -= remaining; // bytes still needed from the buffers that follow
+        }
+
+        return false; // also the result for an empty list, whatever len is
+    }
+
+    public static long getLong(final List<QpidByteBuffer> in)
+    {   // Reads the next 8 bytes of the list as one long; the value may straddle several buffers.
+        boolean bytewise = false; // true once a partial value has been started from an earlier buffer
+        int consumed = 0; // number of bytes folded into result so far
+        long result = 0L;
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            int remaining = buffer.remaining();
+            if (bytewise)
+            {
+                while (buffer.hasRemaining() && consumed < 8)
+                {
+                    result <<= 8; // shift by a whole byte so bytes are assembled most-significant first
+                    result |= (0xFF & buffer.get());
+                    consumed++;
+                }
+                if (consumed == 8)
+                {
+                    return result;
+                }
+            }
+            else
+            {
+                if (remaining >= 8)
+                {
+                    return buffer.getLong(); // fast path: the whole value sits in this buffer
+                }
+                else if (remaining != 0)
+                {
+                    bytewise = true;
+                    while (buffer.hasRemaining())
+                    {
+                        result <<= 8; // shift by a whole byte, not a single bit
+                        result |= (0xFF & buffer.get());
+                        consumed++;
+                    }
+                }
+            }
+        }
+        throw new BufferUnderflowException(); // fewer than 8 bytes in total; bytes already read stay consumed
+    }
+
+    public static int getInt(final List<QpidByteBuffer> in)
+    {   // Reads the next 4 bytes of the list as one int; the value may straddle several buffers.
+        boolean bytewise = false; // true once a partial value has been started from an earlier buffer
+        int consumed = 0; // number of bytes folded into result so far
+        int result = 0;
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            int remaining = buffer.remaining();
+            if (bytewise)
+            {
+                while (buffer.hasRemaining() && consumed < 4)
+                {
+                    result <<= 8; // shift by a whole byte so bytes are assembled most-significant first
+                    result |= (0xFF & buffer.get());
+                    consumed++;
+                }
+                if (consumed == 4)
+                {
+                    return result;
+                }
+            }
+            else
+            {
+                if (remaining >= 4)
+                {
+                    return buffer.getInt(); // fast path: the whole value sits in this buffer
+                }
+                else if (remaining != 0)
+                {
+                    bytewise = true;
+                    while (buffer.hasRemaining())
+                    {
+                        result <<= 8; // shift by a whole byte, not a single bit
+                        result |= (0xFF & buffer.get());
+                        consumed++;
+                    }
+                }
+            }
+        }
+        throw new BufferUnderflowException(); // fewer than 4 bytes in total; bytes already read stay consumed
+    }
+
+    public static float getFloat(final List<QpidByteBuffer> in)
+    {   // Reads 4 bytes via getInt and reinterprets the bit pattern as a float.
+        return Float.intBitsToFloat(getInt(in));
+    }
+
+    public static double getDouble(final List<QpidByteBuffer> in)
+    {   // Reads 8 bytes via getLong and reinterprets the bit pattern as a double.
+        return Double.longBitsToDouble(getLong(in));
+    }
+
+    public static Short getShort(final List<QpidByteBuffer> in)
+    {   // Reads the next 2 bytes of the list as one short (returned boxed); the value may straddle two buffers.
+        boolean bytewise = false; // true once a partial value has been started from an earlier buffer
+        int consumed = 0; // number of bytes folded into result so far
+        short result = 0;
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            int remaining = buffer.remaining();
+            if (bytewise)
+            {
+                while (buffer.hasRemaining() && consumed < 2)
+                {
+                    result <<= 8; // shift by a whole byte so bytes are assembled most-significant first
+                    result |= (0xFF & buffer.get());
+                    consumed++;
+                }
+                if (consumed == 2)
+                {
+                    return result;
+                }
+            }
+            else
+            {
+                if (remaining >= 2)
+                {
+                    return buffer.getShort(); // fast path: the whole value sits in this buffer
+                }
+                else if (remaining != 0)
+                {
+                    bytewise = true;
+                    while (buffer.hasRemaining())
+                    {
+                        result <<= 8; // shift by a whole byte, not a single bit
+                        result |= (0xFF & buffer.get());
+                        consumed++;
+                    }
+                }
+            }
+        }
+        throw new BufferUnderflowException(); // fewer than 2 bytes in total; a byte already read stays consumed
+    }
+
+    public static int get(final List<QpidByteBuffer> in, final byte[] data)
+    {   // Fills data from the buffers in list order; returns the number of bytes copied, which may be less than data.length.
+        int copied = 0;
+        int i = 0;
+        while (copied < data.length && i < in.size())
+        {
+            QpidByteBuffer buf = in.get(i);
+            if (buf.hasRemaining())
+            {
+                int remaining = buf.remaining();
+                if (remaining >= data.length - copied)
+                {
+                    buf.get(data, copied, data.length - copied); // this buffer can supply everything still missing
+                    return data.length;
+                }
+                else
+                {
+                    buf.get(data, copied, remaining); // take all this buffer has and move on to the next
+                    copied += remaining;
+                }
+            }
+            i++;
+        }
+        return copied; // buffers ran out first, or data.length was 0
+    }
+
+    public static void skip(final List<QpidByteBuffer> in, int length)
+    {   // Advances the buffers' positions by length bytes in list order; no error is raised if fewer bytes are available.
+        int skipped = 0;
+        int i = 0;
+        while (skipped < length && i < in.size())
+        {
+            QpidByteBuffer buf = in.get(i);
+            if (buf.hasRemaining())
+            {
+                int remaining = buf.remaining();
+                if (remaining >= length - skipped)
+                {
+                    buf.position(buf.position() + length - skipped); // the rest of the skip fits in this buffer
+                    return;
+                }
+                else
+                {
+                    buf.position(buf.position() + remaining); // exhaust this buffer and continue with the next
+                    skipped += remaining;
+                }
+            }
+            i++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
index 92eaaaa..507837c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
@@ -22,8 +22,10 @@ package org.apache.qpid.server.message.internal;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferInputStream;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.plugin.PluggableService;
@@ -43,11 +45,9 @@ public class InternalMessageMetaDataType implements MessageMetaDataType<Internal
     }
 
     @Override
-    public InternalMessageMetaData createMetaData(final QpidByteBuffer buf)
+    public InternalMessageMetaData createMetaData(final List<QpidByteBuffer> bufs)
     {
-
-
-        try(ObjectInputStream is = new ObjectInputStream(buf.asInputStream()))
+        try(ObjectInputStream is = new ObjectInputStream(new QpidByteBufferInputStream(bufs)))
         {
             int contentSize = is.readInt();
             InternalMessageHeader header = (InternalMessageHeader) is.readObject();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java b/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
index b65e4b9..4821914 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.plugin;
 
+import java.util.List;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -30,12 +32,12 @@ public interface MessageMetaDataType<M extends StorableMessageMetaData> extends
 
     interface Factory<M extends StorableMessageMetaData>
     {
-        M createMetaData(QpidByteBuffer buf);
+        M createMetaData(List<QpidByteBuffer> buf);
     }
 
     int ordinal();
 
-    M createMetaData(QpidByteBuffer buf);
+    M createMetaData(List<QpidByteBuffer> bufs);
 
     ServerMessage<M> createMessage(StoredMessage<M> msg);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
index b064247..9f0098a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.store.serializer.v1;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -307,7 +308,7 @@ public class MessageStoreSerializer_v1 implements MessageStoreSerializer
                     MessageMetaDataTypeRegistry.fromOrdinal(metaData[0] & 0xff);
             QpidByteBuffer buf = QpidByteBuffer.wrap(metaData, 1, metaData.length - 1);
             final StorableMessageMetaData storableMessageMetaData =
-                    metaDataType.createMetaData(buf);
+                    metaDataType.createMetaData(Collections.singletonList(buf));
             buf.dispose();
             final MessageHandle<StorableMessageMetaData> handle =
                     store.addMessage(storableMessageMetaData);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
index 139a4e9..d98c4f1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.qpid.server.bytebuffer;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,6 +34,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 import com.google.common.io.ByteStreams;
 import org.junit.Assert;
@@ -899,6 +902,19 @@ public class QpidByteBufferTest extends QpidTestCase
         assertTrue("Buffer should be sparse", grandChild.isSparse());
     }
 
+    public void testAsQpidByteBuffers() throws IOException
+    {   // Checks that a stream is split into chunks and that an empty stream yields no buffers.
+        byte[] dataForTwoBufs = "01234567890".getBytes(StandardCharsets.US_ASCII); // 11 bytes
+        Collection<QpidByteBuffer> qpidByteBuffers = QpidByteBuffer.asQpidByteBuffers(new ByteArrayInputStream(dataForTwoBufs));
+        assertEquals("Unexpected number of bufs", 2, qpidByteBuffers.size()); // assumes a pooled buffer size of 10, set outside this hunk - confirm
+        Iterator<QpidByteBuffer> itr = qpidByteBuffers.iterator();
+        assertEquals("Unexpected remaining in first buf", 10, itr.next().remaining());
+        assertEquals("Unexpected remaining in second buf", 1, itr.next().remaining());
+
+        Collection<QpidByteBuffer> bufsForEmptyBytes = QpidByteBuffer.asQpidByteBuffers(new ByteArrayInputStream(new byte[]{}));
+        assertEquals("Unexpected number of bufs for empty buffer", 0, bufsForEmptyBytes.size());
+    }
+
     private void doDeflateInflate(byte[] input,
                                   Collection<QpidByteBuffer> inputBufs,
                                   boolean direct) throws IOException

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 82c03c7..ee8def1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.hamcrest.Description;
 import org.mockito.ArgumentMatcher;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -52,7 +53,12 @@ public abstract class MessageStoreTestCase extends QpidTestCase
     private MessageStore _store;
     private ConfiguredObject<?> _parent;
     private MessageStore.MessageStoreReader _storeReader;
+    private static final int BUFFER_SIZE = 10;
+    private static final int POOL_SIZE = 20;
+    private static final double SPARSITY_FRACTION = 1.0;
 
+
+    @Override
     public void setUp() throws Exception
     {
         super.setUp();
@@ -63,6 +69,16 @@ public abstract class MessageStoreTestCase extends QpidTestCase
 
         _store.openMessageStore(_parent);
         _storeReader = _store.newMessageStoreReader();
+
+        QpidByteBuffer.deinitialisePool();
+        QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, SPARSITY_FRACTION);
+    }
+
+    @Override
+    public void tearDown() throws Exception
+    {   // Undoes the buffer pool initialisation done in setUp before delegating to the superclass.
+        QpidByteBuffer.deinitialisePool();
+        super.tearDown();
     }
 
     protected abstract VirtualHost createVirtualHost();
@@ -383,7 +399,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
 
         StoredMessage<?> retrievedMessage = retrievedMessageRef.get();
         assertNotNull("Message was not found", retrievedMessageRef);
-        assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber());
+        assertEquals("Unexpected retrieved message", message.getMessageNumber(), retrievedMessage.getMessageNumber());
 
         retrievedMessage.remove();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
index 89beed5..381718d 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
@@ -20,15 +20,18 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.List;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 
 public class TestMessageMetaDataFactory implements MessageMetaDataType.Factory<TestMessageMetaData>
 {
-    public TestMessageMetaData createMetaData(QpidByteBuffer buf)
+    public TestMessageMetaData createMetaData(List<QpidByteBuffer> bufs)
     {
-        long id = buf.getLong();
-        int size = buf.getInt();
+        long id = QpidByteBufferUtils.getLong(bufs);
+        int size = QpidByteBufferUtils.getInt(bufs);
 
         return new TestMessageMetaData(id, size);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 24fa263..52d6e0f 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
@@ -41,9 +42,9 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
     }
 
     @Override
-    public TestMessageMetaData createMetaData(QpidByteBuffer buf)
+    public TestMessageMetaData createMetaData(List<QpidByteBuffer> bufs)
     {
-        return TestMessageMetaData.FACTORY.createMetaData(buf);
+        return TestMessageMetaData.FACTORY.createMetaData(bufs);
     }
 
     @Override
@@ -52,11 +53,13 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
         return new TestServerMessage(msg);
     }
 
+    @Override
     public int hashCode()
     {
         return ordinal();
     }
 
+    @Override
     public boolean equals(Object o)
     {
         return o != null && o.getClass() == getClass();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
index 1bfbb2c..2dcb5f5 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.util.List;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -40,9 +42,9 @@ public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMeta
     }
 
     @Override
-    public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf)
+    public MessageMetaData_0_10 createMetaData(List<QpidByteBuffer> bufs)
     {
-        return MessageMetaData_0_10.FACTORY.createMetaData(buf);
+        return MessageMetaData_0_10.FACTORY.createMetaData(bufs);
     }
 
     @Override
@@ -51,11 +53,13 @@ public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMeta
         return new MessageTransferMessage(msg, null);
     }
 
+    @Override
     public int hashCode()
     {
         return ordinal();
     }
 
+    @Override
     public boolean equals(Object o)
     {
         return o != null && o.getClass() == getClass();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 065cc0d..70bcdeb 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -21,20 +21,19 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
-import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
 import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 
 public class MessageMetaData_0_10 implements StorableMessageMetaData
 {
@@ -246,9 +245,9 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
 
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
     {
-        public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf)
+        public MessageMetaData_0_10 createMetaData(List<QpidByteBuffer> buf)
         {
-            ServerDecoder decoder = new ServerDecoder(Collections.singletonList(buf));
+            ServerDecoder decoder = new ServerDecoder(buf);
 
             long arrivalTime = decoder.readInt64();
             int bodySize = decoder.readInt32();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 108f52c..1c14a70 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -21,9 +21,11 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v0_8.transport.AMQProtocolVersionException;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
 import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
@@ -149,12 +151,37 @@ public class MessageMetaData implements StorableMessageMetaData
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
     {
 
-
-        public MessageMetaData createMetaData(QpidByteBuffer buf)
+        @Override
+        public MessageMetaData createMetaData(List<QpidByteBuffer> bufs)
         {
             try
             {
-                int size = buf.getInt();
+                final int size = QpidByteBufferUtils.getInt(bufs);
+
+                final QpidByteBuffer buf;
+                final boolean disposalRequired;
+                if (bufs.size() == 1)
+                {
+                    buf = bufs.get(0);
+                    disposalRequired = false;
+                }
+                else
+                {
+                    // This should seldom happen.  For AMQP 0-8..0-91 the content header body must
+                    // fit within one frame.  If we get here we are either recovering after a reduction
+                    // in framesize or it so happens that the content header and the size/exchange/routingkey
+                    // just overfills one frame.
+                    int totalRemaining = (int) QpidByteBufferUtils.remaining(bufs);
+
+                    buf = QpidByteBuffer.allocateDirect(totalRemaining);
+                    disposalRequired = true;
+                    for (final QpidByteBuffer qpidByteBuffer : bufs)
+                    {
+                        buf.put(qpidByteBuffer);
+                    }
+                    buf.flip();
+                }
+
                 ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, size);
                 final AMQShortString exchange = AMQShortString.readAMQShortString(buf);
                 final AMQShortString routingKey = AMQShortString.readAMQShortString(buf);
@@ -168,6 +195,11 @@ public class MessageMetaData implements StorableMessageMetaData
                                                (flags & MANDATORY_FLAG) != 0,
                                                routingKey);
 
+                if (disposalRequired)
+                {
+                    buf.dispose();
+                }
+
                 return new MessageMetaData(publishBody, chb, arrivalTime);
             }
             catch (AMQFrameDecodingException | AMQProtocolVersionException e)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
index 76f4263..4218317 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.util.List;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -40,9 +42,9 @@ public class MessageMetaDataType_0_8 implements MessageMetaDataType<MessageMetaD
     }
 
     @Override
-    public MessageMetaData createMetaData(QpidByteBuffer buf)
+    public MessageMetaData createMetaData(List<QpidByteBuffer> bufs)
     {
-        return MessageMetaData.FACTORY.createMetaData(buf);
+        return MessageMetaData.FACTORY.createMetaData(bufs);
     }
 
     @Override
@@ -51,11 +53,13 @@ public class MessageMetaDataType_0_8 implements MessageMetaDataType<MessageMetaD
         return new AMQMessage(msg);
     }
 
+    @Override
     public int hashCode()
     {
         return ordinal();
     }
 
+    @Override
     public boolean equals(Object o)
     {
         return o != null && o.getClass() == getClass();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataFactoryTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataFactoryTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataFactoryTest.java
new file mode 100644
index 0000000..5d36778
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataFactoryTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferInputStream;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class MessageMetaDataFactoryTest extends QpidTestCase // Tests MessageMetaData.FACTORY.createMetaData with metadata held in one buffer and split over several.
+{
+    private static final String CONTENT_TYPE = "content/type";
+    private final long _arrivalTime = System.currentTimeMillis(); // arrival time given to the fixture and expected back after unmarshalling
+    private final AMQShortString _routingKey = AMQShortString.valueOf("routingkey");
+    private final AMQShortString _exchange = AMQShortString.valueOf("exch");
+    private MessageMetaData _mmd; // fixture: created in setUp(), disposed in tearDown()
+    // Builds the MessageMetaData fixture that the tests serialise.
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _mmd = createTestMessageMetaData();
+    }
+    // Disposes the fixture created by setUp(), if there is one.
+    @Override
+    public void tearDown() throws Exception
+    {
+        super.tearDown(); // NOTE(review): runs before the fixture is disposed; if it throws, _mmd is never disposed
+        if (_mmd != null)
+        {
+            _mmd.dispose();
+        }
+    }
+    // Writes the fixture into one direct buffer and checks the factory recreates it from a singleton list.
+    public void testUnmarshalFromSingleBuffer() throws Exception
+    {
+        try(QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocateDirect(_mmd.getStorableSize()))
+        {
+            _mmd.writeToBuffer(qpidByteBuffer);
+            qpidByteBuffer.flip();
+
+            MessageMetaData recreated = MessageMetaData.FACTORY.createMetaData(Collections.singletonList(qpidByteBuffer));
+
+            assertEquals("Unexpected arrival time", _arrivalTime, recreated.getArrivalTime());
+            assertEquals("Unexpected routing key", _routingKey, recreated.getMessagePublishInfo().getRoutingKey());
+            assertEquals("Unexpected content type", CONTENT_TYPE, recreated.getContentHeaderBody().getProperties()
+                                                                               .getContentTypeAsString());
+            recreated.dispose(); // NOTE(review): skipped if an assertion above fails
+        }
+    }
+    // Same checks as the single-buffer test, but the serialised fixture is first split into several small buffers.
+    public void testUnmarshalFromMultipleBuffers() throws Exception
+    {
+        List<QpidByteBuffer> bufs = Collections.emptyList(); // placeholder so the finally block can iterate even if the split below is never reached
+        try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocateDirect(_mmd.getStorableSize()))
+        {
+            _mmd.writeToBuffer(qpidByteBuffer);
+            qpidByteBuffer.flip();
+
+            bufs = splitIntoSmallerBuffers(qpidByteBuffer); // chunks of at most 3 bytes each
+
+            final MessageMetaData recreated = MessageMetaData.FACTORY.createMetaData(bufs);
+            assertEquals("Unexpected arrival time", _arrivalTime, recreated.getArrivalTime());
+            assertEquals("Unexpected routing key", _routingKey, recreated.getMessagePublishInfo().getRoutingKey());
+            assertEquals("Unexpected content type", CONTENT_TYPE, recreated.getContentHeaderBody().getProperties().getContentTypeAsString());
+            recreated.dispose(); // NOTE(review): skipped if an assertion above fails
+        }
+        finally
+        {
+            for (final QpidByteBuffer buf : bufs) // release the chunk buffers allocated by splitIntoSmallerBuffers
+            {
+                buf.dispose();
+            }
+        }
+    }
+    // Creates the fixture from _exchange, _routingKey, CONTENT_TYPE and _arrivalTime.
+    private MessageMetaData createTestMessageMetaData()
+    {
+        final MessagePublishInfo publishBody = new MessagePublishInfo(_exchange,
+                                                                      false, // presumably the "immediate" flag - confirm against MessagePublishInfo
+                                                                      false, // presumably the "mandatory" flag - confirm against MessagePublishInfo
+                                                                      _routingKey);
+        final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+        props.setContentType(CONTENT_TYPE);
+        final ContentHeaderBody contentHeaderBody = new ContentHeaderBody(props);
+
+        return new MessageMetaData(publishBody, contentHeaderBody, _arrivalTime);
+    }
+    // Streams the content of qpidByteBuffer into new direct buffers, one per read of up to 3 bytes; the caller disposes them.
+    private List<QpidByteBuffer> splitIntoSmallerBuffers(final QpidByteBuffer qpidByteBuffer) throws IOException
+    {
+        List<QpidByteBuffer> bufs = new ArrayList<>();
+        try (InputStream stream = new QpidByteBufferInputStream(Collections.singletonList(qpidByteBuffer)))
+        {
+            byte[] transferBuf = new byte[3]; // chunk size: each resulting buffer holds at most 3 bytes
+            int read = stream.read(transferBuf);
+            while (read != -1) // -1 from InputStream.read signals end of stream
+            {
+                QpidByteBuffer buf = QpidByteBuffer.allocateDirect(read); // sized to exactly the bytes just read
+                buf.put(transferBuf, 0, read);
+                buf.flip();
+                bufs.add(buf);
+                read = stream.read(transferBuf);
+            }
+        }
+        return bufs;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 870a961..2479e85 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -71,7 +71,7 @@ import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
 import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
index 6173543..7db8bcd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.List;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -40,9 +42,9 @@ public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaD
     }
 
     @Override
-    public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf)
+    public MessageMetaData_1_0 createMetaData(List<QpidByteBuffer> bufs)
     {
-        return MessageMetaData_1_0.FACTORY.createMetaData(buf);
+        return MessageMetaData_1_0.FACTORY.createMetaData(bufs);
     }
 
     @Override
@@ -51,11 +53,13 @@ public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaD
         return new Message_1_0(msg);
     }
 
+    @Override
     public int hashCode()
     {
         return ordinal();
     }
 
+    @Override
     public boolean equals(Object o)
     {
         return o != null && o.getClass() == getClass();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index a599b2a..b3e544d 100755
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -410,19 +411,33 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
             _typeRegistry.registerSecurityLayer();
         }
 
-        public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf)
+        public MessageMetaData_1_0 createMetaData(List<QpidByteBuffer> bufs)
         {
             try
             {
-                byte versionByte = buf.get(buf.position());
+                if (!QpidByteBufferUtils.hasRemaining(bufs))
+                {
+                    throw new ConnectionScopedRuntimeException("No metadata found");
+                }
+
+                byte versionByte = 0;
+                for (final QpidByteBuffer buf : bufs)
+                {
+                    if (buf.hasRemaining())
+                    {
+                        versionByte = buf.get(buf.position());
+                        break;
+                    }
+                }
+
                 long arrivalTime;
                 long contentSize = 0;
                 if (versionByte == 1)
                 {
                     // we can discard the first byte
-                    buf.get();
-                    arrivalTime = buf.getLong();
-                    contentSize = buf.getLong();
+                    QpidByteBufferUtils.get(bufs);
+                    arrivalTime = QpidByteBufferUtils.getLong(bufs);
+                    contentSize = QpidByteBufferUtils.getLong(bufs);
                 }
                 else if (versionByte == 0)
                 {
@@ -437,7 +452,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
 
                 SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
 
-                List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(Collections.singletonList(buf));
+                List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(bufs);
 
                 if (versionByte == 0)
                 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index c29fa09..1e61657 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -29,7 +29,7 @@ import java.util.List;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index e2044e6..15788ef 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -73,7 +73,7 @@ import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
index 4683709..79f0d39 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
index b66843a..486bea0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
index e204adc..1d7310d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
index 4232fa6..2fa9e41 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
index e838d9f..6cce923 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.*;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -43,7 +44,7 @@ public class CharTypeConstructor implements TypeConstructor<String>
     @Override
     public String construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(QpidByteBufferUtils.hasRemaining(in,4))
+        if(QpidByteBufferUtils.hasRemaining(in, 4))
         {
             int codePoint = QpidByteBufferUtils.getInt(in);
             char[] chars = Character.toChars(codePoint);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
index cfb952b..1d4f3f8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
index fe7ef10..d46f4aa 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
@@ -23,6 +23,7 @@ import java.math.BigDecimal;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
index 1eb7dcc..aba2d8c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
index f1e6ce5..66be0d3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
index fa30d85..58350e6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.transport.ByteBufferSender;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
index b5439e5..69f3a25 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
index aacb469..2b44607 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
deleted file mode 100644
index 0744a1b..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0.codec;
-
-import java.nio.BufferUnderflowException;
-import java.util.List;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-
-public class QpidByteBufferUtils
-{
-    public static boolean hasRemaining(List<QpidByteBuffer> in)
-    {
-        if (in.isEmpty())
-        {
-            return false;
-        }
-        for (int i = 0; i < in.size(); i++)
-        {
-            if (in.get(i).hasRemaining())
-            {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public static long remaining(List<QpidByteBuffer> in)
-    {
-        long remaining = 0L;
-        for (int i = 0; i < in.size(); i++)
-        {
-            remaining += in.get(i).remaining();
-        }
-        return remaining;
-    }
-
-    public static byte get(List<QpidByteBuffer> in)
-    {
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            if (buffer.hasRemaining())
-            {
-                return buffer.get();
-            }
-        }
-        throw new BufferUnderflowException();
-    }
-
-    public static boolean hasRemaining(final List<QpidByteBuffer> in, int len)
-    {
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            int remaining = buffer.remaining();
-            if (remaining >= len)
-            {
-                return true;
-            }
-            len -= remaining;
-        }
-
-        return false;
-    }
-
-    public static long getLong(final List<QpidByteBuffer> in)
-    {
-        boolean bytewise = false;
-        int consumed = 0;
-        long result = 0L;
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            int remaining = buffer.remaining();
-            if (bytewise)
-            {
-                while (buffer.hasRemaining() && consumed < 8)
-                {
-                    result <<= 1;
-                    result |= (0xFF & buffer.get());
-                    consumed++;
-                }
-                if (consumed == 8)
-                {
-                    return result;
-                }
-            }
-            else
-            {
-                if (remaining >= 8)
-                {
-                    return buffer.getLong();
-                }
-                else if (remaining != 0)
-                {
-                    bytewise = true;
-                    while (buffer.hasRemaining())
-                    {
-                        result <<= 1;
-                        result |= (0xFF & buffer.get());
-                        consumed++;
-                    }
-                }
-            }
-        }
-        throw new BufferUnderflowException();
-    }
-
-    public static int getInt(final List<QpidByteBuffer> in)
-    {
-        boolean bytewise = false;
-        int consumed = 0;
-        int result = 0;
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            int remaining = buffer.remaining();
-            if (bytewise)
-            {
-                while (buffer.hasRemaining() && consumed < 4)
-                {
-                    result <<= 1;
-                    result |= (0xFF & buffer.get());
-                    consumed++;
-                }
-                if (consumed == 4)
-                {
-                    return result;
-                }
-            }
-            else
-            {
-                if (remaining >= 4)
-                {
-                    return buffer.getInt();
-                }
-                else if (remaining != 0)
-                {
-                    bytewise = true;
-                    while (buffer.hasRemaining())
-                    {
-                        result <<= 1;
-                        result |= (0xFF & buffer.get());
-                        consumed++;
-                    }
-                }
-            }
-        }
-        throw new BufferUnderflowException();
-    }
-
-    public static float getFloat(final List<QpidByteBuffer> in)
-    {
-        return Float.intBitsToFloat(getInt(in));
-    }
-
-    public static double getDouble(final List<QpidByteBuffer> in)
-    {
-        return Double.longBitsToDouble(getLong(in));
-    }
-
-    public static Short getShort(final List<QpidByteBuffer> in)
-    {
-        boolean bytewise = false;
-        int consumed = 0;
-        short result = 0;
-        for (int i = 0; i < in.size(); i++)
-        {
-            final QpidByteBuffer buffer = in.get(i);
-            int remaining = buffer.remaining();
-            if (bytewise)
-            {
-                while (buffer.hasRemaining() && consumed < 2)
-                {
-                    result <<= 1;
-                    result |= (0xFF & buffer.get());
-                    consumed++;
-                }
-                if (consumed == 2)
-                {
-                    return result;
-                }
-            }
-            else
-            {
-                if (remaining >= 2)
-                {
-                    return buffer.getShort();
-                }
-                else if (remaining != 0)
-                {
-                    bytewise = true;
-                    while (buffer.hasRemaining())
-                    {
-                        result <<= 1;
-                        result |= (0xFF & buffer.get());
-                        consumed++;
-                    }
-                }
-            }
-        }
-        throw new BufferUnderflowException();
-    }
-
-    public static int get(final List<QpidByteBuffer> in, final byte[] data)
-    {
-        int copied = 0;
-        int i = 0;
-        while (copied < data.length && i < in.size())
-        {
-            QpidByteBuffer buf = in.get(i);
-            if (buf.hasRemaining())
-            {
-                int remaining = buf.remaining();
-                if (remaining >= data.length - copied)
-                {
-                    buf.get(data, copied, data.length - copied);
-                    return data.length;
-                }
-                else
-                {
-                    buf.get(data, copied, remaining);
-                    copied += remaining;
-                }
-            }
-            i++;
-        }
-        return copied;
-    }
-
-    public static void skip(final List<QpidByteBuffer> in, int length)
-    {
-        int skipped = 0;
-        int i = 0;
-        while (skipped < length && i < in.size())
-        {
-            QpidByteBuffer buf = in.get(i);
-            if (buf.hasRemaining())
-            {
-                int remaining = buf.remaining();
-                if (remaining >= length - skipped)
-                {
-                    buf.position(buf.position() + length - skipped);
-                    return;
-                }
-                else
-                {
-                    buf.position(buf.position() + remaining);
-                    skipped += remaining;
-                }
-            }
-            i++;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
index 528979e..396911a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
index d6b3ca4..5098b34 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
index 1188521..0bf79d7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
index d3f6ae0..32df052 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
index 2a3790a..845f745 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
index 324d06b..3a93f29 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
@@ -26,6 +26,7 @@ import java.nio.CharBuffer;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
index 0ad2319..07a7aac 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SymbolTypeConstructor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
index 49a5db3..a76ae6f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/TimestampTypeConstructor.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85abb468/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
index 792c412..d03668c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/UByteTypeConstructor.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0.codec;
 
 import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message