qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject [2/2] qpid-broker-j git commit: QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost
Date Tue, 16 May 2017 17:06:02 GMT
QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers
when stopping/closing a VirtualHost


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b63815ce
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b63815ce
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b63815ce

Branch: refs/heads/master
Commit: b63815ceb13127a1b84266462aefe2c103adc34c
Parents: d9af266
Author: Lorenz Quack <lquack@apache.org>
Authored: Tue May 16 17:04:40 2017 +0100
Committer: Keith Wall <kwall@apache.org>
Committed: Tue May 16 17:32:08 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     | 144 +++++++------------
 .../qpid/server/store/MemoryMessageStore.java   |   4 +
 .../qpid/server/store/StoredMemoryMessage.java  |   5 +
 .../store/jdbc/AbstractJDBCMessageStore.java    | 144 +++++++------------
 4 files changed, 108 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 6f1326e..cef6549 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,7 +29,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -103,6 +105,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     private final Random _lockConflictRandom = new Random();
     private final AtomicLong _inMemorySize = new AtomicLong();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new
ConcurrentHashMap<>());
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -126,7 +129,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
         long newMessageId = getNextMessageId();
 
-        return new StoredBDBMessage<T>(newMessageId, metaData);
+        return createStoredBDBMessage(newMessageId, metaData, false);
+    }
+
+    public <T extends StorableMessageMetaData> StoredBDBMessage<T> createStoredBDBMessage(final
long newMessageId,
+                                                                                        
 final T metaData,
+                                                                                        
 final boolean recovered)
+    {
+        final StoredBDBMessage<T> message = new StoredBDBMessage<>(newMessageId,
metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     public long getNextMessageId()
@@ -184,6 +196,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredBDBMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _inMemorySize.set(0);
         _bytesEvacuatedFromMemory.set(0);
     }
@@ -444,7 +461,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             {
                 long messageId = LongBinding.entryToLong(key);
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 if (!handler.handle(message))
                 {
                     break;
@@ -490,7 +507,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
             {
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 return message;
             }
             else
@@ -917,50 +934,49 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
     protected abstract Logger getLogger();
 
-    interface MessageDataRef<T extends StorableMessageMetaData>
+    private static class MessageDataRef<T extends StorableMessageMetaData>
     {
-        T getMetaData();
-        Collection<QpidByteBuffer> getData();
-        void setData(Collection<QpidByteBuffer> data);
-        boolean isHardRef();
-        void reallocate();
-    }
-
-    private static final class MessageDataHardRef<T extends StorableMessageMetaData>
implements MessageDataRef<T>
-    {
-        private final T _metaData;
+        private volatile T _metaData;
         private volatile Collection<QpidByteBuffer> _data;
+        private volatile boolean _isHardRef;
+
+        private MessageDataRef(final T metaData, boolean isHardRef)
+        {
+            this(metaData, null, isHardRef);
+        }
 
-        private MessageDataHardRef(final T metaData)
+        private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean
isHardRef)
         {
             _metaData = metaData;
+            _data = data;
+            _isHardRef = isHardRef;
         }
 
-        @Override
         public T getMetaData()
         {
             return _metaData;
         }
 
-        @Override
         public Collection<QpidByteBuffer> getData()
         {
             return _data;
         }
 
-        @Override
         public void setData(final Collection<QpidByteBuffer> data)
         {
             _data = data;
         }
 
-        @Override
         public boolean isHardRef()
         {
-            return true;
+            return _isHardRef;
+        }
+
+        public void setSoft()
+        {
+            _isHardRef = false;
         }
 
-        @Override
         public void reallocate()
         {
             if(_metaData != null)
@@ -969,37 +985,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             }
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
-    }
-
-    private static final class MessageDataSoftRef<T extends StorableMessageMetaData>
implements MessageDataRef<T>
-    {
-
-        private T _metaData;
-        private volatile Collection<QpidByteBuffer> _data;
-
-        private MessageDataSoftRef(final T metaData, Collection<QpidByteBuffer> data)
-        {
-            _metaData = metaData;
-            _data = data;
-        }
-
-        @Override
-        public T getMetaData()
-        {
-            return _metaData;
-        }
-
-        @Override
-        public Collection<QpidByteBuffer> getData()
-        {
-            return _data;
-        }
-
-        @Override
-        public void setData(final Collection<QpidByteBuffer> data)
-        {
-            _data = data;
-        }
 
         public long clear()
         {
@@ -1021,22 +1006,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             }
             return bytesCleared;
         }
-
-        @Override
-        public boolean isHardRef()
-        {
-            return false;
-        }
-
-        @Override
-        public void reallocate()
-        {
-            if(_metaData != null)
-            {
-                _metaData.reallocate();
-            }
-            _data = QpidByteBuffer.reallocateIfNecessary(_data);
-        }
     }
 
     final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>,
MessageHandle<T>
@@ -1046,23 +1015,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         private final int _contentSize;
         private MessageDataRef<T> _messageDataRef;
 
-        StoredBDBMessage(long messageId, T metaData)
-        {
-            this(messageId, metaData, false);
-        }
-
         StoredBDBMessage(long messageId, T metaData, boolean isRecovered)
         {
             _messageId = messageId;
 
-            if(!isRecovered)
-            {
-                _messageDataRef = new MessageDataHardRef<>(metaData);
-            }
-            else
-            {
-                _messageDataRef = new MessageDataSoftRef<>(metaData, null);
-            }
+            _messageDataRef = new MessageDataRef<>(metaData, !isRecovered);
 
             _contentSize = metaData.getContentSize();
             _inMemorySize.addAndGet(metaData.getStorableSize());
@@ -1083,7 +1040,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
                 {
                     checkMessageStoreOpen();
                     metaData = (T) getMessageMetaData(_messageId);
-                    _messageDataRef = new MessageDataSoftRef<>(metaData, _messageDataRef.getData());
+                    _messageDataRef = new MessageDataRef<>(metaData, _messageDataRef.getData(),
false);
                 }
                 return metaData;
             }
@@ -1200,22 +1157,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         {
             if (!stored())
             {
-
                 AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData());
                 AbstractBDBMessageStore.this.addContent(txn, _messageId,
                                                         _messageDataRef.getData() == null
                                                                 ? Collections.<QpidByteBuffer>emptySet()
                                                                 : _messageDataRef.getData());
-
-
-                MessageDataRef<T> hardRef = _messageDataRef;
-                MessageDataSoftRef<T> messageDataSoftRef;
-
-                messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(),
hardRef.getData());
-
-                _messageDataRef = messageDataSoftRef;
-
-
+                _messageDataRef.setSoft();
             }
         }
 
