qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lqu...@apache.org
Subject [5/7] qpid-broker-j git commit: QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost
Date Mon, 22 May 2017 16:22:47 GMT
QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers
when stopping/closing a VirtualHost

Cherry-picked from b63815c


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/6d6f8014
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/6d6f8014
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/6d6f8014

Branch: refs/heads/6.1.x
Commit: 6d6f80145810d8aa81bedde129a910e198b74ca2
Parents: 11a7522
Author: Lorenz Quack <lquack@apache.org>
Authored: Mon May 22 15:21:57 2017 +0100
Committer: Lorenz Quack <lquack@apache.org>
Committed: Mon May 22 16:17:32 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     | 55 ++++++++++++++++++--
 .../server/store/AbstractJDBCMessageStore.java  | 55 ++++++++++++++++++--
 .../qpid/server/store/MemoryMessageStore.java   |  4 ++
 .../qpid/server/store/StoredMemoryMessage.java  |  4 ++
 4 files changed, 111 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6d6f8014/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 2d6bb83..4313dcd 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,7 +29,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -102,6 +104,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     private long _totalStoreSize;
     private final Random _lockConflictRandom = new Random();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new
ConcurrentHashMap<StoredBDBMessage<?>, Boolean>());
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -125,7 +128,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
         long newMessageId = getNextMessageId();
 
-        return new StoredBDBMessage<T>(newMessageId, metaData);
+        return createStoredBDBMessage(newMessageId, metaData, false);
+    }
+
+    public <T extends StorableMessageMetaData> StoredBDBMessage<T> createStoredBDBMessage(final
long newMessageId,
+                                                                                        
 final T metaData,
+                                                                                        
 final boolean recovered)
+    {
+        final StoredBDBMessage<T> message = new StoredBDBMessage<>(newMessageId,
metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     public long getNextMessageId()
@@ -177,6 +189,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredBDBMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _bytesEvacuatedFromMemory.set(0);
     }
 
@@ -436,7 +453,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             {
                 long messageId = LongBinding.entryToLong(key);
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 if (!handler.handle(message))
                 {
                     break;
@@ -482,7 +499,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
             {
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 return message;
             }
             else
@@ -915,6 +932,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         Collection<QpidByteBuffer> getData();
         void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
+        long clear();
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData>
implements MessageDataRef<T>
@@ -950,6 +968,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         {
             return true;
         }
+
+        @Override
+        public long clear()
+        {
+            long bytesCleared = 0;
+            if(_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                _metaData.clearEncodedForm();
+            }
+            if(_data != null)
+            {
+                for(QpidByteBuffer buf : _data)
+                {
+                    bytesCleared += buf.remaining();
+                    buf.dispose();
+                }
+                _data = null;
+            }
+            return bytesCleared;
+        }
+
     }
 
     private static final class MessageDataSoftRef<T extends StorableMessageMetaData>
implements MessageDataRef<T>
@@ -982,6 +1022,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             _data = data;
         }
 
+        @Override
         public long clear()
         {
             long bytesCleared = 0;
@@ -1212,6 +1253,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
             final T metaData = getMetaData();
             int delta = metaData.getContentSize();
+            _messages.remove(this);
             if(stored())
             {
                 removeMessage(_messageId, false);
@@ -1259,6 +1301,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             return this.getClass() + "[messageId=" + _messageId + "]";
         }
 
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6d6f8014/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 971b681..18a5da3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -33,7 +33,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -71,6 +73,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     private ConfiguredObject<?> _parent;
     private String _tablePrefix = "";
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new
ConcurrentHashMap<StoredJDBCMessage<?>, Boolean>());
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -232,6 +235,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredJDBCMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _bytesEvacuatedFromMemory.set(0);
         if(_executor != null)
         {
@@ -399,8 +407,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     {
         checkMessageStoreOpen();
 
-        return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
+        return createStoredJDBCMessage(getNextMessageId(), metaData, false);
+    }
 
+    public <T extends StorableMessageMetaData> StoredJDBCMessage<T> createStoredJDBCMessage(final
long newMessageId,
+                                                                                        
 final T metaData,
+                                                                                        
 final boolean recovered)
+    {
+        final StoredJDBCMessage<T> message = new StoredJDBCMessage<>(newMessageId,
metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     @Override
@@ -1323,6 +1339,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         Collection<QpidByteBuffer> getData();
         void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
+        long clear();
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData>
implements MessageDataRef<T>
@@ -1354,6 +1371,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
 
         @Override
+        public long clear()
+        {
+            long bytesCleared = 0;
+            if(_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                _metaData.clearEncodedForm();
+            }
+            if(_data != null)
+            {
+                for(QpidByteBuffer buf : _data)
+                {
+                    bytesCleared += buf.remaining();
+                    buf.dispose();
+                }
+                _data = null;
+            }
+            return bytesCleared;
+        }
+
+        @Override
         public boolean isHardRef()
         {
             return true;
@@ -1390,6 +1428,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             _data = data;
         }
 
+        @Override
         public long clear()
         {
             long bytesCleared = 0;
@@ -1580,7 +1619,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             if (!stored())
             {
-
                 AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
                 AbstractJDBCMessageStore.this.addContent(conn, _messageId,
                                                          _messageDataRef.getData() == null
@@ -1632,6 +1670,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
             final T metaData = getMetaData();
             int delta = metaData.getContentSize();
+            _messages.remove(this);
             if(stored())
             {
                 AbstractJDBCMessageStore.this.removeMessage(_messageId);
@@ -1673,6 +1712,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             return true;
         }
 
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
+
         @Override
         public String toString()
         {
@@ -1720,7 +1767,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            message = new StoredJDBCMessage(messageId, metaData, true);
+                            message = createStoredJDBCMessage(messageId, metaData, true);
 
                         }
                         else
@@ -1773,7 +1820,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(((int)dataAsBytes[0])
&0xff);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            StoredJDBCMessage message = new StoredJDBCMessage(messageId,
metaData, true);
+                            StoredJDBCMessage message = createStoredJDBCMessage(messageId,
metaData, true);
                             if (!handler.handle(message))
                             {
                                 break;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6d6f8014/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index eb4543d..2264b9a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -338,6 +338,10 @@ public class MemoryMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredMemoryMessage storedMemoryMessage : _messages.values())
+        {
+            storedMemoryMessage.clear();
+        }
         _messages.clear();
         _inMemorySize.set(0);
         synchronized (_transactionLock)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6d6f8014/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 7b8eeff..eacd7ff 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -141,4 +141,8 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData>
implements S
         return false;
     }
 
+    public void clear()
+    {
+        remove();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message