qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1693123 [1/2] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ ...
Date Tue, 28 Jul 2015 15:48:31 GMT
Author: kwall
Date: Tue Jul 28 15:48:30 2015
New Revision: 1693123

URL: http://svn.apache.org/r1693123
Log:
QPID-6662: [Java Broker] Use direct byte buffers between transport / store and minimise copying

Added:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java
Removed:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteBufferWriter.java
Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
    qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Jul 28 15:48:30 2015
@@ -60,7 +60,7 @@ import org.apache.qpid.server.store.Tran
 import org.apache.qpid.server.store.Xid;
 import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
 import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
@@ -351,7 +351,7 @@ public abstract class AbstractBDBMessage
         DatabaseEntry contentKeyEntry = new DatabaseEntry();
         LongBinding.longToEntry(messageId, contentKeyEntry);
         DatabaseEntry value = new DatabaseEntry();
-        ContentBinding contentTupleBinding = ContentBinding.getInstance();
+        ByteBufferBinding contentTupleBinding = ByteBufferBinding.getInstance();
 
 
         getLogger().debug("Message Id: {} Getting content body from offset: {}", messageId, offset);
@@ -364,8 +364,8 @@ public abstract class AbstractBDBMessage
             OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
             if (status == OperationStatus.SUCCESS)
             {
-                byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
-                int size = dataAsBytes.length;
+                ByteBuffer dataAsBytes = contentTupleBinding.entryToObject(value);
+                int size = dataAsBytes.remaining();
                 if (offset > size)
                 {
                     throw new RuntimeException("Offset " + offset + " is greater than message size " + size
@@ -378,8 +378,11 @@ public abstract class AbstractBDBMessage
                 {
                     written = dst.remaining();
                 }
+                dataAsBytes.position(dataAsBytes.position()+offset);
 
-                dst.put(dataAsBytes, offset, written);
+                dataAsBytes = dataAsBytes.slice();
+                dataAsBytes.limit(written);
+                dst.put(dataAsBytes);
             }
             return written;
         }
@@ -392,20 +395,18 @@ public abstract class AbstractBDBMessage
         }
     }
 