@@ -1247,6 +1194,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         public synchronized void remove()
         {
             checkMessageStoreOpen();
+            _messages.remove(this);
             if(stored())
             {
                 removeMessage(_messageId, false);
@@ -1293,7 +1241,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+                final long bytesCleared = _messageDataRef.clear();
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1314,6 +1262,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
                 _messageDataRef.reallocate();
             }
         }
+
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 22020c0..0da7db7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -345,6 +345,10 @@ public class MemoryMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredMemoryMessage storedMemoryMessage : _messages.values())
+        {
+            storedMemoryMessage.clear();
+        }
         _messages.clear();
         _inMemorySize.set(0);
         synchronized (_transactionLock)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index d0fd927..6d6921c 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -154,4 +154,9 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData>
implements S
         }
         _content.addAll(newContent);
     }
+
+    public void clear()
+    {
+        remove();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 277810d..921f1fb 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -33,7 +33,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -86,6 +88,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     private String _tablePrefix = "";
     private final AtomicLong _inMemorySize = new AtomicLong();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new
ConcurrentHashMap<>());
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -247,6 +250,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredJDBCMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _inMemorySize.set(0);
         _bytesEvacuatedFromMemory.set(0);
         if(_executor != null)
@@ -415,8 +423,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     {
         checkMessageStoreOpen();
 
-        return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
+        return createStoredJDBCMessage(getNextMessageId(), metaData, false);
+    }
 
+    public <T extends StorableMessageMetaData> StoredJDBCMessage<T> createStoredJDBCMessage(final
long newMessageId,
+                                                                                        
 final T metaData,
+                                                                                        
 final boolean recovered)
