qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lqu...@apache.org
Subject [3/5] qpid-broker-j git commit: QPID-7753: Periodically compact direct memory buffers.
Date Fri, 28 Apr 2017 13:51:10 GMT
QPID-7753: Periodically compact direct memory buffers.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/36bab8f3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/36bab8f3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/36bab8f3

Branch: refs/heads/master
Commit: 36bab8f33034b567ea04b472da548b41059c44c9
Parents: 3eba6a0
Author: Lorenz Quack <lquack@apache.org>
Authored: Tue Apr 25 16:55:14 2017 +0100
Committer: Lorenz Quack <lquack@apache.org>
Committed: Fri Apr 28 14:50:33 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     |  29 ++++
 .../qpid/server/bytebuffer/BufferPool.java      |  17 ++-
 .../qpid/server/bytebuffer/ByteBufferRef.java   |   2 +
 .../bytebuffer/NonPooledByteBufferRef.java      |   6 +
 .../server/bytebuffer/PooledByteBufferRef.java  |  25 +++-
 .../qpid/server/bytebuffer/QpidByteBuffer.java  |  63 +++++++++
 .../message/internal/InternalMessage.java       |   6 +
 .../internal/InternalMessageMetaData.java       |   6 +
 .../org/apache/qpid/server/model/Broker.java    |  49 ++++++-
 .../apache/qpid/server/model/BrokerImpl.java    | 135 ++++++++++++++++++-
 .../org/apache/qpid/server/model/Queue.java     |   3 +-
 .../apache/qpid/server/queue/AbstractQueue.java |  39 +++++-
 .../server/store/StorableMessageMetaData.java   |   1 +
 .../qpid/server/store/StoredMemoryMessage.java  |  18 ++-
 .../apache/qpid/server/store/StoredMessage.java |   2 +
 .../server/virtualhost/AbstractVirtualHost.java |  33 +++++
 .../virtualhost/QueueManagingVirtualHost.java   |   7 +-
 .../server/queue/AbstractQueueTestBase.java     |   5 +-
 .../qpid/server/store/TestMessageMetaData.java  |   6 +
 .../MessageConverter_Internal_to_v0_10.java     |   8 +-
 .../protocol/v0_10/MessageConverter_v0_10.java  |   8 +-
 .../protocol/v0_10/MessageMetaData_0_10.java    |   8 +-
 .../protocol/v0_10/ServerSessionDelegate.java   |   6 +-
 .../v0_8/MessageConverter_Internal_to_v0_8.java |   6 +
 .../server/protocol/v0_8/MessageMetaData.java   |   6 +
 .../transport/BasicContentHeaderProperties.java |   5 +
 .../v0_8/transport/ContentHeaderBody.java       |   5 +
 .../protocol/v1_0/MessageConverter_to_1_0.java  |   6 +
 .../protocol/v1_0/MessageMetaData_1_0.java      |  29 ++++
 .../v1_0/type/messaging/AbstractSection.java    |   7 +
 .../type/messaging/codec/EncodingRetaining.java |   1 +
 .../MessageConverter_1_0_to_v0_10.java          |   6 +
 .../MessageConverter_0_10_to_0_8.java           |   6 +
 .../MessageConverter_0_8_to_0_10.java           |   6 +
 .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java |   6 +
 .../store/jdbc/AbstractJDBCMessageStore.java    |  31 +++++
 36 files changed, 576 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 0a02dcd..732d291 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -901,6 +901,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         Collection<QpidByteBuffer> getData();
         void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
+        void reallocate(final long smallestAllowedBufferId);
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -936,6 +937,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         {
             return true;
         }
+
+        @Override
+        public void reallocate(final long smallestAllowedBufferId)
+        {
+            if(_metaData != null)
+            {
+                _metaData.reallocate(smallestAllowedBufferId);
+            }
+            _data = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _data);
+        }
     }
 
     private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -990,6 +1001,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         {
             return false;
         }
+
+        @Override
+        public void reallocate(final long smallestAllowedBufferId)
+        {
+            if(_metaData != null)
+            {
+                _metaData.reallocate(smallestAllowedBufferId);
+            }
+            _data = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _data);
+        }
     }
 
     final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
@@ -1250,6 +1271,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             return this.getClass() + "[messageId=" + _messageId + "]";
         }
 
