qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lqu...@apache.org
Subject [3/6] qpid-broker-j git commit: QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost
Date Tue, 23 May 2017 10:09:19 GMT
QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers
when stopping/closing a VirtualHost

Cherry-picked from b63815c


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a1d66f1d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a1d66f1d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a1d66f1d

Branch: refs/heads/6.0.x
Commit: a1d66f1d344087ad088b2f55894be957abac5ca3
Parents: df071cb
Author: Lorenz Quack <lquack@apache.org>
Authored: Mon May 22 15:21:57 2017 +0100
Committer: Lorenz Quack <lquack@apache.org>
Committed: Tue May 23 10:19:59 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     | 55 ++++++++++++++++++--
 .../server/store/AbstractJDBCMessageStore.java  | 54 +++++++++++++++++--
 .../qpid/server/store/MemoryMessageStore.java   |  4 ++
 .../qpid/server/store/StoredMemoryMessage.java  |  4 ++
 4 files changed, 110 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index b52e25c..573d287 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,7 +29,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -102,6 +104,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     private long _totalStoreSize;
     private final Random _lockConflictRandom = new Random();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<StoredBDBMessage<?>, Boolean>());
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -125,7 +128,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
         long newMessageId = getNextMessageId();
 
-        return new StoredBDBMessage<T>(newMessageId, metaData);
+        return createStoredBDBMessage(newMessageId, metaData, false);
+    }
+
+    public <T extends StorableMessageMetaData> StoredBDBMessage<T> createStoredBDBMessage(final long newMessageId,
+                                                                                          final T metaData,
+                                                                                          final boolean recovered)
+    {
+        final StoredBDBMessage<T> message = new StoredBDBMessage<>(newMessageId, metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     public long getNextMessageId()
@@ -177,6 +189,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredBDBMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _bytesEvacuatedFromMemory.set(0);
     }
 
@@ -436,7 +453,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             {
                 long messageId = LongBinding.entryToLong(key);
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 if (!handler.handle(message))
                 {
                     break;
@@ -482,7 +499,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
             {
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 return message;
             }
             else
@@ -915,6 +932,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         Collection<QpidByteBuffer> getData();
         void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
+        long clear();
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -950,6 +968,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         {
             return true;
         }
+
+        @Override
+        public long clear()
+        {
+            long bytesCleared = 0;
+            if(_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                _metaData.clearEncodedForm();
+            }
+            if(_data != null)
+            {
+                for(QpidByteBuffer buf : _data)
+                {
+                    bytesCleared += buf.remaining();
+                    buf.dispose();
+                }
+                _data = null;
+            }
+            return bytesCleared;
+        }
+
     }
 
     private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -982,6 +1022,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             _data = data;
         }
 
+        @Override
         public long clear()
         {
             long bytesCleared = 0;
@@ -1191,6 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
             final T metaData = getMetaData();
             int delta = metaData.getContentSize();
+            _messages.remove(this);
             if(stored())
             {
                 removeMessage(_messageId, false);
@@ -1238,6 +1280,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             return this.getClass() + "[messageId=" + _messageId + "]";
         }
 
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index a2ad036..e7e1b0f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -119,6 +120,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     protected final EventManager _eventManager = new EventManager();
     private ConfiguredObject<?> _parent;
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<StoredJDBCMessage<?>, Boolean>());
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -278,6 +280,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredJDBCMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _bytesEvacuatedFromMemory.set(0);
         if(_executor != null)
         {
@@ -443,8 +450,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     {
         checkMessageStoreOpen();
 
-        return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
+        return createStoredJDBCMessage(getNextMessageId(), metaData, false);
+    }
 
+    public <T extends StorableMessageMetaData> StoredJDBCMessage<T> createStoredJDBCMessage(final long newMessageId,
+                                                                                          final T metaData,
+                                                                                          final boolean recovered)
+    {
+        final StoredJDBCMessage<T> message = new StoredJDBCMessage<>(newMessageId, metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     @Override
@@ -1310,6 +1325,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         Collection<QpidByteBuffer> getData();
         void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
+        long clear();
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -1341,6 +1357,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
 
         @Override
+        public long clear()
+        {
+            long bytesCleared = 0;
+            if(_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                _metaData.clearEncodedForm();
+            }
+            if(_data != null)
+            {
+                for(QpidByteBuffer buf : _data)
+                {
+                    bytesCleared += buf.remaining();
+                    buf.dispose();
+                }
+                _data = null;
+            }
+            return bytesCleared;
+        }
+
+        @Override
         public boolean isHardRef()
         {
             return true;
@@ -1377,6 +1414,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             _data = data;
         }
 
+        @Override
         public long clear()
         {
             long bytesCleared = 0;
@@ -1546,7 +1584,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             if (!stored())
             {
-
                 AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
                 AbstractJDBCMessageStore.this.addContent(conn, _messageId,
                                                          _messageDataRef.getData() == null
@@ -1598,6 +1635,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
             final T metaData = getMetaData();
             int delta = metaData.getContentSize();
+            _messages.remove(this);
             if(stored())
             {
                 AbstractJDBCMessageStore.this.removeMessage(_messageId);
@@ -1639,6 +1677,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             return true;
         }
 
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
+
         @Override
         public String toString()
         {
@@ -1685,7 +1731,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            message = new StoredJDBCMessage(messageId, metaData, true);
+                            message = createStoredJDBCMessage(messageId, metaData, true);
 
                         }
                         else
@@ -1738,7 +1784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(((int)dataAsBytes[0]) &0xff);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+                            StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
                             if (!handler.handle(message))
                             {
                                 break;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index baedc02..8983619 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -325,6 +325,10 @@ public class MemoryMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredMemoryMessage storedMemoryMessage : _messages.values())
+        {
+            storedMemoryMessage.clear();
+        }
         _messages.clear();
         synchronized (_transactionLock)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 80e317f..d5dc0c6 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -141,4 +141,8 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
         return false;
     }
 
+    public void clear()
+    {
+        remove();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message