-    byte[] getAllContent(long messageId) throws StoreException
+    ByteBuffer getAllContent(long messageId) throws StoreException
     {
         DatabaseEntry contentKeyEntry = new DatabaseEntry();
         LongBinding.longToEntry(messageId, contentKeyEntry);
         DatabaseEntry value = new DatabaseEntry();
-        ContentBinding contentTupleBinding = ContentBinding.getInstance();
+        ByteBufferBinding contentTupleBinding = ByteBufferBinding.getInstance();
 
 
         getLogger().debug("Message Id: {} Getting content body", messageId);
 
         try
         {
-
-            int written = 0;
             OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
             if (status == OperationStatus.SUCCESS)
             {
@@ -511,8 +512,8 @@ public abstract class AbstractBDBMessage
         DatabaseEntry key = new DatabaseEntry();
         LongBinding.longToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
-        ContentBinding messageBinding = ContentBinding.getInstance();
-        messageBinding.objectToEntry(contentBody.array(), value);
+        ByteBufferBinding messageBinding = ByteBufferBinding.getInstance();
+        messageBinding.objectToEntry(contentBody, value);
         try
         {
             OperationStatus status = getMessageContentDb().put(tx, key, value);
@@ -884,15 +885,15 @@ public abstract class AbstractBDBMessage
     static interface MessageDataRef<T extends StorableMessageMetaData>
     {
         T getMetaData();
-        byte[] getData();
-        void setData(byte[] data);
+        ByteBuffer getData();
+        void setData(ByteBuffer data);
         boolean isHardRef();
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
     {
         private final T _metaData;
-        private byte[] _data;
+        private volatile ByteBuffer _data;
 
         private MessageDataHardRef(final T metaData)
         {
@@ -906,13 +907,13 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public byte[] getData()
+        public ByteBuffer getData()
         {
             return _data;
         }
 
         @Override
-        public void setData(final byte[] data)
+        public void setData(final ByteBuffer data)
         {
             _data = data;
         }
@@ -927,9 +928,9 @@ public abstract class AbstractBDBMessage
     private static final class MessageData<T extends StorableMessageMetaData>
     {
         private T _metaData;
-        private SoftReference<byte[]> _data;
+        private SoftReference<ByteBuffer> _data;
 
-        private MessageData(final T metaData, final byte[] data)
+        private MessageData(final T metaData, final ByteBuffer data)
         {
             _metaData = metaData;
 
@@ -944,12 +945,12 @@ public abstract class AbstractBDBMessage
             return _metaData;
         }
 
-        public byte[] getData()
+        public ByteBuffer getData()
         {
             return _data == null ? null : _data.get();
         }
 
-        public void setData(final byte[] data)
+        public void setData(final ByteBuffer data)
         {
             _data = new SoftReference<>(data);
         }
@@ -959,7 +960,7 @@ public abstract class AbstractBDBMessage
     private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T>
     {
 
-        public MessageDataSoftRef(final T metadata, byte[] data)
+        public MessageDataSoftRef(final T metadata, ByteBuffer data)
         {
             super(new MessageData<T>(metadata, data));
         }
@@ -972,7 +973,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public byte[] getData()
+        public ByteBuffer getData()
         {
             MessageData<T> ref = get();
 
@@ -980,7 +981,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public void setData(final byte[] data)
+        public void setData(final ByteBuffer data)
         {
             MessageData<T> ref = get();
             if(ref != null)
@@ -1046,23 +1047,19 @@ public abstract class AbstractBDBMessage
         public void addContent(ByteBuffer src)
         {
             src = src.slice();
-            byte[] data = _messageDataRef.getData();
+            ByteBuffer data = _messageDataRef.getData();
             if(data == null)
             {
-                data = new byte[src.remaining()];
-                src.duplicate().get(data);
-                _messageDataRef.setData(data);
+                _messageDataRef.setData(src);
             }
             else
             {
-                byte[] oldData = data;
-                data = new byte[oldData.length + src.remaining()];
-
-
-                System.arraycopy(oldData, 0, data, 0, oldData.length);
-                src.duplicate().get(data, oldData.length, src.remaining());
-
-                _messageDataRef.setData(data);
+                int size = data.remaining() + src.remaining();
+                ByteBuffer buf = data.isDirect() ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+                buf.put(data.duplicate());
+                buf.put(src.duplicate());
+                buf.flip();
+                _messageDataRef.setData(buf);
             }
 
         }
@@ -1076,39 +1073,17 @@ public abstract class AbstractBDBMessage
         @Override
         public int getContent(int offsetInMessage, ByteBuffer dst)
         {
-            byte[] data = _messageDataRef.getData();
-            if(data == null)
-            {
-                if(stored())
-                {
-                    checkMessageStoreOpen();
-                    data = AbstractBDBMessageStore.this.getAllContent(_messageId);
-                    T metaData = _messageDataRef.getMetaData();
-                    if (metaData == null)
-                    {
-                        metaData = (T) getMessageMetaData(_messageId);
-                        _messageDataRef = new MessageDataSoftRef<>(metaData, data);
-                    }
-                    else
-                    {
-                        _messageDataRef.setData(data);
-                    }
-                }
-                else
-                {
-                    data = new byte[0];
-                }
-            }
-
-            int length = Math.min(dst.remaining(), data.length - offsetInMessage);
-            dst.put(data, offsetInMessage, length);
+            ByteBuffer data = getContentAsByteBuffer();
+            data = data.slice();
+            int length = Math.min(dst.remaining(), data.remaining());
+            data.limit(length);
+            dst.put(data);
             return length;
         }
 
-        @Override
-        public ByteBuffer getContent(int offsetInMessage, int size)
+        private ByteBuffer getContentAsByteBuffer()
         {
-            byte[] data = _messageDataRef.getData();
+            ByteBuffer data = _messageDataRef.getData();
             if(data == null)
             {
                 if(stored())
@@ -1119,7 +1094,7 @@ public abstract class AbstractBDBMessage
                     if (metaData == null)
                     {
                         metaData = (T) getMessageMetaData(_messageId);
-                        _messageDataRef = new MessageDataSoftRef<>(metaData, data);
+                        _messageDataRef = new MessageDataSoftRef<T>(metaData, data);
                     }
                     else
                     {
@@ -1128,23 +1103,20 @@ public abstract class AbstractBDBMessage
                 }
                 else
                 {
-                    return null;
+                    data = ByteBuffer.wrap(new byte[0]);
                 }
-            }
-            try
-            {
-                return ByteBuffer.wrap(data, offsetInMessage, Math.min(size, data.length - offsetInMessage));
-            }
-            catch (IndexOutOfBoundsException e)
-            {
-                IndexOutOfBoundsException indexOutOfBoundsException =
-                        new IndexOutOfBoundsException("Error wrapping data (data.length: " + data.length
-                                                      + " offsetInMessage: " + offsetInMessage
-                                                      + " size: " + size);
-                indexOutOfBoundsException.initCause(e);
-                throw indexOutOfBoundsException;
-            }
+            } return data;
+        }
 
+        @Override
+        public ByteBuffer getContent(int offsetInMessage, int size)
+        {
+            ByteBuffer data = getContentAsByteBuffer();
+            data = data.duplicate();
+            data.position(offsetInMessage);
+            data = data.slice();
+            data.limit(size);
+            return data;
         }
 
         synchronized Runnable store(Transaction txn)
@@ -1156,7 +1128,7 @@ public abstract class AbstractBDBMessage
                 AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
                                                         _messageDataRef.getData() == null
                                                                 ? ByteBuffer.allocate(0)
-                                                                : ByteBuffer.wrap(_messageDataRef.getData()));
+                                                                : _messageDataRef.getData());
 
 
                 MessageDataRef<T> hardRef = _messageDataRef;

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java Tue Jul 28 15:48:30 2015
@@ -25,11 +25,9 @@ import com.sleepycat.bind.tuple.TupleOut
 import com.sleepycat.je.DatabaseException;
 
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public class FieldTableEncoding
 {
@@ -39,6 +37,7 @@ public class FieldTableEncoding
 
     public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException
     {
+
         long length = tupleInput.readLong();
         if (length <= 0)
         {
@@ -47,17 +46,9 @@ public class FieldTableEncoding
         else
         {
 
-            byte[] data = new byte[(int)length];
-            tupleInput.readFast(data);
+            ByteBuffer buf = ByteBufferBinding.getInstance().readByteBuffer(tupleInput, (int) length);
 
-            try
-            {
-                return new FieldTable(new DataInputStream(new ByteArrayInputStream(data)),length);
-            }
-            catch (IOException e)
-            {
-                throw new StoreException(e);
-            }
+            return new FieldTable(buf);
 
         }
 

Added: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java?rev=1693123&view=auto
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java (added)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java Tue Jul 28 15:48:30 2015
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import java.nio.ByteBuffer;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class ByteBufferBinding extends TupleBinding<ByteBuffer>
+{
+    private static final int COPY_BUFFER_SIZE = 8192;
+
+    private static final ThreadLocal<byte[]> COPY_BUFFER = new ThreadLocal<byte[]>()
+                                                            {
+
+                                                                @Override
+                                                                protected byte[] initialValue()
+                                                                {
+                                                                    return new byte[COPY_BUFFER_SIZE];
+                                                                }
+                                                            };
+
+    private static final ByteBufferBinding INSTANCE = new ByteBufferBinding();
+
+    public static ByteBufferBinding getInstance()
+    {
+        return INSTANCE;
+    }
+
+    /** private constructor forces getInstance instead */
+    private ByteBufferBinding() { }
+
+    @Override
+    public ByteBuffer entryToObject(final TupleInput input)
+    {
+        int available = input.available();
+        ByteBuffer buf = ByteBuffer.allocateDirect(available);
+        byte[] copyBuf = COPY_BUFFER.get();
+        while(available > 0)
+        {
+            int read = input.read(copyBuf);
+            buf.put(copyBuf,0,read);
+            available = input.available();
+        }
+        buf.flip();
+        return buf;
+    }
+
+    @Override
+    public void objectToEntry(ByteBuffer data, final TupleOutput output)
+    {
+        data = data.duplicate();
+        byte[] copyBuf = COPY_BUFFER.get();
+        while(data.hasRemaining())
+        {
+            int length = Math.min(COPY_BUFFER_SIZE, data.remaining());
+            data.get(copyBuf,0,length);
+            output.write(copyBuf,0,length);
+        }
+    }
+
+    public ByteBuffer readByteBuffer(final TupleInput input, int length)
+    {
+        ByteBuffer buf = ByteBuffer.allocateDirect(length);
+        byte[] copyBuf = COPY_BUFFER.get();
+        while(length > 0)
+        {
+            int read = input.read(copyBuf, 0, Math.min(COPY_BUFFER_SIZE, length));
+            buf.put(copyBuf,0,read);
+            length -= read;
+        }
+        buf.flip();
+        return buf;
+    }
+}

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java Tue Jul 28 15:48:30 2015
@@ -48,6 +48,7 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ByteArrayDataInput;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MessagePublishInfo;
@@ -677,7 +678,7 @@ public class UpgradeFrom4To5 extends Abs
 
             try
             {
-                return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)),
+                return ContentHeaderBody.createFromBuffer(new ByteArrayDataInput(underlying),
                         bodySize);
             }
             catch (IOException e)

Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java (original)
+++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java Tue Jul 28 15:48:30 2015
@@ -21,11 +21,12 @@
 package org.apache.qpid.server.store.berkeleydb.upgrade;
 
 import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
 
 import com.sleepycat.bind.tuple.IntegerBinding;
 import com.sleepycat.bind.tuple.LongBinding;
@@ -127,7 +128,7 @@ public class UpgraderTest extends Abstra
 
     private void assertContent()
     {
-        final ContentBinding contentBinding = ContentBinding.getInstance();
+        final ByteBufferBinding contentBinding = ByteBufferBinding.getInstance();
         CursorOperation contentCursorOperation = new CursorOperation()
         {
 
@@ -137,8 +138,9 @@ public class UpgraderTest extends Abstra
             {
                 long id = LongBinding.entryToLong(key);
                 assertTrue("Unexpected id", id > 0);
-                byte[] content = contentBinding.entryToObject(value);
+                ByteBuffer content = contentBinding.entryToObject(value);
                 assertNotNull("Unexpected content", content);
+                assertTrue("Expected content", content.hasRemaining());
             }
         };
         new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Jul 28 15:48:30 2015
@@ -1181,7 +1181,7 @@ public abstract class AbstractJDBCMessag
     }
 
 
-    private byte[] getAllContent(long messageId)
+    private ByteBuffer getAllContent(long messageId)
     {
         Connection conn = null;
         PreparedStatement stmt = null;
@@ -1200,7 +1200,10 @@ public abstract class AbstractJDBCMessag
             {
 
                 byte[] dataAsBytes = getBlobAsBytes(rs, 1);
-                return dataAsBytes;
+                ByteBuffer buf = ByteBuffer.allocateDirect(dataAsBytes.length);
+                buf.put(dataAsBytes);
+                buf.flip();
+                return buf;
             }
 
             throw new StoreException("No such message, id: " + messageId);
@@ -1426,15 +1429,15 @@ public abstract class AbstractJDBCMessag
     static interface MessageDataRef<T extends StorableMessageMetaData>
     {
         T getMetaData();
-        byte[] getData();
-        void setData(byte[] data);
+        ByteBuffer getData();
+        void setData(ByteBuffer data);
         boolean isHardRef();
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
     {
         private final T _metaData;
-        private byte[] _data;
+        private ByteBuffer _data;
 
         private MessageDataHardRef(final T metaData)
         {
@@ -1448,13 +1451,13 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public byte[] getData()
+        public ByteBuffer getData()
         {
             return _data;
         }
 
         @Override
-        public void setData(final byte[] data)
+        public void setData(final ByteBuffer data)
         {
             _data = data;
         }
@@ -1469,9 +1472,9 @@ public abstract class AbstractJDBCMessag
     private static final class MessageData<T extends StorableMessageMetaData>
     {
         private T _metaData;
-        private SoftReference<byte[]> _data;
+        private SoftReference<ByteBuffer> _data;
 
-        private MessageData(final T metaData, final byte[] data)
+        private MessageData(final T metaData, final ByteBuffer data)
         {
             _metaData = metaData;
 
@@ -1486,12 +1489,12 @@ public abstract class AbstractJDBCMessag
             return _metaData;
         }
 
-        public byte[] getData()
+        public ByteBuffer getData()
         {
             return _data == null ? null : _data.get();
         }
 
-        public void setData(final byte[] data)
+        public void setData(final ByteBuffer data)
         {
             _data = new SoftReference<>(data);
         }
@@ -1501,7 +1504,7 @@ public abstract class AbstractJDBCMessag
     private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T>
     {
 
-        public MessageDataSoftRef(final T metadata, byte[] data)
+        public MessageDataSoftRef(final T metadata, ByteBuffer data)
         {
             super(new MessageData<T>(metadata, data));
         }
@@ -1514,7 +1517,7 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public byte[] getData()
+        public ByteBuffer getData()
         {
             MessageData<T> ref = get();
 
@@ -1522,7 +1525,7 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public void setData(final byte[] data)
+        public void setData(final ByteBuffer data)
         {
             MessageData<T> ref = get();
             if(ref != null)
@@ -1598,25 +1601,20 @@ public abstract class AbstractJDBCMessag
         public void addContent(ByteBuffer src)
         {
             src = src.slice();
-            byte[] data = _messageDataRef.getData();
-
+            ByteBuffer data = _messageDataRef.getData();
             if(data == null)
             {
-                data = new byte[src.remaining()];
-                src.duplicate().get(data);
-                _messageDataRef.setData(data);
+                _messageDataRef.setData(src);
             }
             else
             {
-                byte[] oldData = data;
-                data = new byte[oldData.length + src.remaining()];
-
-                System.arraycopy(oldData,0,data,0,oldData.length);
-                src.duplicate().get(data, oldData.length, src.remaining());
-
-                _messageDataRef.setData(data);
+                int size = data.remaining() + src.remaining();
+                ByteBuffer buf = data.isDirect() ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+                buf.put(data.duplicate());
+                buf.put(src.duplicate());
+                buf.flip();
+                _messageDataRef.setData(buf);
             }
-
         }
 
         @Override
@@ -1628,8 +1626,17 @@ public abstract class AbstractJDBCMessag
         @Override
         public int getContent(int offsetInMessage, ByteBuffer dst)
         {
-            byte[] data = _messageDataRef.getData();
+            ByteBuffer data = getContentAsByteBuffer();
+            data = data.slice();
+            int length = Math.min(dst.remaining(), data.remaining());
+            data.limit(length);
+            dst.put(data);
+            return length;
+        }
 
+        private ByteBuffer getContentAsByteBuffer()
+        {
+            ByteBuffer data = _messageDataRef.getData();
             if(data == null)
             {
                 if(stored())
@@ -1656,57 +1663,23 @@ public abstract class AbstractJDBCMessag
                 }
                 else
                 {
-                    data = new byte[0];
+                    data = ByteBuffer.wrap(new byte[0]);
                 }
-            }
-
-            int length = Math.min(dst.remaining(), data.length - offsetInMessage);
-            dst.put(data, offsetInMessage, length);
-            return length;
-
+            } return data;
         }
 
-
         @Override
         public ByteBuffer getContent(int offsetInMessage, int size)
         {
-            byte[] data = _messageDataRef.getData();
-
-            if(data == null)
-            {
-
-                if(stored())
-                {
-                    checkMessageStoreOpen();
-
-                    data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
-                    T metaData = _messageDataRef.getMetaData();
-                    if (metaData == null)
-                    {
-                        try
-                        {
-                            metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
-                            _messageDataRef = new MessageDataSoftRef<>(metaData, data);
-                        }
-                        catch (SQLException e)
-                        {
-                            throw new StoreException("Failed to get content for message id: " + _messageId, e);
-                        }
-                    }
-                    else
-                    {
-                        _messageDataRef.setData(data);
-                    }
-                }
-                else
-                {
-                    return null;
-                }
-            }
-            return ByteBuffer.wrap(data,offsetInMessage,Math.min(size,data.length-offsetInMessage));
-
+            ByteBuffer data = getContentAsByteBuffer();
+            data = data.duplicate();
+            data.position(offsetInMessage);
+            data = data.slice();
+            data.limit(size);
+            return data;
         }
 
+
         @Override
         public void remove()
         {
@@ -1746,8 +1719,8 @@ public abstract class AbstractJDBCMessag
                 storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
                 AbstractJDBCMessageStore.this.addContent(conn, _messageId,
                                                                _messageDataRef.getData() == null
-                                                                       ? ByteBuffer.allocate(0)
-                                                                       : ByteBuffer.wrap(_messageDataRef.getData()));
+                                                                       ? ByteBuffer.allocateDirect(0)
+                                                                       : _messageDataRef.getData());
 
 
                 getLogger().debug("Storing message {} to store", _messageId);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Tue Jul 28 15:48:30 2015
@@ -61,7 +61,7 @@ public class StoredMemoryMessage<T exten
                         : contentSize;
                 ByteBuffer oldContent = _content;
                 oldContent.flip();
-                _content = ByteBuffer.allocate(size);
+                _content = ByteBuffer.allocateDirect(size);
                 _content.put(oldContent);
                 _content.put(src.duplicate());
             }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Tue Jul 28 15:48:30 2015
@@ -471,7 +471,10 @@ public class MultiVersionProtocolEngine
                     {
                         _logger.debug("Unsupported protocol version requested, replying with: " + supportedReplyVersion);
                     }
-                    _sender.send(ByteBuffer.wrap(supportedReplyBytes));
+                    final ByteBuffer supportedReplyBuf = ByteBuffer.allocateDirect(supportedReplyBytes.length);
+                    supportedReplyBuf.put(supportedReplyBytes);
+                    supportedReplyBuf.flip();
+                    _sender.send(supportedReplyBuf);
                     _sender.flush();
 
                     _delegate = new ClosedDelegateProtocolEngine();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Jul 28 15:48:30 2015
@@ -83,7 +83,7 @@ public class NonBlockingConnection imple
 
         _receiveBufferSize = receiveBufferSize;
 
-        _netInputBuffer = ByteBuffer.allocate(receiveBufferSize);
+        _netInputBuffer = ByteBuffer.allocateDirect(receiveBufferSize);
         _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
         _port = port;
 
@@ -359,7 +359,7 @@ public class NonBlockingConnection imple
         else
         {
             // compact into new buffer
-            _netInputBuffer = ByteBuffer.allocate(_receiveBufferSize);
+            _netInputBuffer = ByteBuffer.allocateDirect(_receiveBufferSize);
             _netInputBuffer.put(duplicate);
         }
         return readData;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Tue Jul 28 15:48:30 2015
@@ -105,7 +105,7 @@ public class NonBlockingConnectionTLSDel
         do
         {
             ByteBuffer appInputBuffer =
-                    ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+                    ByteBuffer.allocateDirect(_sslEngine.getSession().getApplicationBufferSize() + 50);
             _status = _sslEngine.unwrap(wrappedDataBuffer, appInputBuffer);
             if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
             {
@@ -135,7 +135,7 @@ public class NonBlockingConnectionTLSDel
         {
             if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
             {
-                final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+                final ByteBuffer netBuffer = ByteBuffer.allocateDirect(_sslEngine.getSession().getPacketBufferSize());
                 _status = _sslEngine.wrap(bufferArray, netBuffer);
                 runSSLEngineTasks(_status);
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Tue Jul 28 15:48:30 2015
@@ -103,7 +103,7 @@ public class AMQPConnection_0_10 extends
         _connection.setRemoteAddress(network.getRemoteAddress());
         _connection.setLocalAddress(network.getLocalAddress());
 
-        _inputHandler = new InputHandler(new ServerAssembler(_connection));
+        _inputHandler = new InputHandler(new ServerAssembler(_connection), true);
         _network = network;
 
         Subject.doAs(getSubject(), new PrivilegedAction<Object>()

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java Tue Jul 28 15:48:30 2015
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 
+import java.nio.ByteBuffer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,5 +55,11 @@ public class ServerAssembler extends Ass
         }
     }
 
+    @Override
+    protected ByteBuffer allocateByteBuffer(int size)
+    {
+        return ByteBuffer.allocateDirect(size);
+    }
+
 
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java Tue Jul 28 15:48:30 2015
@@ -29,6 +29,9 @@ import static org.apache.qpid.transport.
 
 import java.nio.ByteBuffer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.FrameSizeObserver;
 import org.apache.qpid.transport.Header;
@@ -48,6 +51,7 @@ import org.apache.qpid.transport.network
  */
 public final class ServerDisassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerDisassembler.class);
     private final ByteBufferSender _sender;
     private int _maxPayload;
     private final Object _sendLock = new Object();
@@ -89,7 +93,7 @@ public final class ServerDisassembler im
 
     private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
     {
-        ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]);
+        ByteBuffer data = ByteBuffer.allocateDirect(HEADER_SIZE);
 
         data.put(0, flags);
         data.put(1, type);
@@ -141,7 +145,7 @@ public final class ServerDisassembler im
 
     public void init(Void v, ProtocolHeader header)
     {
-        _sender.send(header.toByteBuffer());
+        _sender.send(header.toByteBuffer(true));
         _sender.flush();
 }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java Tue Jul 28 15:48:30 2015
@@ -44,7 +44,7 @@ public final class ServerEncoder extends
     {
         _initialCapacity = capacity;
         _threshold = capacity/16;
-        _out = ByteBuffer.allocate(capacity);
+        _out = ByteBuffer.allocateDirect(capacity);
         _segment = 0;
     }
 
@@ -55,7 +55,7 @@ public final class ServerEncoder extends
         _out = _out.slice();
         if(_out.remaining() < _threshold)
         {
-            _out = ByteBuffer.allocate(_initialCapacity);
+            _out = ByteBuffer.allocateDirect(_initialCapacity);
         }
         _segment = 0;
     }
@@ -84,7 +84,7 @@ public final class ServerEncoder extends
     {
         ByteBuffer old = _out;
         int capacity = old.capacity();
-        _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
+        _out = ByteBuffer.allocateDirect(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
         old.flip();
         _out.put(old);
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Jul 28 15:48:30 2015
@@ -451,7 +451,7 @@ public class AMQChannel
                     for(int i = 0 ; i < bodyCount ; i++)
                     {
                         ContentBody contentChunk = _currentMessage.getContentChunk(i);
-                        handle.addContent(ByteBuffer.wrap(contentChunk.getPayload()));
+                        handle.addContent(contentChunk.getPayload().duplicate());
                         bodyLengthReceived += contentChunk.getSize();
                     }
                 }
@@ -2507,7 +2507,7 @@ public class AMQChannel
     }
 
     @Override
-    public void receiveMessageContent(final byte[] data)
+    public void receiveMessageContent(final ByteBuffer data)
     {
         if(_logger.isDebugEnabled())
         {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Tue Jul 28 15:48:30 2015
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.AMQFrameD
 import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ByteBufferDataInput;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.EncodingUtils;
 import org.apache.qpid.framing.FieldTable;
@@ -158,15 +159,14 @@ public class MessageMetaData implements
         {
             try
             {
-                ByteBufferInputStream bbis = new ByteBufferInputStream(buf);
-                DataInputStream dais = new DataInputStream(bbis);
-                int size = EncodingUtils.readInteger(dais);
-                ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dais, size);
-                final AMQShortString exchange = EncodingUtils.readAMQShortString(dais);
-                final AMQShortString routingKey = EncodingUtils.readAMQShortString(dais);
+                ByteBufferDataInput dataInput = new ByteBufferDataInput(buf);
+                int size = EncodingUtils.readInteger(dataInput);
+                ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dataInput, size);
+                final AMQShortString exchange = EncodingUtils.readAMQShortString(dataInput);
+                final AMQShortString routingKey = EncodingUtils.readAMQShortString(dataInput);
 
-                final byte flags = EncodingUtils.readByte(dais);
-                long arrivalTime = EncodingUtils.readLong(dais);
+                final byte flags = EncodingUtils.readByte(dataInput);
+                long arrivalTime = EncodingUtils.readLong(dataInput);
 
                 MessagePublishInfo publishBody =
                         new MessagePublishInfo(exchange,

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Tue Jul 28 15:48:30 2015
@@ -48,7 +48,6 @@ import org.apache.qpid.util.GZIPUtils;
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
     private static final int BASIC_CLASS_ID = 60;
-
     private final AMQPConnection_0_8 _connection;
     private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING);
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java Tue Jul 28 15:48:30 2015
@@ -61,7 +61,17 @@ public class SymbolTypeConstructor exten
             size = in.getInt();
         }
 
-        BinaryString binaryStr = new BinaryString(in.array(), in.arrayOffset()+in.position(), size);
+        BinaryString binaryStr;
+        if (in.hasArray())
+        {
+            binaryStr = new BinaryString(in.array(), in.arrayOffset()+in.position(), size);
+        }
+        else
+        {
+            byte[] b = new byte[in.remaining()];
+            in.duplicate().get(b);
+            binaryStr = new BinaryString(b, 0, b.length);
+        }
 
         Symbol symbolVal = SYMBOL_MAP.get(binaryStr);
         if(symbolVal == null)

Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Tue Jul 28 15:48:30 2015
@@ -282,7 +282,16 @@ class WebSocketProvider implements Accep
         {
             try
             {
-                _connection.sendMessage(msg.array(),msg.arrayOffset()+msg.position(),msg.remaining());
+                if (msg.hasArray())
+                {
+                    _connection.sendMessage(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
+                }
+                else
+                {
+                    byte[] copy = new byte[msg.remaining()];
+                    msg.duplicate().get(copy);
+                    _connection.sendMessage(copy, 0, copy.length);
+                }
             }
             catch (IOException e)
             {

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Tue Jul 28 15:48:30 2015
@@ -392,9 +392,7 @@ public class BasicMessageProducer_0_8 ex
 
         if (frames.length == (offset + 1))
         {
-            byte[] data = new byte[payload.remaining()];
-            payload.get(data);
-            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
+            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
         }
         else
         {
@@ -406,10 +404,8 @@ public class BasicMessageProducer_0_8 ex
                 payload.position((int) framePayloadMax * (i - offset));
                 int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
                 payload.limit(payload.position() + length);
-                byte[] data = new byte[payload.remaining()];
-                payload.get(data);
 
-                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
+                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
 
                 remaining -= length;
             }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Tue Jul 28 15:48:30 2015
@@ -76,7 +76,7 @@ public abstract class AbstractJMSMessage
                     _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
                 }
 
-                data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload());
+                data = ((ContentBody) bodies.get(0)).getPayload().duplicate();
             }
             else if (bodies != null)
             {
@@ -91,7 +91,7 @@ public abstract class AbstractJMSMessage
                 while (it.hasNext())
                 {
                     ContentBody cb = (ContentBody) it.next();
-                    final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload());
+                    final ByteBuffer payload = cb.getPayload().duplicate();
                     if (payload.isDirect() || payload.isReadOnly())
                     {
                         data.put(payload);
@@ -193,13 +193,11 @@ public abstract class AbstractJMSMessage
     private class BodyInputStream extends InputStream
     {
         private final Iterator<ContentBody> _bodiesIter;
-        private byte[] _currentBuffer;
-        private int _currentPos;
+        private ByteBuffer _currentBuffer;
         public BodyInputStream(final List<ContentBody> bodies)
         {
             _bodiesIter = bodies.iterator();
-            _currentBuffer = _bodiesIter.next().getPayload();
-            _currentPos = 0;
+            _currentBuffer = _bodiesIter.next().getPayload().duplicate();
         }
 
         @Override
@@ -220,7 +218,7 @@ public abstract class AbstractJMSMessage
         @Override
         public int read(final byte[] dst, final int off, final int len)
         {
-            while(_currentPos == _currentBuffer.length)
+            while(!_currentBuffer.hasRemaining())
             {
                 if(!_bodiesIter.hasNext())
                 {
@@ -228,13 +226,11 @@ public abstract class AbstractJMSMessage
                 }
                 else
                 {
-                    _currentBuffer = _bodiesIter.next().getPayload();
-                    _currentPos = 0;
+                    _currentBuffer = _bodiesIter.next().getPayload().duplicate();
                 }
             }
-            int size = Math.min(len, _currentBuffer.length - _currentPos);
-            System.arraycopy(_currentBuffer,_currentPos, dst,off,size);
-            _currentPos+=size;
+            int size = Math.min(len, _currentBuffer.remaining());
+            _currentBuffer.get(dst,off,size);
             return size;
         }
 

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java Tue Jul 28 15:48:30 2015
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.QpidException;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ByteArrayDataInput;
 
 public class Encrypted091MessageFactory extends AbstractJMSMessageFactory
 {
@@ -143,11 +144,10 @@ public class Encrypted091MessageFactory
 
                 BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
                 int payloadOffset;
-                try (ByteArrayInputStream bis = new ByteArrayInputStream(unencryptedBytes);
-                     DataInputStream dis = new DataInputStream(bis))
-                {
-                    payloadOffset = properties.read(dis);
-                }
+                ByteArrayDataInput dataInput = new ByteArrayDataInput(unencryptedBytes);
+
+                payloadOffset = properties.read(dataInput);
+
 
                 final ByteBuffer unencryptedData =
                         ByteBuffer.wrap(unencryptedBytes, payloadOffset, unencryptedBytes.length - payloadOffset);

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Tue Jul 28 15:48:30 2015
@@ -89,7 +89,7 @@ public class UnprocessedMessage_0_8 exte
 
         if (body.getPayload() != null)
         {
-            final long payloadSize = body.getPayload().length;
+            final long payloadSize = body.getPayload().remaining();
 
             if (_bodies == null)
             {

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Tue Jul 28 15:48:30 2015
@@ -63,6 +63,8 @@ public abstract class AMQDecoder<T exten
 
     private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
 
+    private List<ByteBuffer> _incompleteBuffers = new ArrayList<ByteBuffer>();
+
     /**
      * Creates a new AMQP decoder.
      *
@@ -98,144 +100,13 @@ public abstract class AMQDecoder<T exten
         return _methodProcessor;
     }
 
-    private class RemainingByteArrayInputStream extends InputStream
-    {
-        private int _currentListPos;
-        private int _markPos;
-
-
-        @Override
-        public int read() throws IOException
-        {
-            ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
-            if(currentStream.available() > 0)
-            {
-                return currentStream.read();
-            }
-            else if((_currentListPos == _remainingBufs.size())
-                    || (++_currentListPos == _remainingBufs.size()))
-            {
-                return -1;
-            }
-            else
-            {
-
-                ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
-                stream.mark(0);
-                return stream.read();
-            }
-        }
-
-        @Override
-        public int read(final byte[] b, final int off, final int len) throws IOException
-        {
-
-            if(_currentListPos == _remainingBufs.size())
-            {
-                return -1;
-            }
-            else
-            {
-                ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
-                final int available = currentStream.available();
-                int read = currentStream.read(b, off, len > available ? available : len);
-                if(read < len)
-                {
-                    if(_currentListPos++ != _remainingBufs.size())
-                    {
-                        _remainingBufs.get(_currentListPos).mark(0);
-                    }
-                    int correctRead = read == -1 ? 0 : read;
-                    int subRead = read(b, off+correctRead, len-correctRead);
-                    if(subRead == -1)
-                    {
-                        return read;
-                    }
-                    else
-                    {
-                        return correctRead+subRead;
-                    }
-                }
-                else
-                {
-                    return len;
-                }
-            }
-        }
-
-        @Override
-        public int available() throws IOException
-        {
-            int total = 0;
-            for(int i = _currentListPos; i < _remainingBufs.size(); i++)
-            {
-                total += _remainingBufs.get(i).available();
-            }
-            return total;
-        }
-
-        @Override
-        public void mark(final int readlimit)
-        {
-            _markPos = _currentListPos;
-            final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
-            if(stream != null)
-            {
-                stream.mark(readlimit);
-            }
-        }
-
-        @Override
-        public void reset() throws IOException
-        {
-            _currentListPos = _markPos;
-            final int size = _remainingBufs.size();
-            if(_currentListPos < size)
-            {
-                _remainingBufs.get(_currentListPos).reset();
-            }
-            for(int i = _currentListPos+1; i<size; i++)
-            {
-                _remainingBufs.get(i).reset();
-            }
-        }
-    }
-
-    private static class SimpleDataInputStream extends DataInputStream implements MarkableDataInput
-    {
-        public SimpleDataInputStream(InputStream in)
-        {
-            super(in);
-        }
-
-        public AMQShortString readAMQShortString() throws IOException
-        {
-            return EncodingUtils.readAMQShortString(this);
-        }
-
-    }
-
 
     public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
 
-        MarkableDataInput msg;
-
-
-        // get prior remaining data from accumulator
-        ByteArrayInputStream bais;
-        DataInput di;
-        if(!_remainingBufs.isEmpty())
-        {
-             bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
-            _remainingBufs.add(bais);
-            msg = new SimpleDataInputStream(new RemainingByteArrayInputStream());
-        }
-        else
-        {
-            bais = null;
-            msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
-        }
+        buf = buf.slice();
+        _incompleteBuffers.add(buf);
+        ByteBufferListDataInput msg = new ByteBufferListDataInput(_incompleteBuffers);
 
         // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate
         // an unsupported version
@@ -268,60 +139,28 @@ public abstract class AMQDecoder<T exten
                 }
 
             }
+        }
 
-            if(!enoughData)
+        ListIterator<ByteBuffer> iter = _incompleteBuffers.listIterator();
+        while(iter.hasNext())
+        {
+            ByteBuffer next = iter.next();
+            if(next.hasRemaining())
             {
-                if(!_remainingBufs.isEmpty())
+                if(next.position() != 0)
                 {
-                    _remainingBufs.remove(_remainingBufs.size()-1);
-                    ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
-                    while(iterator.hasNext() && iterator.next().available() == 0)
-                    {
-                        iterator.remove();
-                    }
-                }
-
-                if(bais == null)
-                {
-                    if(msg.available()!=0)
-                    {
-                        byte[] remaining = new byte[msg.available()];
-                        msg.read(remaining);
-                        _remainingBufs.add(new ByteArrayInputStream(remaining));
-                    }
-                }
-                else
-                {
-                    if(bais.available()!=0)
-                    {
-                        byte[] remaining = new byte[bais.available()];
-                        bais.read(remaining);
-                        _remainingBufs.add(new ByteArrayInputStream(remaining));
-                    }
-                }
-
-                if(_remainingBufs.size() > MAX_BUFFERS_LIMIT)
-                {
-                    int totalSize = 0;
-                    for(ByteArrayInputStream stream : _remainingBufs)
-                    {
-                        totalSize += stream.available();
-                    }
-
-                    byte[] completeBuffer = new byte[totalSize];
-                    int pos = 0;
-                    for(ByteArrayInputStream stream : _remainingBufs)
-                    {
-                        pos += stream.read(completeBuffer, pos, stream.available());
-                    }
-
-                    _remainingBufs.clear();
-                    _remainingBufs.add(new ByteArrayInputStream(completeBuffer));
+                    iter.set(next.slice());
                 }
+                break;
+            }
+            else
+            {
+                iter.remove();
             }
         }
     }
 
+
     private boolean decodable(final MarkableDataInput in) throws AMQFrameDecodingException, IOException
     {
         final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java Tue Jul 28 15:48:30 2015
@@ -24,6 +24,7 @@ import org.apache.qpid.framing.AMQShortS
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public interface MarkableDataInput extends DataInput
 {
@@ -36,6 +37,8 @@ public interface MarkableDataInput exten
 
     int read(byte[] b) throws IOException;
 
+    ByteBuffer readAsByteBuffer(int len) throws IOException;
+
     public AMQShortString readAMQShortString() throws IOException;
 
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Tue Jul 28 15:48:30 2015
@@ -62,21 +62,27 @@ public class AMQFrame extends AMQDataBlo
     }
 
     private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE };
+    private static final ByteBuffer FRAME_END_BYTE_BUFFER = ByteBuffer.allocateDirect(1);
+    static
+    {
+        FRAME_END_BYTE_BUFFER.put(FRAME_END_BYTE);
+        FRAME_END_BYTE_BUFFER.flip();
+    }
 
     @Override
     public long writePayload(final ByteBufferSender sender) throws IOException
     {
-        byte[] frameHeader = new byte[7];
-        BytesDataOutput buffer = new BytesDataOutput(frameHeader);
+        ByteBuffer frameHeader = ByteBuffer.allocateDirect(7);
 
-        buffer.writeByte(_bodyFrame.getFrameType());
-        EncodingUtils.writeUnsignedShort(buffer, _channel);
-        EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
-        sender.send(ByteBuffer.wrap(frameHeader));
+        frameHeader.put(_bodyFrame.getFrameType());
+        EncodingUtils.writeUnsignedShort(frameHeader, _channel);
+        EncodingUtils.writeUnsignedInteger(frameHeader, _bodyFrame.getSize());
+        frameHeader.flip();
+        sender.send(frameHeader);
 
         long size = 8 + _bodyFrame.writePayload(sender);
 
-        sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY));
+        sender.send(FRAME_END_BYTE_BUFFER.duplicate());
         return size;
     }
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Tue Jul 28 15:48:30 2015
@@ -26,16 +26,21 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.QpidException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferDataOutput;
 import org.apache.qpid.util.BytesDataOutput;
 
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AMQMethodBodyImpl.class);
     public static final byte TYPE = 1;
 
     public AMQMethodBodyImpl()
@@ -111,11 +116,13 @@ public abstract class AMQMethodBodyImpl
     @Override
     public long writePayload(final ByteBufferSender sender) throws IOException
     {
+
         final int size = getSize();
-        byte[] bytes = new byte[size];
-        BytesDataOutput buffer = new BytesDataOutput(bytes);
-        writePayload(buffer);
-        sender.send(ByteBuffer.wrap(bytes));
+        ByteBuffer buf = ByteBuffer.allocateDirect(size);
+        ByteBufferDataOutput dataOutput = new ByteBufferDataOutput(buf);
+        writePayload(dataOutput);
+        buf.flip();
+        sender.send(buf);
         return size;
     }
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Tue Jul 28 15:48:30 2015
@@ -24,6 +24,7 @@ package org.apache.qpid.framing;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentHashMap;
@@ -115,6 +116,33 @@ public final class AMQShortString implem
 
     }
 
+    public static AMQShortString readAMQShortString(ByteBuffer buffer)
+    {
+        int length = ((int) buffer.get()) & 0xff;
+        if(length == 0)
+        {
+            return null;
+        }
+        else
+        {
+            if (length > MAX_LENGTH)
+            {
+                throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+            }
+            if(length > buffer.remaining())
+            {
+                throw new IllegalArgumentException("Cannot create AMQShortString with length "
+                                                   + length + " from a ByteBuffer with only "
+                                                   + buffer.remaining()
+                                                   + " bytes.");
+
+            }
+            byte[] data = new byte[length];
+            buffer.get(data);
+            return new AMQShortString(data, 0, length);
+        }
+    }
+
     public AMQShortString(byte[] data, final int offset, final int length)
     {
         if (length > MAX_LENGTH)

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java Tue Jul 28 15:48:30 2015
@@ -20,12 +20,13 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Collection;
 
+import org.apache.qpid.codec.MarkableDataInput;
+
 /**
  * AMQType is a type that represents the different possible AMQP field table types. It provides operations for each
  * of the types to perform tasks such as calculating the size of an instance of the type, converting types between AMQP
@@ -57,7 +58,7 @@ public enum AMQType
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -103,7 +104,7 @@ public enum AMQType
             EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readUnsignedInteger(buffer);
         }
@@ -142,7 +143,7 @@ public enum AMQType
             EncodingUtils.writeInteger(buffer, unscaled);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             byte places = EncodingUtils.readByte(buffer);
 
@@ -179,7 +180,7 @@ public enum AMQType
             EncodingUtils.writeLong(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readLong(buffer);
         }
@@ -259,7 +260,7 @@ public enum AMQType
          *
          * @return An instance of the type.
          */
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             try
             {
@@ -322,7 +323,7 @@ public enum AMQType
                  *
                  * @return An instance of the type.
                  */
-                public Object readValueFromBuffer(DataInput buffer) throws IOException
+                public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
                 {
                     // Read size of field table then all name/value pairs.
                     return FieldArray.readFromBuffer(buffer);
@@ -352,7 +353,7 @@ public enum AMQType
         public void writeValueImpl(Object value, DataOutput buffer)
         { }
 
-        public Object readValueFromBuffer(DataInput buffer)
+        public Object readValueFromBuffer(MarkableDataInput buffer)
         {
             return null;
         }
@@ -383,7 +384,7 @@ public enum AMQType
             EncodingUtils.writeLongstr(buffer, (byte[]) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readLongstr(buffer);
         }
@@ -413,7 +414,7 @@ public enum AMQType
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -444,7 +445,7 @@ public enum AMQType
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -479,7 +480,7 @@ public enum AMQType
             EncodingUtils.writeBoolean(buffer, (Boolean) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readBoolean(buffer);
         }
@@ -514,7 +515,7 @@ public enum AMQType
             EncodingUtils.writeChar(buffer, (Character) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readChar(buffer);
         }
@@ -549,7 +550,7 @@ public enum AMQType
             EncodingUtils.writeByte(buffer, (Byte) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readByte(buffer);
         }
@@ -588,7 +589,7 @@ public enum AMQType
             EncodingUtils.writeShort(buffer, (Short) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readShort(buffer);
         }
@@ -630,7 +631,7 @@ public enum AMQType
             EncodingUtils.writeInteger(buffer, (Integer) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readInteger(buffer);
         }
@@ -677,7 +678,7 @@ public enum AMQType
             EncodingUtils.writeLong(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readLong(buffer);
         }
@@ -712,7 +713,7 @@ public enum AMQType
             EncodingUtils.writeFloat(buffer, (Float) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readFloat(buffer);
         }
@@ -751,7 +752,7 @@ public enum AMQType
             EncodingUtils.writeDouble(buffer, (Double) value);
         }
 
-        public Object readValueFromBuffer(DataInput buffer) throws IOException
+        public Object readValueFromBuffer(MarkableDataInput buffer) throws IOException
         {
             return EncodingUtils.readDouble(buffer);
         }
@@ -840,5 +841,5 @@ public enum AMQType
      *
      * @return An instance of the type.
      */
-    abstract Object readValueFromBuffer(DataInput buffer) throws IOException;
+    abstract Object readValueFromBuffer(MarkableDataInput buffer) throws IOException;
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Tue Jul 28 15:48:30 2015
@@ -28,6 +28,8 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.Map;
 
+import org.apache.qpid.codec.MarkableDataInput;
+
 /**
  * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
  * value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides
@@ -65,7 +67,7 @@ public abstract class AMQTypedValue
             _value = type.toNativeValue(value);
         }
 
-        private GenericTypedValue(AMQType type, DataInput buffer) throws IOException
+        private GenericTypedValue(AMQType type, MarkableDataInput buffer) throws IOException
         {
             _type = type;
             _value = type.readValueFromBuffer(buffer);
@@ -198,7 +200,7 @@ public abstract class AMQTypedValue
     }
 
 
-    public static AMQTypedValue readFromBuffer(DataInput buffer) throws IOException
+    public static AMQTypedValue readFromBuffer(MarkableDataInput buffer) throws IOException
     {
         AMQType type = AMQTypeMap.getType(buffer.readByte());
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message