+        @Override
+        public synchronized void reallocate(final long smallestAllowedBufferId)
+        {
+            if(_messageDataRef != null)
+            {
+                _messageDataRef.reallocate(smallestAllowedBufferId);
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/BufferPool.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/BufferPool.java
index fdbef8f..80baa57 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/BufferPool.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/BufferPool.java
@@ -22,11 +22,13 @@ package org.apache.qpid.server.bytebuffer;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 class BufferPool
 {
     private final int _maxSize;
     private final ConcurrentLinkedQueue<ByteBuffer> _pooledBuffers = new ConcurrentLinkedQueue<>();
+    private final AtomicInteger _size = new AtomicInteger();
 
     BufferPool(final int maxSize)
     {
@@ -35,15 +37,21 @@ class BufferPool
 
     ByteBuffer getBuffer()
     {
-        return _pooledBuffers.poll();
+        final ByteBuffer buffer = _pooledBuffers.poll();
+        if (buffer != null)
+        {
+            _size.decrementAndGet();
+        }
+        return buffer;
     }
 
     void returnBuffer(ByteBuffer buf)
     {
         buf.clear();
-        if (_pooledBuffers.size() < _maxSize)
+        if (size() < _maxSize)
         {
             _pooledBuffers.add(buf);
+            _size.incrementAndGet();
         }
     }
 
@@ -51,4 +59,9 @@ class BufferPool
     {
         return _maxSize;
     }
+
+    public int size()
+    {
+        return _size.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java
index 1b99b52..d5fadc7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java
@@ -31,4 +31,6 @@ public interface ByteBufferRef
     ByteBuffer getBuffer();
 
     void removeFromPool();
+
+    long getPooledBufferId();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
index 05da3f8..03a6284 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
@@ -54,4 +54,10 @@ class NonPooledByteBufferRef implements ByteBufferRef
     {
 
     }
+
+    @Override
+    public long getPooledBufferId()
+    {
+        return Long.MAX_VALUE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
index 455f19e..cb7a4f8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
@@ -21,24 +21,30 @@
 package org.apache.qpid.server.bytebuffer;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
 
 class PooledByteBufferRef implements ByteBufferRef
 {
     private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
-
+    private static final AtomicLong BUFFER_ID = new AtomicLong(Long.MIN_VALUE);
+    private static final AtomicInteger ACTIVE_BUFFERS = new AtomicInteger();
     private final ByteBuffer _buffer;
+    private final long _id;
+    @SuppressWarnings("unused")
     private volatile int _refCount;
 
     PooledByteBufferRef(final ByteBuffer buffer)
     {
         _buffer = buffer;
+        _id = BUFFER_ID.getAndIncrement();
+        ACTIVE_BUFFERS.incrementAndGet();
     }
 
     @Override
     public void incrementRef()
     {
-
         if(REF_COUNT.get(this) >= 0)
         {
             REF_COUNT.incrementAndGet(this);
@@ -51,6 +57,7 @@ class PooledByteBufferRef implements ByteBufferRef
         if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
         {
             QpidByteBuffer.returnToPool(_buffer);
+            ACTIVE_BUFFERS.decrementAndGet();
         }
     }
 
@@ -66,5 +73,19 @@ class PooledByteBufferRef implements ByteBufferRef
         REF_COUNT.set(this, Integer.MIN_VALUE/2);
     }
 
+    @Override
+    public long getPooledBufferId()
+    {
+        return _id;
+    }
+
+    public static int getActiveBufferCount()
+    {
+        return ACTIVE_BUFFERS.get();
+    }
 
+    public static long getLargestBufferId()
+    {
+        return BUFFER_ID.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index a74e300..fe5db24 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -53,6 +53,7 @@ public class QpidByteBuffer
             "_disposed");
     private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
     private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
+    private static final double REALLOCATION_CAPACITY_THRESHOLD_FRACTION = 0.9;
     private volatile static boolean _isPoolInitialized;
     private volatile static BufferPool _bufferPool;
     private volatile static int _pooledBufferSize;
@@ -541,6 +542,11 @@ public class QpidByteBuffer
         return this;
     }
 
+    private long getPooledBufferId()
+    {
+        return _ref.getPooledBufferId();
+    }
+
     ByteBuffer getUnderlyingBuffer()
     {
         return _buffer;
@@ -821,6 +827,63 @@ public class QpidByteBuffer
         return _pooledBufferSize;
     }
 
+    public static int getAllocatedDirectMemorySize()
+    {
+        return _pooledBufferSize * getNumberOfActivePooledBuffers();
+    }
+
+    public static int getNumberOfActivePooledBuffers()
+    {
+        return PooledByteBufferRef.getActiveBufferCount();
+    }
+
+    public static int getNumberOfPooledBuffers()
+    {
+        return _bufferPool.size();
+    }
+
+    public static long getLargestPooledBufferId()
+    {
+        return PooledByteBufferRef.getLargestBufferId();
+    }
+
+    public static List<QpidByteBuffer> reallocateIfNecessary(final long smallestAllowedBufferId, Collection<QpidByteBuffer> data)
+    {
+        if (data != null)
+        {
+            List<QpidByteBuffer> newCopy = new ArrayList<>(data.size());
+            for (QpidByteBuffer buf : data)
+            {
+                newCopy.add(reallocateIfNecessary(smallestAllowedBufferId, buf));
+            }
+            return newCopy;
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public static QpidByteBuffer reallocateIfNecessary(final long smallestAllowedBufferId, final QpidByteBuffer data)
+    {
+        double capacityThreshold = QpidByteBuffer.getPooledBufferSize() * REALLOCATION_CAPACITY_THRESHOLD_FRACTION;
+        if (data != null
+            && data.isDirect()
+            && data.getPooledBufferId() < smallestAllowedBufferId
+            && data.remaining() < capacityThreshold)
+        {
+            QpidByteBuffer newBuf = allocateDirect(data.remaining());
+            newBuf.put(data);
+            newBuf.flip();
+            data.dispose();
+            return newBuf;
+        }
+        else
+        {
+            return data;
+        }
+    }
+
     private static final class BufferInputStream extends InputStream
     {
         private final QpidByteBuffer _qpidByteBuffer;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 72e3eb3..2f583e1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -257,6 +257,12 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
                     {
                         return false;
                     }
+
+                    @Override
+                    public void reallocate(final long smallestAllowedBufferId)
+                    {
+
+                    }
                 };
             }
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
index bf7a60a..7e22ef9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
@@ -104,6 +104,12 @@ public class InternalMessageMetaData implements StorableMessageMetaData
 
     }
 
+    @Override
+    public void reallocate(final long smallestAllowedBufferId)
+    {
+
+    }
+
     static InternalMessageMetaData create(boolean persistent, final InternalMessageHeader header, int contentSize)
     {
         return new InternalMessageMetaData(persistent, header, contentSize);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 44fdab9..86a8d09 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -83,7 +83,27 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     String DEFAULT_HTTP_PORT_NUMBER = "8080";
 
     @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
-    long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double) BrokerImpl.getMaxDirectMemorySize());
+    long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.75 * (double) BrokerImpl.getMaxDirectMemorySize());
+
+    String COMPACT_MEMORY_THRESHOLD = "qpid.compact_memory_threshold";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = COMPACT_MEMORY_THRESHOLD)
+    long DEFAULT_COMPACT_MEMORY_THRESHOLD = (long)(0.5 * (double) BrokerImpl.getMaxDirectMemorySize());
+
+    String COMPACT_MEMORY_INTERVAL = "qpid.compact_memory_interval";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = COMPACT_MEMORY_INTERVAL)
+    long DEFAULT_COMPACT_MEMORY_INTERVAL = 1000L;
+
+    String MEMORY_OCCUPANCY_THRESHOLD = "qpid.memory_occupancy_threshold";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = MEMORY_OCCUPANCY_THRESHOLD)
+    double DEFAULT_MEMORY_OCCUPANCY_THRESHOLD = 0.5;
+
+    String MEMORY_COMPACTION_INCREMENT = "qpid.memory_compaction_increment";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = MEMORY_COMPACTION_INCREMENT)
+    long DEFAULT_MEMORY_COMPACTION_INCREMENT = 100;
 
     @ManagedContextDefault(name = CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT)
     long DEFAULT_CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = 5000l;
