activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6387
Date Thu, 01 Sep 2016 20:28:02 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x f15c0e8e0 -> fb24b48b8


https://issues.apache.org/jira/browse/AMQ-6387

Fix up the Memory Store such that it removes the references it adds to
messages when they are placed into the memory durable topic subscription
store.
(cherry picked from commit bb8d32c04aa06735d0036963685a4bc41fcbaad7)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fb24b48b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fb24b48b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fb24b48b

Branch: refs/heads/activemq-5.14.x
Commit: fb24b48b8a69c841ca86af073660247e53ebbd1e
Parents: f15c0e8
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Sep 1 16:26:03 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Sep 1 16:27:56 2016 -0400

----------------------------------------------------------------------
 .../store/memory/MemoryMessageStore.java        |  53 +---
 .../store/memory/MemoryPersistenceAdapter.java  |  21 +-
 .../store/memory/MemoryTopicMessageStore.java   |  63 +++--
 .../activemq/store/memory/MemoryTopicSub.java   |  37 ++-
 .../store/memory/MemoryTransactionStore.java    |  36 ++-
 .../cursors/MemoryPendingMessageCursorTest.java |   2 +-
 .../org/apache/activemq/bugs/AMQ6387Test.java   | 268 +++++++++++++++++++
 7 files changed, 374 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fb24b48b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 736d912..8e72c48 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,26 +18,21 @@ package org.apache.activemq.store.memory;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageStoreStatistics;
 
 /**
- * An implementation of {@link org.apache.activemq.store.MessageStore} which
- * uses a
- *
- *
+ * An implementation of {@link org.apache.activemq.store.MessageStore}
  */
 public class MemoryMessageStore extends AbstractMessageStore {
 
@@ -67,23 +62,11 @@ public class MemoryMessageStore extends AbstractMessageStore {
         }
     }
 
-    // public void addMessageReference(ConnectionContext context,MessageId
-    // messageId,long expirationTime,String messageRef)
-    // throws IOException{
-    // synchronized(messageTable){
-    // messageTable.put(messageId,messageRef);
-    // }
-    // }
-
     @Override
     public Message getMessage(MessageId identity) throws IOException {
         return messageTable.get(identity);
     }
 