+    {
+        final StoredJDBCMessage<T> message = new StoredJDBCMessage<>(newMessageId,
metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     @Override
@@ -1339,50 +1355,49 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
     }
 
-    static interface MessageDataRef<T extends StorableMessageMetaData>
+    private static class MessageDataRef<T extends StorableMessageMetaData>
     {
-        T getMetaData();
-        Collection<QpidByteBuffer> getData();
-        void setData(Collection<QpidByteBuffer> data);
-        boolean isHardRef();
-        void reallocate();
-    }
-
-    private static final class MessageDataHardRef<T extends StorableMessageMetaData>
implements MessageDataRef<T>
-    {
-        private final T _metaData;
+        private volatile T _metaData;
         private volatile Collection<QpidByteBuffer> _data;
+        private volatile boolean _isHardRef;
 
-        private MessageDataHardRef(final T metaData)
+        private MessageDataRef(final T metaData, boolean isHardRef)
+        {
+            this(metaData, null, isHardRef);
+        }
+
+        private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean
isHardRef)
         {
             _metaData = metaData;
+            _data = data;
+            _isHardRef = isHardRef;
         }
 
-        @Override
         public T getMetaData()
         {
             return _metaData;
         }
 
-        @Override
         public Collection<QpidByteBuffer> getData()
         {
             return _data;
         }
 
-        @Override
         public void setData(final Collection<QpidByteBuffer> data)
         {
             _data = data;
         }
 
-        @Override
         public boolean isHardRef()
         {
-            return true;
+            return _isHardRef;
+        }
+
+        public void setSoft()
+        {
+            _isHardRef = false;
         }
 
-        @Override
         public void reallocate()
         {
             if(_metaData != null)
@@ -1391,37 +1406,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
-    }
-
-    private static final class MessageDataSoftRef<T extends StorableMessageMetaData>
implements MessageDataRef<T>
-    {
-
-        private T _metaData;
-        private volatile Collection<QpidByteBuffer> _data;
-
-        private MessageDataSoftRef(final T metaData, Collection<QpidByteBuffer> data)
-        {
-            _metaData = metaData;
-            _data = data;
-        }
-
-        @Override
-        public T getMetaData()
-        {
-            return _metaData;
-        }
-
-        @Override
-        public Collection<QpidByteBuffer> getData()
-        {
-            return _data;
-        }
-
-        @Override
-        public void setData(final Collection<QpidByteBuffer> data)
-        {
-            _data = data;
-        }
 
         public long clear()
         {
@@ -1443,23 +1427,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
             return bytesCleared;
         }
-
-        @Override
-        public boolean isHardRef()
-        {
-            return false;
-        }
-
-        @Override
-        public void reallocate()
-        {
-            if(_metaData != null)
-            {
-                _metaData.reallocate();
-            }
-
-            _data = QpidByteBuffer.reallocateIfNecessary(_data);
-        }
     }
 
     private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>,
MessageHandle<T>
@@ -1470,26 +1437,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
         private MessageDataRef<T> _messageDataRef;
 
-
-        StoredJDBCMessage(long messageId, T metaData)
-        {
-            this(messageId, metaData, false);
-        }
-
-
         StoredJDBCMessage(long messageId,
                           T metaData, boolean isRecovered)
         {
             _messageId = messageId;
 
-            if(!isRecovered)
-            {
-                _messageDataRef = new MessageDataHardRef<>(metaData);
-            }
-            else
-            {
-                _messageDataRef = new MessageDataSoftRef<>(metaData, null);
-            }
+            _messageDataRef = new MessageDataRef<>(metaData, !isRecovered);
 
             _contentSize = metaData.getContentSize();
             _inMemorySize.addAndGet(metaData.getStorableSize());
@@ -1513,7 +1466,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                     try
                     {
                         metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
-                        _messageDataRef = new MessageDataSoftRef<>(metaData, _messageDataRef.getData());
+                        _messageDataRef = new MessageDataRef<>(metaData, _messageDataRef.getData(),
false);
                     }
                     catch (SQLException e)
                     {
@@ -1635,7 +1588,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             if (!stored())
             {
-
                 AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
                 AbstractJDBCMessageStore.this.addContent(conn, _messageId,
                                                          _messageDataRef.getData() == null
@@ -1644,14 +1596,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
                 getLogger().debug("Storing message {} to store", _messageId);
 
-                MessageDataRef<T> hardRef = _messageDataRef;
-                MessageDataSoftRef<T> messageDataSoftRef;
-
-                messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(),
hardRef.getData());
-
-                _messageDataRef = messageDataSoftRef;
-
-
+                _messageDataRef.setSoft();
             }
         }
 
@@ -1683,6 +1628,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             getLogger().debug("REMOVE called on message: {}", _messageId);
 
             checkMessageStoreOpen();
+            _messages.remove(this);
             if(stored())
             {
                 AbstractJDBCMessageStore.this.removeMessage(_messageId);
@@ -1729,7 +1675,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+                final long bytesCleared = _messageDataRef.clear();
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1745,6 +1691,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
         }
 
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
+
         @Override
         public String toString()
         {
@@ -1792,7 +1746,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            message = new StoredJDBCMessage(messageId, metaData, true);
+                            message = createStoredJDBCMessage(messageId, metaData, true);
 
                         }
                         else
@@ -1845,7 +1799,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(((int)dataAsBytes[0])
&0xff);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            StoredJDBCMessage message = new StoredJDBCMessage(messageId,
metaData, true);
+                            StoredJDBCMessage message = createStoredJDBCMessage(messageId,
metaData, true);
                             if (!handler.handle(message))
                             {
                                 break;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message