@@ -250,6 +270,18 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
                       description = "Number of objects pending finalization")
     int getNumberOfObjectsPendingFinalization();
 
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME,
+            units = StatisticUnit.COUNT,
+            label = "Number of Active Pooled Buffers",
+            description = "Number of pooled buffers in use.")
+    long getNumberOfActivePooledBuffers();
+
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME,
+            units = StatisticUnit.COUNT,
+            label = "Number of Pooled Buffers",
+            description = "Number of pooled buffers.")
+    long getNumberOfPooledBuffers();
+
     @ManagedOperation(nonModifying = true,
             description = "Restart the broker within the same JVM",
             changesConfiguredObjectState = false,
@@ -339,4 +371,19 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
 
     ScheduledFuture<?> scheduleTask(long delay, final TimeUnit unit, Runnable task);
 
+    @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable. See also " + MEMORY_OCCUPANCY_THRESHOLD)
+    long getCompactMemoryThreshold();
+
+    @DerivedAttribute(description = "Time interval (in milliseconds) between runs of the memory compactor check. See also " + COMPACT_MEMORY_THRESHOLD)
+    long getCompactMemoryInterval();
+
+    @DerivedAttribute(description = "Occupancy threshold (fraction) at which point buffers will be compacted. See also " + COMPACT_MEMORY_THRESHOLD)
+    double getMemoryOccupancyThreshold();
+
+    @DerivedAttribute(description = "Approximate number of buffers that will be compacted on each compaction run. See also " + COMPACT_MEMORY_THRESHOLD)
+    long getMemoryCompactionIncrement();
+
+    @ManagedOperation(changesConfiguredObjectState = false, nonModifying = true,
+            description = "Force direct memory buffer compaction.")
+    void compactMemory();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 67f5063..0a28e2b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -45,14 +45,15 @@ import java.util.regex.Pattern;
 import javax.security.auth.Subject;
 import javax.security.auth.login.AccountNotFoundException;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.BrokerPrincipal;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.configuration.CommonProperties;
-import org.apache.qpid.server.BrokerPrincipal;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
@@ -85,9 +86,9 @@ import org.apache.qpid.server.store.preferences.PreferenceStoreUpdaterImpl;
 import org.apache.qpid.server.store.preferences.PreferencesRecoverer;
 import org.apache.qpid.server.store.preferences.PreferencesRoot;
 import org.apache.qpid.server.util.HousekeepingExecutor;
+import org.apache.qpid.server.util.SystemUtils;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator;
-import org.apache.qpid.server.util.SystemUtils;
 
 @ManagedObject( category = false, type = "Broker" )
 public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<BrokerImpl>
@@ -146,6 +147,11 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     private final AccessControl _accessControl;
     private TaskExecutor _preferenceTaskExecutor;
     private String _documentationUrl;
+    private volatile long _smallestAllowedBufferId = QpidByteBuffer.getLargestPooledBufferId();
+    private long _compactMemoryThreshold;
+    private long _compactMemoryInterval;
+    private double _memoryOccupancyThreshold;
+    private long _memoryCompactionIncrement;
 
     @ManagedObjectFactoryConstructor
     public BrokerImpl(Map<String, Object> attributes,
@@ -412,6 +418,11 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
                                                              getHousekeepingThreadCount(),
                                                              getSystemTaskSubject("Housekeeping", _principal));
 
+        _houseKeepingTaskExecutor.scheduleWithFixedDelay(this::checkDirectMemoryUsage,
+                                                         _compactMemoryInterval,
+                                                         _compactMemoryInterval,
+                                                         TimeUnit.MILLISECONDS);
+
         final PreferenceStoreUpdaterImpl updater = new PreferenceStoreUpdaterImpl();
         final Collection<PreferenceRecord> preferenceRecords = _preferenceStore.openAndLoad(updater);
         _preferenceTaskExecutor = new TaskExecutorImpl("broker-" + getName() + "-preferences", null);
@@ -427,6 +438,14 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         setState(State.ACTIVE);
     }
 
