Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0FFD8200C67 for ; Mon, 15 May 2017 18:13:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0E496160BC1; Mon, 15 May 2017 16:13:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BF333160BC2 for ; Mon, 15 May 2017 18:13:49 +0200 (CEST) Received: (qmail 2672 invoked by uid 500); 15 May 2017 16:13:49 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 2656 invoked by uid 99); 15 May 2017 16:13:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 May 2017 16:13:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D1746DFFB3; Mon, 15 May 2017 16:13:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lquack@apache.org To: commits@qpid.apache.org Date: Mon, 15 May 2017 16:13:49 -0000 Message-Id: In-Reply-To: <9c1ee646c2ce4953af3e73214015c981@git.apache.org> References: <9c1ee646c2ce4953af3e73214015c981@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] qpid-broker-j git commit: QPID-7775: [Java Broker] Trigger flow to disk based on actual memory occupancy. archived-at: Mon, 15 May 2017 16:13:53 -0000 QPID-7775: [Java Broker] Trigger flow to disk based on actual memory occupancy. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/501aa891 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/501aa891 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/501aa891 Branch: refs/heads/master Commit: 501aa8919c8cd4938b6fa26eef9e6746e267175b Parents: ba0e9c3 Author: Lorenz Quack Authored: Fri May 12 16:21:14 2017 +0100 Committer: Lorenz Quack Committed: Mon May 15 17:13:27 2017 +0100 ---------------------------------------------------------------------- .../berkeleydb/AbstractBDBMessageStore.java | 41 +++++- .../store/berkeleydb/BDBConfigurationStore.java | 1 + .../store/berkeleydb/BDBMessageStore.java | 1 + .../server/logging/messages/QueueMessages.java | 121 ----------------- .../messages/Queue_logmessages.properties | 6 +- .../message/AbstractServerMessageImpl.java | 9 +- .../qpid/server/message/MessageReference.java | 7 +- .../org/apache/qpid/server/model/Broker.java | 6 + .../apache/qpid/server/model/BrokerImpl.java | 18 +++ .../org/apache/qpid/server/model/Queue.java | 5 +- .../apache/qpid/server/queue/AbstractQueue.java | 113 ++-------------- .../qpid/server/store/MemoryMessageStore.java | 26 +++- .../apache/qpid/server/store/MessageStore.java | 4 + .../qpid/server/store/NullMessageStore.java | 12 ++ .../server/virtualhost/AbstractVirtualHost.java | 134 ++++++++++++------- .../virtualhost/QueueManagingVirtualHost.java | 8 ++ .../server/store/TestMessageMetaDataType.java | 6 + .../store/jdbc/AbstractJDBCMessageStore.java | 34 ++++- 18 files changed, 264 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java ---------------------------------------------------------------------- diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index ce25a8f..6f1326e 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.util.concurrent.ListenableFuture; import com.sleepycat.bind.tuple.LongBinding; @@ -100,6 +101,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore private boolean _limitBusted; private long _totalStoreSize; private final Random _lockConflictRandom = new Random(); + private final AtomicLong _inMemorySize = new AtomicLong(); + private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong(); @Override public void upgradeStoreStructure() throws StoreException @@ -147,6 +150,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override + public long getInMemorySize() + { + return _inMemorySize.get(); + } + + @Override + public long getBytesEvacuatedFromMemory() + { + return _bytesEvacuatedFromMemory.get(); + } + + @Override public boolean isPersistent() { return true; @@ -167,6 +182,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override + public void closeMessageStore() + { + _inMemorySize.set(0); + _bytesEvacuatedFromMemory.set(0); + } + + @Override public MessageStoreReader newMessageStoreReader() { return new BDBMessageStoreReader(); @@ -979,10 +1001,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore _data = data; } - public void clear() + public long clear() { + long bytesCleared = 0; if(_metaData != null) { + bytesCleared += _metaData.getStorableSize(); _metaData.clearEncodedForm(); _metaData = null; } @@ -990,10 +1014,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore { for(QpidByteBuffer buf : _data) { + bytesCleared += buf.remaining(); buf.dispose(); } + _data = null; } - _data = null; + return bytesCleared; } @Override @@ -1037,7 +1063,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore { _messageDataRef = new MessageDataSoftRef<>(metaData, null); } + _contentSize = metaData.getContentSize(); + _inMemorySize.addAndGet(metaData.getStorableSize()); } @Override @@ -1089,6 +1117,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore @Override public StoredMessage allContentAdded() { + _inMemorySize.addAndGet(getContentSize()); return this; } @@ -1225,14 +1254,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore } final T metaData; + long bytesCleared = 0; if ((metaData =_messageDataRef.getMetaData()) != null) { + bytesCleared += metaData.getStorableSize(); metaData.dispose(); } Collection data = _messageDataRef.getData(); if(data != null) { + bytesCleared += getContentSize(); _messageDataRef.setData(null); for(QpidByteBuffer buf : data) { @@ -1240,6 +1272,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } _messageDataRef = null; + _inMemorySize.addAndGet(-bytesCleared); } @Override @@ -1260,7 +1293,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore flushToStore(); if(_messageDataRef != null && !_messageDataRef.isHardRef()) { - ((MessageDataSoftRef)_messageDataRef).clear(); + final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear(); + _inMemorySize.addAndGet(-bytesCleared); + _bytesEvacuatedFromMemory.addAndGet(bytesCleared); } return true; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java ---------------------------------------------------------------------- diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java index 2e854f7..ca7b4dd 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java @@ -592,6 +592,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi @Override public void closeMessageStore() { + super.closeMessageStore(); _messageStoreOpen.set(false); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java ---------------------------------------------------------------------- diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index a5ebc68..83df5aa 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -83,6 +83,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore @Override public void closeMessageStore() { + super.closeMessageStore(); if (_messageStoreOpen.compareAndSet(true, false)) { if (_environmentFacade != null) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java index e73e173..5df5b21 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java +++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java @@ -67,10 +67,8 @@ public class QueueMessages public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped"; public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull"; public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation"; - public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active"; public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull"; public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted"; - public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_inactive"; static { @@ -79,10 +77,8 @@ public class QueueMessages LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY); LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY); LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY); - LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY); LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY); LoggerFactory.getLogger(DELETED_LOG_HIERARCHY); - LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY); _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Queue_logmessages", _currentLocale); } @@ -384,64 +380,6 @@ public class QueueMessages /** * Log a Queue message of the Format: - *
QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
- * Optional values are contained in [square brackets] and are numbered - * sequentially in the method call. - * - */ - public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2, Number param3, Number param4) - { - String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE"); - - final Object[] messageArguments = {param1, param2, param3, param4}; - // Create a new MessageFormat to ensure thread safety. - // Sharing a MessageFormat and using applyPattern is not thread safe - MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); - - final String message = formatter.format(messageArguments); - - return new LogMessage() - { - public String toString() - { - return message; - } - - public String getLogHierarchy() - { - return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY; - } - - @Override - public boolean equals(final Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - final LogMessage that = (LogMessage) o; - - return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString()); - - } - - @Override - public int hashCode() - { - int result = toString().hashCode(); - result = 31 * result + getLogHierarchy().hashCode(); - return result; - } - }; - } - - /** - * Log a Queue message of the Format: *
QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
* Optional values are contained in [square brackets] and are numbered * sequentially in the method call. @@ -556,65 +494,6 @@ public class QueueMessages }; } - /** - * Log a Queue message of the Format: - *
QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
- * Optional values are contained in [square brackets] and are numbered - * sequentially in the method call. - * - */ - public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2, Number param3, Number param4) - { - String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE"); - - final Object[] messageArguments = {param1, param2, param3, param4}; - // Create a new MessageFormat to ensure thread safety. - // Sharing a MessageFormat and using applyPattern is not thread safe - MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); - - final String message = formatter.format(messageArguments); - - return new LogMessage() - { - public String toString() - { - return message; - } - - public String getLogHierarchy() - { - return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY; - } - - @Override - public boolean equals(final Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - final LogMessage that = (LogMessage) o; - - return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString()); - - } - - @Override - public int hashCode() - { - int result = toString().hashCode(); - result = 31 * result + getLogHierarchy().hashCode(); - return result; - } - }; - } - - private QueueMessages() { } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties index 9061f0d..cd4ecdb 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties +++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties @@ -27,9 +27,9 @@ OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number} DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages -# use similar number to the broker for similar topic -FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB -FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB +# These are no longer in use +#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB +#FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB # 0 - operation name OPERATION = QUE-1016 : Operation : {0} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index b60624a..978abc5 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -213,11 +213,11 @@ public abstract class AbstractServerMessageImpl message) + private Reference(final AbstractServerMessageImpl message) throws MessageDeletedException { this(message, null); } - private Reference(final AbstractServerMessageImpl message, TransactionLogResource resource) + private Reference(final AbstractServerMessageImpl message, TransactionLogResource resource) throws MessageDeletedException { _message = message; if(resource != null) @@ -299,6 +299,11 @@ public abstract class AbstractServerMessageImpl +public interface MessageReference extends AutoCloseable { - public M getMessage(); - public void release(); + M getMessage(); + void release(); + void close(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 19e6de0..044f035 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -163,6 +163,9 @@ public interface Broker> extends ConfiguredObject, EventL @ManagedContextDefault( name = "broker.housekeepingThreadCount") public static final int DEFAULT_HOUSEKEEPING_THREAD_COUNT = 2; + String QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD = "qpid.broker.housekeepingCheckPeriod"; + @ManagedContextDefault(name = QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD) + long DEFAULT_BROKER_HOUSEKEEPING_CHECK_PERIOD = 30000L; @ManagedAttribute( defaultValue = "${broker.housekeepingThreadCount}") int getHousekeepingThreadCount(); @@ -377,6 +380,9 @@ public interface Broker> extends ConfiguredObject, EventL @DerivedAttribute(description = "Minimum fraction of direct memory buffer that can be occupied before the buffer is considered for compaction") double getSparsityFraction(); + @DerivedAttribute() + long getHousekeepingCheckPeriod(); + @ManagedOperation(changesConfiguredObjectState = false, nonModifying = true, description = "Force direct memory buffer compaction.") void compactMemory(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java index 0840b66..b9242dc 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java @@ -40,6 +40,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -152,6 +153,8 @@ public class BrokerImpl extends AbstractContainer implements Broker< private long _flowToDiskThreshold; private double _sparsityFraction; private long _lastDisposalCounter; + private ScheduledFuture _assignTargetSizeSchedulingFuture; + private long _housekeepingCheckPeriod; @ManagedObjectFactoryConstructor public BrokerImpl(Map attributes, @@ -420,6 +423,9 @@ public class BrokerImpl extends AbstractContainer implements Broker< getSystemTaskSubject("Housekeeping", _principal)); scheduleDirectMemoryCheck(); + _assignTargetSizeSchedulingFuture = scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), + TimeUnit.MILLISECONDS, + this::assignTargetSizes); final PreferenceStoreUpdaterImpl updater = new PreferenceStoreUpdaterImpl(); final Collection preferenceRecords = _preferenceStore.openAndLoad(updater); @@ -624,6 +630,7 @@ public class BrokerImpl extends AbstractContainer implements Broker< _flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD); _compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD); _compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL); + _housekeepingCheckPeriod = getContextValue(Long.class, Broker.QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD); if (SystemUtils.getProcessPid() != null) { @@ -732,6 +739,11 @@ public class BrokerImpl extends AbstractContainer implements Broker< _reportingTimer.cancel(); } + if (_assignTargetSizeSchedulingFuture != null) + { + _assignTargetSizeSchedulingFuture.cancel(true); + } + shutdownHouseKeeping(); stopPreferenceTaskExecutor(); @@ -1222,6 +1234,12 @@ public class BrokerImpl extends AbstractContainer implements Broker< } @Override + public long getHousekeepingCheckPeriod() + { + return _housekeepingCheckPeriod; + } + + @Override public void compactMemory() { compactMemoryInternal(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index f2cbc25..7e154ed 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.NotificationCheck; import org.apache.qpid.server.queue.QueueConsumer; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryIterator; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageEnqueueRecord; @@ -471,8 +472,6 @@ public interface Queue> extends ConfiguredObject, void recover(ServerMessage message, MessageEnqueueRecord enqueueRecord); - void setTargetSize(long targetSize); - boolean isHeld(QueueEntry queueEntry, final long evaluationTime); void checkCapacity(); @@ -480,4 +479,6 @@ public interface Queue> extends ConfiguredObject, void deleteEntry(QueueEntry entry); QueueEntry getLeastSignificantOldestEntry(); + + QueueEntryIterator queueEntryIterator(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 6deda67..31b77e7 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -142,7 +142,6 @@ public abstract class AbstractQueue> } }; - private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l; private static final String UTF8 = StandardCharsets.UTF_8.name(); private static final Operation PUBLISH_ACTION = Operation.ACTION("publish"); @@ -156,8 +155,6 @@ public abstract class AbstractQueue> private volatile QueueConsumer _exclusiveSubscriber; - private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE); - private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); private final QueueStatistics _queueStatistics = new QueueStatistics(); @@ -210,7 +207,6 @@ public abstract class AbstractQueue> @ManagedAttributeField private boolean _noLocal; - private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker(); private final CopyOnWriteArrayList _bindings = new CopyOnWriteArrayList<>(); private Map _arguments; @@ -1101,9 +1097,13 @@ public abstract class AbstractQueue> doEnqueue(message, action, enqueueRecord); } - long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader(); - _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize, - _targetQueueSize.get()); + final StoredMessage storedMessage = message.getStoredMessage(); + if ((_virtualHost.isOverTargetSize() + || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold) + && storedMessage.isInMemory()) + { + storedMessage.flowToDisk(); + } } public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord) @@ -1229,15 +1229,6 @@ public abstract class AbstractQueue> // Simple Queues don't :-) } - @Override - public void setTargetSize(final long targetSize) - { - if (_targetQueueSize.compareAndSet(_targetQueueSize.get(), targetSize)) - { - _logger.debug("Queue '{}' target size : {}", getName(), targetSize); - } - } - public long getTotalDequeuedMessages() { return _queueStatistics.getDequeueCount(); @@ -1460,6 +1451,12 @@ public abstract class AbstractQueue> } + @Override + public QueueEntryIterator queueEntryIterator() + { + return getEntries().iterator(); + } + public int compareTo(final X o) { return getName().compareTo(o.getName()); @@ -2032,9 +2029,6 @@ public abstract class AbstractQueue> { QueueEntryIterator queueListIterator = getEntries().iterator(); - final long estimatedQueueSize = _queueStatistics.getQueueSizeIncludingHeader(); - _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get()); - final Set perMessageChecks = new HashSet<>(); final Set queueLevelChecks = new HashSet<>(); @@ -2053,7 +2047,6 @@ public abstract class AbstractQueue> final long currentTime = System.currentTimeMillis(); final long thresholdTime = currentTime - getAlertRepeatGap(); - long cumulativeQueueSize = 0; while (!_stopped.get() && queueListIterator.advance()) { final QueueEntry node = queueListIterator.getNode(); @@ -2075,14 +2068,8 @@ public abstract class AbstractQueue> ServerMessage msg = node.getMessage(); if (msg != null) { - MessageReference messageReference = null; - try + try (MessageReference messageReference = msg.newReference()) { - messageReference = msg.newReference(); - cumulativeQueueSize += msg.getSizeIncludingHeader(); - _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize, - _targetQueueSize.get()); - for(NotificationCheck check : perMessageChecks) { checkForNotification(msg, listener, currentTime, thresholdTime, check); @@ -2092,13 +2079,6 @@ public abstract class AbstractQueue> { // Ignore } - finally - { - if (messageReference != null) - { - messageReference.release(); - } - } } } } @@ -3329,71 +3309,6 @@ public abstract class AbstractQueue> } } - private class FlowToDiskChecker - { - final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false); - - void flowToDiskIfNecessary(StoredMessage storedMessage, long estimatedQueueSize, final long targetQueueSize) - { - if ((estimatedQueueSize > targetQueueSize - || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold) - && storedMessage.isInMemory()) - { - storedMessage.flowToDisk(); - } - } - - void flowToDiskAndReportIfNecessary(StoredMessage storedMessage, - final long estimatedQueueSize, - final long targetQueueSize) - { - flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize); - reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize); - } - - void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize) - { - final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize(); - if (estimatedQueueSize > targetQueueSize - || allocatedDirectMemorySize > _flowToDiskThreshold) - { - reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold); - } - else - { - reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold); - } - } - - private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize, - long targetQueueSize, - long allocatedDirectMemorySize, - long flowToDiskThreshold) - { - if (!_lastReportedFlowToDiskStatus.getAndSet(true)) - { - getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0, - targetQueueSize / 1024.0, - allocatedDirectMemorySize / 1024.0 / 1024.0, - flowToDiskThreshold / 1024.0 / 1024.0)); - } - } - - private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize, - long targetQueueSize, - long allocatedDirectMemorySize, - long flowToDiskThreshold) - { - if (_lastReportedFlowToDiskStatus.getAndSet(false)) - { - getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0, - targetQueueSize / 1024.0, - allocatedDirectMemorySize / 1024.0 / 1024.0, - flowToDiskThreshold / 1024.0 / 1024.0)); - } - } - } - private class AdvanceConsumersTask extends HouseKeepingTask { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index b2fa18a..22020c0 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -52,6 +52,7 @@ public class MemoryMessageStore implements MessageStore private final Object _transactionLock = new Object(); private final Map> _messageInstances = new HashMap>(); private final Map _distributedTransactions = new HashMap(); + private final AtomicLong _inMemorySize = new AtomicLong(); private final class MemoryMessageStoreTransaction implements Transaction @@ -289,13 +290,23 @@ public class MemoryMessageStore implements MessageStore { @Override + public synchronized StoredMessage allContentAdded() + { + final StoredMessage storedMessage = super.allContentAdded(); + _inMemorySize.addAndGet(getContentSize()); + return storedMessage; + } + + @Override public void remove() { _messages.remove(getMessageNumber()); + int bytesCleared = metaData.getStorableSize() + metaData.getContentSize(); super.remove(); + _inMemorySize.addAndGet(-bytesCleared); } - }; + _inMemorySize.addAndGet(metaData.getStorableSize()); return storedMemoryMessage; @@ -314,6 +325,18 @@ public class MemoryMessageStore implements MessageStore } @Override + public long getInMemorySize() + { + return _inMemorySize.get(); + } + + @Override + public long getBytesEvacuatedFromMemory() + { + return 0L; + } + + @Override public Transaction newTransaction() { return new MemoryMessageStoreTransaction(); @@ -323,6 +346,7 @@ public class MemoryMessageStore implements MessageStore public void closeMessageStore() { _messages.clear(); + _inMemorySize.set(0); synchronized (_transactionLock) { _messageInstances.clear(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index ab5764e..b50dbb1 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -61,6 +61,10 @@ public interface MessageStore MessageHandle addMessage(T metaData); + long getInMemorySize(); + + long getBytesEvacuatedFromMemory(); + /** * Is this store capable of persisting the data * http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index e543084..37c79e5 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -97,6 +97,18 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override + public long getInMemorySize() + { + return 0; + } + + @Override + public long getBytesEvacuatedFromMemory() + { + return 0L; + } + + @Override public Transaction newTransaction() { return null; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 7c3ac37..67bee82 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost; +import static com.google.common.collect.Iterators.cycle; import static java.util.Collections.newSetFromMap; import java.io.BufferedInputStream; @@ -88,8 +89,10 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDeletedException; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageNode; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; @@ -105,6 +108,7 @@ import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory; import org.apache.qpid.server.protocol.LinkModel; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryIterator; import org.apache.qpid.server.security.AccessControl; import org.apache.qpid.server.security.CompoundAccessControl; import org.apache.qpid.server.security.Result; @@ -308,8 +312,6 @@ public abstract class AbstractVirtualHost> exte _fileSystemSpaceCheckerJobContext = getSystemTaskControllerContext("FileSystemSpaceChecker["+getName()+"]", _principal); _fileSystemSpaceChecker = new FileSystemSpaceChecker(); - - addChangeListener(new TargetSizeAssigningListener()); } private void updateAccessControl() @@ -1165,6 +1167,7 @@ public abstract class AbstractVirtualHost> exte if (period > 0L) { scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); + scheduleHouseKeepingTask(period, new FlowToDiskCheckingTask()); } } @@ -1444,6 +1447,18 @@ public abstract class AbstractVirtualHost> exte } @Override + public long getInMemoryMessageSize() + { + return _messageStore == null ? -1 : _messageStore.getInMemorySize(); + } + + @Override + public long getBytesEvacuatedFromMemory() + { + return _messageStore == null ? -1 : _messageStore.getBytesEvacuatedFromMemory(); + } + + @Override public > T getAttainedChildFromAddress(final Class childClass, final String address) { @@ -1738,6 +1753,12 @@ public abstract class AbstractVirtualHost> exte return null; } + @Override + public boolean isOverTargetSize() + { + return getInMemoryMessageSize() > _targetSize.get(); + } + private static class MessageHeaderImpl implements AMQMessageHeader { private final String _userName; @@ -1856,46 +1877,84 @@ public abstract class AbstractVirtualHost> exte } } - private class TargetSizeAssigningListener extends AbstractConfigurationChangeListener + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { - @Override - public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + public VirtualHostHouseKeepingTask() { - if (child instanceof Queue) - { - allocateTargetSizeToQueues(); - } + super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext); } - @Override - public void childRemoved(final ConfiguredObject object, - final ConfiguredObject child) + public void execute() { - if (child instanceof Queue) + for (Queue q : getChildren(Queue.class)) { - allocateTargetSizeToQueues(); + if (q.getState() == State.ACTIVE) + { + _logger.debug("Checking message status for queue: {}", q.getName()); + q.checkMessageStatus(); + } } } } - private class VirtualHostHouseKeepingTask extends HouseKeepingTask + private class FlowToDiskCheckingTask extends HouseKeepingTask { - public VirtualHostHouseKeepingTask() + public FlowToDiskCheckingTask() { - super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext); + super("FlowToDiskChecking["+AbstractVirtualHost.this.getName()+"]", AbstractVirtualHost.this, _housekeepingJobContext); } + @Override public void execute() { - Broker broker = getAncestor(Broker.class); - broker.assignTargetSizes(); - - for (Queue q : getChildren(Queue.class)) + if (isOverTargetSize()) { - if (q.getState() == State.ACTIVE) + long currentTargetSize = _targetSize.get(); + List queueIterators = new ArrayList<>(); + for (Queue q : getChildren(Queue.class)) { - _logger.debug("Checking message status for queue: {}", q.getName()); - q.checkMessageStatus(); + queueIterators.add(q.queueEntryIterator()); + } + Collections.shuffle(queueIterators); + + long cumulativeSize = 0; + final Iterator cyclicIterators = cycle(queueIterators); + while (cyclicIterators.hasNext()) + { + final QueueEntryIterator queueIterator = cyclicIterators.next(); + if (queueIterator.advance()) + { + QueueEntry node = queueIterator.getNode(); + if (node != null && !node.isDeleted()) + { + try (MessageReference messageReference = node.getMessage().newReference()) + { + final StoredMessage storedMessage = messageReference.getMessage().getStoredMessage(); + if (storedMessage.isInMemory()) + { + if (cumulativeSize <= currentTargetSize) + { + cumulativeSize += storedMessage.getContentSize(); + cumulativeSize += storedMessage.getMetaData() == null + ? 0 + : storedMessage.getMetaData().getStorableSize(); + } + else + { + storedMessage.flowToDisk(); + } + } + } + catch (MessageDeletedException e) + { + // pass + } + } + } + else + { + cyclicIterators.remove(); + } } } } @@ -2396,7 +2455,6 @@ public abstract class AbstractVirtualHost> exte public void setTargetSize(final long targetSize) { _targetSize.set(targetSize); - allocateTargetSizeToQueues(); } public long getTargetSize() @@ -2404,38 +2462,12 @@ public abstract class AbstractVirtualHost> exte return _targetSize.get(); } - private void allocateTargetSizeToQueues() - { - long targetSize = _targetSize.get(); - Collection queues = getChildren(Queue.class); - long totalSize = calculateTotalEnqueuedSize(queues); - _logger.debug("Allocating target size to queues, total target: {} ; total enqueued size {}", targetSize, totalSize); - if (targetSize > 0l) - { - for (Queue q : queues) - { - long size; - if (totalSize == 0) - { - size = targetSize / queues.size(); - } - else - { - size = (long) ((q.getQueueDepthBytesIncludingHeader() / (double) totalSize) * targetSize); - } - - q.setTargetSize(size); - } - } - } - @Override public long getTotalQueueDepthBytes() { return calculateTotalEnqueuedSize(getChildren(Queue.class)); } - @Override public Principal getPrincipal() { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java index a47ab37..77289d0 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java @@ -206,6 +206,12 @@ public interface QueueManagingVirtualHost> @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Total Depth of Queues Including Header") long getTotalDepthOfQueuesBytesIncludingHeader(); + @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Total Memory Occupied by Message Headers and Content") + long getInMemoryMessageSize(); + + @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Total Number of Bytes Evacuated from Memory Due to Flow to Disk") + long getBytesEvacuatedFromMemory(); + @Override @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false) Collection> getConnections(); @@ -285,6 +291,8 @@ public interface QueueManagingVirtualHost> ListenableFuture reallocateMessages(); + boolean isOverTargetSize(); + interface Transaction { void dequeue(QueueEntry entry); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/501aa891/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index 3b6a509..f384a13 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -77,6 +77,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType _parent; private String _tablePrefix = ""; + private final AtomicLong _inMemorySize = new AtomicLong(); + private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong(); protected abstract boolean isMessageStoreOpen(); @@ -245,6 +247,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore @Override public void closeMessageStore() { + _inMemorySize.set(0); + _bytesEvacuatedFromMemory.set(0); if(_executor != null) { _executor.shutdown(); @@ -1124,6 +1128,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore return true; } + @Override + public long getInMemorySize() + { + return _inMemorySize.get(); + } + + @Override + public long getBytesEvacuatedFromMemory() + { + return _bytesEvacuatedFromMemory.get(); + } protected class JDBCTransaction implements Transaction { @@ -1408,10 +1423,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore _data = data; } - public void clear() + public long clear() { + long bytesCleared = 0; if(_metaData != null) { + bytesCleared += _metaData.getStorableSize(); _metaData.clearEncodedForm(); _metaData = null; } @@ -1419,10 +1436,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { for(QpidByteBuffer buf : _data) { + bytesCleared += buf.remaining(); buf.dispose(); } + _data = null; } - _data = null; + return bytesCleared; } @Override @@ -1471,7 +1490,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { _messageDataRef = new MessageDataSoftRef<>(metaData, null); } + _contentSize = metaData.getContentSize(); + _inMemorySize.addAndGet(metaData.getStorableSize()); } @@ -1531,6 +1552,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore @Override public StoredMessage allContentAdded() { + _inMemorySize.addAndGet(getContentSize()); return this; } @@ -1668,14 +1690,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } final T metaData; + long bytesCleared = 0; if ((metaData = _messageDataRef.getMetaData()) != null) { + bytesCleared += metaData.getStorableSize(); metaData.dispose(); } Collection data = _messageDataRef.getData(); if(data != null) { + bytesCleared += getContentSize(); _messageDataRef.setData(null); for(QpidByteBuffer buf : data) { @@ -1683,6 +1708,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } } _messageDataRef = null; + _inMemorySize.addAndGet(-bytesCleared); } @Override @@ -1703,7 +1729,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore flushToStore(); if(_messageDataRef != null && !_messageDataRef.isHardRef()) { - ((MessageDataSoftRef)_messageDataRef).clear(); + final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear(); + _inMemorySize.addAndGet(-bytesCleared); + _bytesEvacuatedFromMemory.addAndGet(bytesCleared); } return true; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org