-    // public String getMessageReference(MessageId identity) throws IOException{
-    // return (String)messageTable.get(identity);
-    // }
-
     @Override
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
{
         removeMessage(ack.getLastMessageId());
@@ -92,7 +75,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
     public void removeMessage(MessageId msgId) throws IOException {
         synchronized (messageTable) {
             Message removed = messageTable.remove(msgId);
-            if( removed !=null ) {
+            if (removed != null) {
                 removed.decrementReferenceCount();
                 decMessageStoreStatistics(getMessageStoreStatistics(), removed);
             }
@@ -104,12 +87,10 @@ public class MemoryMessageStore extends AbstractMessageStore {
 
     @Override
     public void recover(MessageRecoveryListener listener) throws Exception {
-        // the message table is a synchronizedMap - so just have to synchronize
-        // here
+        // the message table is a synchronizedMap - so just have to synchronize here
         synchronized (messageTable) {
-            for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();)
{
-                Message msg = iter.next();
-                listener.recoverMessage(msg);
+            for (Message message : messageTable.values()) {
+                listener.recoverMessage(message);
             }
         }
     }
@@ -133,17 +114,14 @@ public class MemoryMessageStore extends AbstractMessageStore {
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws
Exception {
         synchronized (messageTable) {
             boolean pastLackBatch = lastBatchId == null;
-            int count = 0;
-            for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
-                Map.Entry entry = (Entry)iter.next();
+            for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
                 if (pastLackBatch) {
-                    count++;
                     Object msg = entry.getValue();
-                    lastBatchId = (MessageId)entry.getKey();
+                    lastBatchId = entry.getKey();
                     if (msg.getClass() == MessageId.class) {
-                        listener.recoverMessageReference((MessageId)msg);
+                        listener.recoverMessageReference((MessageId) msg);
                     } else {
-                        listener.recoverMessage((Message)msg);
+                        listener.recoverMessage((Message) msg);
                     }
                 } else {
                     pastLackBatch = entry.getKey().equals(lastBatchId);
@@ -167,7 +145,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
         synchronized (messageTable) {
             Message original = messageTable.get(message.getMessageId());
 
-            //if can't be found then increment count, else remove old size
+            // if can't be found then increment count, else remove old size
             if (original == null) {
                 getMessageStoreStatistics().getMessageCount().increment();
             } else {
@@ -183,10 +161,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
         synchronized (messageTable) {
             long size = 0;
             int count = 0;
-            for (Iterator<Message> iter = messageTable.values().iterator(); iter
-                    .hasNext();) {
-                Message msg = iter.next();
-                size += msg.getSize();
+            for (Message message : messageTable.values()) {
+                size += message.getSize();
             }
 
             getMessageStoreStatistics().reset();
@@ -208,5 +184,4 @@ public class MemoryMessageStore extends AbstractMessageStore {
             stats.getMessageSize().addSize(-message.getSize());
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb24b48b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 5655a48..c16ea14 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * @org.apache.xbean.XBean
- *
  */
 public class MemoryPersistenceAdapter implements PersistenceAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
@@ -96,7 +95,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     /**
      * Cleanup method to remove any state associated with the given destination
      *
-     * @param destination Destination to forget
+     * @param destination
+     *        Destination to forget
      */
     @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
@@ -106,7 +106,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     /**
      * Cleanup method to remove any state associated with the given destination
      *
-     * @param destination Destination to forget
+     * @param destination
+     *        Destination to forget
      */
     @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
@@ -176,10 +177,10 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter
{
 
     protected MemoryMessageStore asMemoryMessageStore(Object value) {
         if (value instanceof MemoryMessageStore) {
-            return (MemoryMessageStore)value;
+            return (MemoryMessageStore) value;
         }
         if (value instanceof ProxyMessageStore) {
-            MessageStore delegate = ((ProxyMessageStore)value).getDelegate();
+            MessageStore delegate = ((ProxyMessageStore) value).getDelegate();
             if (delegate instanceof MemoryMessageStore) {
                 return (MemoryMessageStore) delegate;
             }
@@ -189,8 +190,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the broker's
-     *                memory usage.
+     * @param usageManager
+     *        The UsageManager that is controlling the broker's memory usage.
      */
     @Override
     public void setUsageManager(SystemUsage usageManager) {
@@ -210,7 +211,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     }
 
     @Override
-    public File getDirectory(){
+    public File getDirectory() {
         return null;
     }
 
@@ -219,7 +220,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     }
 
     @Override
-    public long size(){
+    public long size() {
         return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb24b48b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 308ca59..dd8be2b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,9 +19,8 @@ package org.apache.activemq.store.memory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -36,9 +35,6 @@ import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.SubscriptionKey;
 
-/**
- *
- */
 public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore
{
 
     private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
@@ -48,18 +44,20 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
     public MemoryTopicMessageStore(ActiveMQDestination destination) {
         this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
 
-        //Set the messageStoreStatistics after the super class is initialized so that the
stats can be
-        //properly updated on cache eviction
+        // Set the messageStoreStatistics after the super class is initialized
+        // so that the stats can be properly updated on cache eviction
         MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
         cache.setMessageStoreStatistics(messageStoreStatistics);
     }
 
-    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message>
messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
+    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message>
messageTable,
+        Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
         super(destination, messageTable);
         this.subscriberDatabase = subscriberDatabase;
         this.topicSubMap = makeSubMap();
-        //this is only necessary so that messageStoreStatistics can be set if necessary
-        //We need the original reference since messageTable is wrapped in a synchronized
map in the parent class
+        // this is only necessary so that messageStoreStatistics can be set if
+        // necessary We need the original reference since messageTable is wrapped
+        // in a synchronized map in the parent class
         this.originalMessageTable = messageTable;
     }
 
@@ -74,15 +72,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
     @Override
     public synchronized void addMessage(ConnectionContext context, Message message) throws
IOException {
         super.addMessage(context, message);
-        for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();)
{
-            MemoryTopicSub sub = i.next();
+        for (MemoryTopicSub sub : topicSubMap.values()) {
             sub.addMessage(message.getMessageId(), message);
         }
     }
 
     @Override
-    public synchronized void acknowledge(ConnectionContext context, String clientId, String
subscriptionName,
-                                         MessageId messageId, MessageAck ack) throws IOException
{
+    public synchronized void acknowledge(ConnectionContext context, String clientId, String
subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
+        super.removeMessage(messageId);
         SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
         MemoryTopicSub sub = topicSubMap.get(key);
         if (sub != null) {
@@ -98,12 +95,11 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
     @Override
     public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive)
throws IOException {
         SubscriptionKey key = new SubscriptionKey(info);
-        MemoryTopicSub sub = new MemoryTopicSub();
+        MemoryTopicSub sub = new MemoryTopicSub(key);
         topicSubMap.put(key, sub);
         if (retroactive) {
-            for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
-                Map.Entry entry = (Entry)i.next();
-                sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
+            for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
+                sub.addMessage(entry.getKey(), entry.getValue());
             }
         }
         subscriberDatabase.put(key, info);
@@ -111,7 +107,19 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
 
     @Override
     public synchronized void deleteSubscription(String clientId, String subscriptionName)
{
-        org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+        subscriberDatabase.remove(key);
+        MemoryTopicSub subscription = topicSubMap.get(key);
+        if (subscription != null) {
+            List<Message> storedMessages = subscription.getStoredMessages();
+            for (Message message : storedMessages) {
+                try {
+                    acknowledge(null, key.getClientId(), key.getSubscriptionName(), message.getMessageId(),
null);
+                } catch (IOException e) {
+                }
+            }
+        }
+
         subscriberDatabase.remove(key);
         topicSubMap.remove(key);
     }
@@ -172,7 +180,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
         }
     }
 
-    //Disabled for the memory store, can be enabled later if necessary
+    // Disabled for the memory store, can be enabled later if necessary
     private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
 
     @Override
@@ -181,27 +189,28 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
     }
 
     /**
-     * Since we initialize the store with a LRUCache in some cases, we need to account for
cache evictions
-     * when computing the message store statistics.
+     * Since we initialize the store with a LRUCache in some cases, we need to
+     * account for cache evictions when computing the message store statistics.
      *
      */
     private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message>
{
         private static final long serialVersionUID = -342098639681884413L;
         private MessageStoreStatistics messageStoreStatistics;
 
-        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
-                float loadFactor, boolean accessOrder) {
+        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
float loadFactor, boolean accessOrder) {
             super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
         }
 
-        public void setMessageStoreStatistics(
-                MessageStoreStatistics messageStoreStatistics) {
+        public void setMessageStoreStatistics(MessageStoreStatistics messageStoreStatistics)
{
             this.messageStoreStatistics = messageStoreStatistics;
         }
 
         @Override
         protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
             decMessageStoreStatistics(messageStoreStatistics, eldest.getValue());
+
+            // We aren't tracking this anymore so remove our reference to it.
+            eldest.getValue().decrementReferenceCount();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb24b48b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
index 3adf7b8..67f7546 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,26 +16,34 @@
  */
 package org.apache.activemq.store.memory;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.util.SubscriptionKey;
 
 /**
  * A holder for a durable subscriber
- *
- *
  */
 class MemoryTopicSub {
 
-    private Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
+    private final Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
+    private final SubscriptionKey subscriptionKey;
+
     private MessageId lastBatch;
 
+    public MemoryTopicSub(SubscriptionKey subscriptionKey) {
+        this.subscriptionKey = subscriptionKey;
+    }
+
     void addMessage(MessageId id, Message message) {
-        synchronized(this) {
+        synchronized (this) {
             map.put(id, message);
         }
         message.incrementReferenceCount();
@@ -43,17 +51,23 @@ class MemoryTopicSub {
 
     void removeMessage(MessageId id) {
         Message removed;
-        synchronized(this) {
+        synchronized (this) {
             removed = map.remove(id);
             if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
                 resetBatching();
             }
         }
-        if( removed!=null ) {
+        if (removed != null) {
             removed.decrementReferenceCount();
         }
     }
 
+    List<Message> getStoredMessages() {
+        synchronized (this) {
+            return new ArrayList<Message>(map.values());
+        }
+    }
+
     synchronized int size() {
         return map.size();
     }
@@ -80,8 +94,7 @@ class MemoryTopicSub {
     synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
throws Exception {
         boolean pastLackBatch = lastBatch == null;
         MessageId lastId = null;
-        // the message table is a synchronizedMap - so just have to synchronize
-        // here
+        // the message table is a synchronizedMap - so just have to synchronize here
         int count = 0;
         for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator();
iter.hasNext() && count < maxReturned;) {
             Entry<MessageId, Message> entry = iter.next();
@@ -94,13 +107,17 @@ class MemoryTopicSub {
                 pastLackBatch = entry.getKey().equals(lastBatch);
             }
         }
+
         if (lastId != null) {
             lastBatch = lastId;
         }
-
     }
 
     synchronized void resetBatching() {
         lastBatch = null;
     }
+
+    SubscriptionKey getSubscriptionKey() {
+        return subscriptionKey;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb24b48b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index e540bdc..30cd5b5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -44,8 +44,6 @@ import org.apache.activemq.store.TransactionStore;
 /**
  * Provides a TransactionStore implementation that can create transaction aware
  * MessageStore objects from non transaction aware MessageStore objects.
- *
- *
  */
 public class MemoryTransactionStore implements TransactionStore {
 
@@ -56,6 +54,7 @@ public class MemoryTransactionStore implements TransactionStore {
     private boolean doingRecover;
 
     public class Tx {
+
         public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
 
         public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
@@ -107,7 +106,7 @@ public class MemoryTransactionStore implements TransactionStore {
                     cmd.run(ctx);
                 }
 
-            } catch ( IOException e ) {
+            } catch (IOException e) {
                 persistenceAdapter.rollbackTransaction(ctx);
                 throw e;
             }
@@ -134,7 +133,7 @@ public class MemoryTransactionStore implements TransactionStore {
     }
 
     public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
-        this.persistenceAdapter=persistenceAdapter;
+        this.persistenceAdapter = persistenceAdapter;
     }
 
     public MessageStore proxy(MessageStore messageStore) {
@@ -153,13 +152,13 @@ public class MemoryTransactionStore implements TransactionStore {
             public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext
context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
                 return new InlineListenableFuture();
-             }
+            }
 
             @Override
             public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext
context, Message message, boolean canoptimize) throws IOException {
                 MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
                 return new InlineListenableFuture();
-             }
+            }
 
             @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws
IOException {
@@ -194,13 +193,13 @@ public class MemoryTransactionStore implements TransactionStore {
             public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext
context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
                 return new InlineListenableFuture();
-             }
+            }
 
             @Override
             public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext
context, Message message, boolean canOptimize) throws IOException {
                 MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
                 return new InlineListenableFuture();
-             }
+            }
 
             @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws
IOException {
@@ -213,10 +212,9 @@ public class MemoryTransactionStore implements TransactionStore {
             }
 
             @Override
-            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                            MessageId messageId, MessageAck ack) throws IOException {
-                MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(),
clientId,
-                        subscriptionName, messageId, ack);
+            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack)
+                throws IOException {
+                MemoryTransactionStore.this.acknowledge((TopicMessageStore) getDelegate(),
clientId, subscriptionName, messageId, ack);
             }
         };
         onProxyTopicStore(proxyTopicMessageStore);
@@ -257,7 +255,7 @@ public class MemoryTransactionStore implements TransactionStore {
     }
 
     @Override
-    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable
postCommit) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable
postCommit) throws IOException {
         if (preCommit != null) {
             preCommit.run();
         }
@@ -302,7 +300,7 @@ public class MemoryTransactionStore implements TransactionStore {
             for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator();
iter.hasNext();) {
                 Object txid = iter.next();
                 Tx tx = preparedTransactions.get(txid);
-                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
+                listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
                 onRecovered(tx);
             }
         } finally {
@@ -326,7 +324,9 @@ public class MemoryTransactionStore implements TransactionStore {
         if (message.getTransactionId() != null) {
             Tx tx = getTx(message.getTransactionId());
             tx.add(new AddMessageCommand() {
+                @SuppressWarnings("unused")
                 MessageStore messageStore = destination;
+
                 @Override
                 public Message getMessage() {
                     return message;
@@ -385,8 +385,8 @@ public class MemoryTransactionStore implements TransactionStore {
         }
     }
 
-    public void acknowledge(final TopicMessageStore destination, final String clientId, final
String subscriptionName,
-                           final MessageId messageId, final MessageAck ack) throws IOException
{
+    public void acknowledge(final TopicMessageStore destination, final String clientId, final
String subscriptionName, final MessageId messageId,
+        final MessageAck ack) throws IOException {
         if (doingRecover) {
             return;
         }
@@ -414,11 +414,9 @@ public class MemoryTransactionStore implements TransactionStore {
         }
     }
 
-
     public void delete() {
         inflightTransactions.clear();
         preparedTransactions.clear();
         doingRecover = false;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb24b48b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
index 985fb94..8bd7b39 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
@@ -123,7 +123,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor
 
         //The expected value is only 100 because for durables a LRUCache is being used
         //with a max size of 100
-        verifyStoreStats(dest, 100, publishedMessageSize.get());
+        verifyStoreStats(dest, 0, publishedMessageSize.get());
 
         connection.stop();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb24b48b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
new file mode 100644
index 0000000..0db9561
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.usage.MemoryUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ6387Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6387Test.class);
+
+    private final String QUEUE_NAME = "testQueue";
+    private final String TOPIC_NAME = "testTopic";
+    private final String SUBSCRIPTION_NAME = "subscriberId";
+    private final String CLIENT_ID = "client1";
+    private final int MSG_COUNT = 150;
+
+    private ActiveMQConnectionFactory connectionFactory;
+    private BrokerService brokerService;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void setUp() throws Exception {
+
+        LOG.info("=============== Starting test: {} ====================", testName.getMethodName());
+
+        brokerService = new BrokerService();
+        brokerService.setAdvisorySupport(false);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        brokerService.setKeepDurableSubsActive(false);
+        brokerService.start();
+        connectionFactory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+
+        LOG.info("=============== Finished test: {} ====================", testName.getMethodName());
+    }
+
+    @Test
+    public void testQueueMessagesKeptAfterDelivery() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Queue.class);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiveMessages(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Queue.class));
+    }
+
+    @Test
+    public void testQueueMessagesKeptAfterPurge() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Queue.class);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        getProxyToQueue(QUEUE_NAME).purge();
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Queue.class));
+    }
+
+    @Test
+    public void testDurableTopicSubscriptionMessagesKeptAfterDelivery() throws Exception
{
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Topic.class);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        receiveMessages(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Topic.class));
+    }
+
+    @Test
+    public void testDurableTopicSubscriptionMessagesKeptAfterUnsubscribe() throws Exception
{
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Topic.class);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        unsubscribeDurableSubscription();
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Topic.class));
+    }
+
+    private void createDurableSubscription() throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        connection.setClientID(CLIENT_ID);
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Topic topic = session.createTopic(TOPIC_NAME);
+        connection.start();
+
+        session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, null, false);
+        LOG.info("Created durable subscription.");
+
+        connection.stop();
+        connection.close();
+    }
+
+    private void receiveMessages(Class<? extends Destination> destType) throws JMSException
{
+        final Connection connection = connectionFactory.createConnection();
+        connection.setClientID(CLIENT_ID);
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination destination;
+        if (destType.equals(Queue.class)) {
+            destination = session.createQueue(QUEUE_NAME);
+        } else {
+            destination = session.createTopic(TOPIC_NAME);
+        }
+
+        final MessageConsumer consumer;
+        if (destType.equals(Queue.class)) {
+            consumer = session.createConsumer(destination);
+        } else {
+            consumer = session.createDurableSubscriber((Topic) destination, SUBSCRIPTION_NAME,
null, false);
+        }
+
+        connection.start();
+
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            assertNotNull(consumer.receive(5000));
+        }
+
+        connection.close();
+    }
+
+    private void sendBytesMessage(Class<? extends Destination> destType) throws JMSException
{
+        final Connection connection = connectionFactory.createConnection();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination destination;
+        if (destType.equals(Queue.class)) {
+            destination = session.createQueue(QUEUE_NAME);
+        } else {
+            destination = session.createTopic(TOPIC_NAME);
+        }
+        final MessageProducer producer = session.createProducer(destination);
+        final BytesMessage bytesMessage = session.createBytesMessage();
+
+        bytesMessage.writeBytes(new byte[1024 * 1024]);
+
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            producer.send(bytesMessage);
+        }
+
+        connection.close();
+    }
+
+    private void unsubscribeDurableSubscription() throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        connection.setClientID(CLIENT_ID);
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.unsubscribe(SUBSCRIPTION_NAME);
+        LOG.info("Unsubscribed durable subscription.");
+
+        connection.stop();
+        connection.close();
+    }
+
+    private long getCurrentMemoryUsage(Class<? extends Destination> destType) throws
Exception {
+        final MemoryUsage usage;
+        if (destType.equals(Queue.class)) {
+            usage = brokerService.getDestination(ActiveMQDestination.createDestination(QUEUE_NAME,
ActiveMQDestination.QUEUE_TYPE)).getMemoryUsage();
+        } else {
+            usage = brokerService.getDestination(ActiveMQDestination.createDestination(TOPIC_NAME,
ActiveMQDestination.TOPIC_TYPE)).getMemoryUsage();
+        }
+
+        return usage.getUsage();
+    }
+
+    private void logBrokerMemoryUsage(Class<? extends Destination> destType) throws
Exception {
+        LOG.info("Memory usage: broker={}% destination={}", brokerService.getAdminView().getMemoryPercentUsage(),
getCurrentMemoryUsage(destType));
+    }
+
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException,
JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+}


Mime
View raw message