+    private void checkDirectMemoryUsage()
+    {
+        if (_compactMemoryThreshold >= 0 && getUsedDirectMemorySize() > _compactMemoryThreshold)
+        {
+            compactMemory();
+        }
+    }
+
     private void initialiseStatisticsReporting()
     {
         long report = getStatisticsReportingPeriod() * 1000L; // convert to ms
@@ -575,6 +594,11 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         long heapMemory = Runtime.getRuntime().maxMemory();
         getEventLogger().message(BrokerMessages.MAX_MEMORY(heapMemory, directMemory));
 
+        _compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
+        _compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
+        _memoryOccupancyThreshold = getContextValue(Double.class, Broker.MEMORY_OCCUPANCY_THRESHOLD);
+        _memoryCompactionIncrement = getContextValue(Long.class, Broker.MEMORY_COMPACTION_INCREMENT);
+
         if (SystemUtils.getProcessPid() != null)
         {
             getEventLogger().message(BrokerMessages.PROCESS(SystemUtils.getProcessPid()));
@@ -829,6 +853,18 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         _dataReceived.registerEvent(messageSize, timestamp);
     }
 
+    @Override
+    public long getNumberOfActivePooledBuffers()
+    {
+        return QpidByteBuffer.getNumberOfActivePooledBuffers();
+    }
+
+    @Override
+    public long getNumberOfPooledBuffers()
+    {
+        return QpidByteBuffer.getNumberOfPooledBuffers();
+    }
+
     public StatisticsCounter getMessageReceiptStatistics()
     {
         return _messagesReceived;
@@ -1135,6 +1171,101 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         }
     }
 
+    @Override
+    public long getCompactMemoryThreshold()
+    {
+        return _compactMemoryThreshold;
+    }
+
+    @Override
+    public long getCompactMemoryInterval()
+    {
+        return _compactMemoryInterval;
+    }
+
+    @Override
+    public double getMemoryOccupancyThreshold()
+    {
+        return _memoryOccupancyThreshold;
+    }
+
+    @Override
+    public long getMemoryCompactionIncrement()
+    {
+        return _memoryCompactionIncrement;
+    }
+
+    @Override
+    public void compactMemory()
+    {
+        long memOccupiedByMessages = getMemOccupiedByMessages();
+        double ratio = memOccupiedByMessages / (double) QpidByteBuffer.getAllocatedDirectMemorySize();
+
+        if (ratio < _memoryOccupancyThreshold)
+        {
+            int numberOfActivePooledBuffers = QpidByteBuffer.getNumberOfActivePooledBuffers();
+            LOGGER.debug("Compacting direct memory buffers: "
+                         + "memOccupiedByMessages: {}, numberOfActivePooledBuffers: {}, ratio: {}",
+                         memOccupiedByMessages, numberOfActivePooledBuffers, ratio);
+
+            long largestBufferId = QpidByteBuffer.getLargestPooledBufferId();
+            _smallestAllowedBufferId = Math.min(largestBufferId,
+                                                _smallestAllowedBufferId + _memoryCompactionIncrement);
+
+            final Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
+            List<ListenableFuture<Void>> futures = new ArrayList<>(vhns.size());
+            for (VirtualHostNode<?> vhn : vhns)
+            {
+                VirtualHost<?> vh = vhn.getVirtualHost();
+                if (vh instanceof QueueManagingVirtualHost)
+                {
+                    ListenableFuture<Void> future = ((QueueManagingVirtualHost) vh).reallocateMessages(
+                            _smallestAllowedBufferId);
+                    futures.add(future);
+                }
+            }
+            final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(futures);
+            addFutureCallback(combinedFuture, new FutureCallback<List<Void>>()
+            {
+                @Override
+                public void onSuccess(final List<Void> result)
+                {
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        long memOccupiedByMessages = getMemOccupiedByMessages();
+                        double ratio = memOccupiedByMessages / (double) QpidByteBuffer.getAllocatedDirectMemorySize();
+                        LOGGER.debug("After compact direct memory buffers: numberOfActivePooledBuffers: {}, ratio: {}",
+                                     QpidByteBuffer.getNumberOfActivePooledBuffers(),
+                                     ratio);
+                    }
+                }
+
+                @Override
+                public void onFailure(final Throwable t)
+                {
+                    LOGGER.warn("Unexpected error during direct memory compaction.", t);
+                }
+            }, _houseKeepingTaskExecutor);
+        }
+    }
+
+    private long getMemOccupiedByMessages()
+    {
+        final Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
+
+        long memOccupiedByMessages = 0;
+        for (VirtualHostNode<?> vhn : vhns)
+        {
+            VirtualHost<?> vh = vhn.getVirtualHost();
+            if (vh instanceof QueueManagingVirtualHost)
+            {
+                QueueManagingVirtualHost<?> host = (QueueManagingVirtualHost<?>) vh;
+                memOccupiedByMessages += host.getTotalDepthOfQueuesBytesIncludingHeader();
+            }
+        }
+        return memOccupiedByMessages;
+    }
+
     private class AddressSpaceRegistry implements SystemAddressSpaceCreator.AddressSpaceRegistry
     {
         private final ConcurrentMap<String, NamedAddressSpace> _systemAddressSpaces = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 84d8f3a..be8a11f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -471,6 +471,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
      */
     void checkMessageStatus();
 
+    void reallocateMessages(long smallestAllowedBufferId);
+
     Set<NotificationCheck> getNotificationChecks();
 
     Collection<String> getAvailableAttributes();
@@ -490,5 +492,4 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
     void deleteEntry(QueueEntry entry);
 
     QueueEntry getLeastSignificantOldestEntry();
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index c7d53d8..1fa8aec 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -18,8 +18,8 @@
  */
 package org.apache.qpid.server.queue;
 
-import static org.apache.qpid.server.util.ParameterizedTypes.MAP_OF_STRING_STRING;
 import static org.apache.qpid.server.util.GZIPUtils.GZIP_CONTENT_ENCODING;
+import static org.apache.qpid.server.util.ParameterizedTypes.MAP_OF_STRING_STRING;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -66,9 +66,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBufferInputStream;
-import org.apache.qpid.server.filter.SelectorParsingException;
-import org.apache.qpid.server.filter.selector.ParseException;
-import org.apache.qpid.server.filter.selector.TokenMgrError;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.connection.SessionPrincipal;
@@ -77,6 +74,9 @@ import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.filter.SelectorParsingException;
+import org.apache.qpid.server.filter.selector.ParseException;
+import org.apache.qpid.server.filter.selector.TokenMgrError;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -2102,6 +2102,37 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     }
 
+    @Override
+    public void reallocateMessages(final long smallestAllowedBufferId)
+    {
+        QueueEntryIterator queueListIterator = getEntries().iterator();
+
+        while (!_stopped.get() && queueListIterator.advance())
+        {
+            final QueueEntry node = queueListIterator.getNode();
+            if (!node.isDeleted() && !node.expired())
+            {
+                try
+                {
+                    final ServerMessage message = node.getMessage();
+                    final MessageReference messageReference = message.newReference();
+                    try
+                    {
+                        message.getStoredMessage().reallocate(smallestAllowedBufferId);
+                    }
+                    finally
+                    {
+                        messageReference.release();
+                    }
+                }
+                catch (MessageDeletedException mde)
+                {
+                    // Ignore
+                }
+            }
+        }
+    }
+
     private boolean consumerHasAvailableMessages(final QueueConsumer consumer)
     {
         final QueueEntry queueEntry;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
index 10a4f7f..83747fd 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
@@ -39,5 +39,6 @@ public interface StorableMessageMetaData
 
     void clearEncodedForm();
 
+    void reallocate(final long smallestAllowedBufferId);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index e9b19f8..a2a7837 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -30,8 +30,8 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
 {
     private final long _messageNumber;
     private final int _contentSize;
-    private QpidByteBuffer _content;
-    private T _metaData;
+    private volatile QpidByteBuffer _content;
+    private volatile T _metaData;
 
     public StoredMemoryMessage(long messageNumber, T metaData)
     {
@@ -45,7 +45,7 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
         return _messageNumber;
     }
 
-    public void addContent(QpidByteBuffer src)
+    public synchronized void addContent(QpidByteBuffer src)
     {
         if(_content == null)
         {
@@ -76,7 +76,7 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
     }
 
     @Override
-    public StoredMessage<T> allContentAdded()
+    public synchronized StoredMessage<T> allContentAdded()
     {
         if(_content != null)
         {
@@ -87,7 +87,7 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
 
 
     @Override
-    public Collection<QpidByteBuffer> getContent(int offset, int length)
+    public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
     {
         if(_content == null)
         {
@@ -107,7 +107,7 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
         return _metaData;
     }
 
-    public void remove()
+    public synchronized void remove()
     {
         _metaData.dispose();
         _metaData = null;
@@ -130,4 +130,10 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
         return false;
     }
 
+    @Override
+    public synchronized void reallocate(final long smallestAllowedBufferId)
+    {
+        _metaData.reallocate(smallestAllowedBufferId);
+        _content = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _content);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 8ab62d2..1a23edd 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -39,4 +39,6 @@ public interface StoredMessage<M extends StorableMessageMetaData>
     boolean isInMemory();
 
     boolean flowToDisk();
+
+    void reallocate(final long smallestAllowedBufferId);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 0ec6815..e54e70c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -54,6 +54,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -68,6 +69,7 @@ import com.google.common.base.Function;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
@@ -1400,6 +1402,37 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
+    public ListenableFuture<Void> reallocateMessages(final long smallestAllowedBufferId)
+    {
+        final Future future = _houseKeepingTaskExecutor.submit(() ->
+                                                                  {
+                                                                      final Collection<Queue> queues =
+                                                                              getChildren(Queue.class);
+                                                                      for (Queue q : queues)
+                                                                      {
+                                                                          if (q.getState() == State.ACTIVE)
+                                                                          {
+                                                                              q.reallocateMessages(
+                                                                                      smallestAllowedBufferId);
+                                                                          }
+                                                                      }
+                                                                  });
+        return JdkFutureAdapters.listenInPoolThread(future);
+    }
+
+    @Override
+    public long getTotalDepthOfQueuesBytesIncludingHeader()
+    {
+        long total = 0;
+        final Collection<Queue> queues = getChildren(Queue.class);
+        for(Queue q : queues)
+        {
+            total += q.getQueueDepthBytesIncludingHeader();
+        }
+        return total;
+    }
+
+    @Override
     public <T extends ConfiguredObject<?>> T getAttainedChildFromAddress(final Class<T> childClass,
                                                                          final String address)
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 1cb33a2..8b30d2e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -29,13 +29,14 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Content;
-import org.apache.qpid.server.model.DoOnConfigThread;
 import org.apache.qpid.server.model.ManageableMessage;
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedContextDefault;
@@ -202,6 +203,8 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
     @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Outbound")
     long getMessagesOut();
 
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Total Depth of Queues Including Header")
+    long getTotalDepthOfQueuesBytesIncludingHeader();
 
     @Override
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
@@ -280,6 +283,8 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
 
     MessageDestination getSystemDestination(String name);
 
+    ListenableFuture<Void> reallocateMessages(long smallestAllowedBufferId);
+
     interface Transaction
     {
         void dequeue(QueueEntry entry);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 6082da0..55b3365 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -66,6 +66,7 @@ import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.QueueNotificationListener;
 import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -1156,10 +1157,12 @@ abstract class AbstractQueueTestBase extends QpidTestCase
         when(message.getMessageNumber()).thenReturn(id);
         when(message.getMessageHeader()).thenReturn(header);
 
+        StoredMessage storedMessage = mock(StoredMessage.class);
+        when(message.getStoredMessage()).thenReturn(storedMessage);
+
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
 
-
         when(message.newReference()).thenReturn(ref);
         when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
index 64d5797..964f459 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
@@ -85,6 +85,12 @@ public class TestMessageMetaData implements StorableMessageMetaData
     }
 
     @Override
+    public void reallocate(final long smallestAllowedBufferId)
+    {
+
+    }
+
+    @Override
     public void writeToBuffer(QpidByteBuffer dest)
     {
         dest.putLong(_messageId);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 6f47eaf..8e5867b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -120,7 +120,13 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
                     {
                         return false;
                     }
-        };
+
+                    @Override
+                    public void reallocate(final long smallestAllowedBufferId)
+                    {
+
+                    }
+                };
     }
 
     private String improveMimeType(final InternalMessage serverMsg, String mimeType)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index dfc4950..695ea03 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -108,7 +108,13 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
                     {
                         return false;
                     }
-        };
+
+                    @Override
+                    public void reallocate(final long smallestAllowedBufferId)
+                    {
+
+                    }
+                };
     }
 
     private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 8dc9a6b..d756173 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -175,7 +175,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
 
     public boolean isPersistent()
     {
-        return _deliveryProps == null ? false : _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT;
+        return _deliveryProps != null && _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT;
     }
 
     @Override
@@ -194,6 +194,12 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
         }
     }
 
+    @Override
+    public synchronized void reallocate(final long smallestAllowedBufferId)
+    {
+        _encoded = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _encoded);
+    }
+
     public String getRoutingKey()
     {
         return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 5b28676..c34740e 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -36,11 +36,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.filter.AMQPFilterTypes;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
-import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.consumer.ConsumerOption;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
 import org.apache.qpid.server.filter.ArrivalTimeFilter;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -60,6 +59,7 @@ import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NoFactoryForTypeException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UnknownConfiguredObjectException;
+import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.protocol.v0_10.transport.*;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.store.MessageHandle;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index 30eb4ec..c9faf8e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -121,6 +121,12 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
             {
                 return false;
             }
+
+            @Override
+            public void reallocate(final long smallestAllowedBufferId)
+            {
+
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index dbbcf34..ce17beb 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -140,6 +140,12 @@ public class MessageMetaData implements StorableMessageMetaData
         _contentHeaderBody.clearEncodedForm();
     }
 
+    @Override
+    public synchronized void reallocate(final long smallestAllowedBufferId)
+    {
+        _contentHeaderBody.reallocate(smallestAllowedBufferId);
+    }
+
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
     {
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index dc948aa..d01ce6b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -968,4 +968,9 @@ public class BasicContentHeaderProperties
             _encodedForm = null;
         }
     }
+
+    synchronized void reallocate(final long smallestAllowedBufferId)
+    {
+        _encodedForm = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _encodedForm);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
index 4c660cf..08f5d4e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
@@ -194,4 +194,9 @@ public class ContentHeaderBody implements AMQBody
     {
         _properties.clearEncodedForm();
     }
+
+    public void reallocate(final long smallestAllowedBufferId)
+    {
+        _properties.reallocate(smallestAllowedBufferId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index c6bc02d..6fd1417 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -271,6 +271,12 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
             return false;
         }
 
+        @Override
+        public void reallocate(final long smallestAllowedBufferId)
+        {
+
+        }
+
         private void dispose()
         {
             _section.dispose();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index e98c769..f84eeb8 100755
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -306,6 +306,35 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
     }
 
     @Override
+    public void reallocate(final long smallestAllowedBufferId)
+    {
+        if (_headerSection != null)
+        {
+            _headerSection.reallocate(smallestAllowedBufferId);
+        }
+        if (_deliveryAnnotationsSection != null)
+        {
+            _deliveryAnnotationsSection.reallocate(smallestAllowedBufferId);
+        }
+        if (_messageAnnotationsSection != null)
+        {
+            _messageAnnotationsSection.reallocate(smallestAllowedBufferId);
+        }
+        if (_propertiesSection != null)
+        {
+            _propertiesSection.reallocate(smallestAllowedBufferId);
+        }
+        if (_applicationPropertiesSection != null)
+        {
+            _applicationPropertiesSection.reallocate(smallestAllowedBufferId);
+        }
+        if (_footerSection != null)
+        {
+            _footerSection.reallocate(smallestAllowedBufferId);
+        }
+    }
+
+    @Override
     public void clearEncodedForm()
     {
         dispose();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
index b4aeb72..36a2b7e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
@@ -58,6 +58,13 @@ public abstract class AbstractSection<T> implements EncodingRetainingSection<T>
 
     }
 
+
+    @Override
+    public final void reallocate(final long smallestAllowedBufferId)
+    {
+        _encodedForm = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _encodedForm);
+    }
+
     @Override
     public final long getEncodedSize()
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java
index 1a8a02a..23c67b2 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java
@@ -29,6 +29,7 @@ public interface EncodingRetaining
     void setEncodedForm(List<QpidByteBuffer> encodedForm);
     List<QpidByteBuffer> getEncodedForm();
     void dispose();
+    void reallocate(final long smallestAllowedBufferId);
     long getEncodedSize();
     void writeTo(QpidByteBuffer dest);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index c4182b7..b58b821 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -128,6 +128,12 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
             {
                 return false;
             }
+
+            @Override
+            public void reallocate(final long smallestAllowedBufferId)
+            {
+
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 3256b85..1097013 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -230,6 +230,12 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
             {
                 return false;
             }
+
+            @Override
+            public void reallocate(final long smallestAllowedBufferId)
+            {
+
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index fcf6fb5..203a9d9 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -116,6 +116,12 @@ public class MessageConverter_0_8_to_0_10  implements MessageConverter<AMQMessag
             {
                 return false;
             }
+
+            @Override
+            public void reallocate(final long smallestAllowedBufferId)
+            {
+
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 164b329..25fe5de 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -127,6 +127,12 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
             {
                 return false;
             }
+
+            @Override
+            public void reallocate(final long smallestAllowedBufferId)
+            {
+
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/36bab8f3/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 846c21c..ad4acbc 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -1329,6 +1329,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         Collection<QpidByteBuffer> getData();
         void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
+        void reallocate(final long smallestAllowedBufferId);
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -1364,6 +1365,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             return true;
         }
+
+        @Override
+        public void reallocate(final long smallestAllowedBufferId)
+        {
+            if(_metaData != null)
+            {
+                _metaData.reallocate(smallestAllowedBufferId);
+            }
+            _data = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _data);
+        }
     }
 
     private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -1418,6 +1429,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             return false;
         }
+
+        @Override
+        public void reallocate(final long smallestAllowedBufferId)
+        {
+            if(_metaData != null)
+            {
+                _metaData.reallocate(smallestAllowedBufferId);
+            }
+
+            _data = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _data);
+        }
     }
 
     private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
@@ -1686,6 +1708,15 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
 
         @Override
+        public synchronized void reallocate(final long smallestAllowedBufferId)
+        {
+            if(_messageDataRef != null)
+            {
+                _messageDataRef.reallocate(smallestAllowedBufferId);
+            }
+        }
+
+        @Override
         public String toString()
         {
             return this.getClass() + "[messageId=" + _messageId + "]";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message