activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6387
Date Thu, 01 Sep 2016 20:26:13 GMT
Repository: activemq
Updated Branches:
  refs/heads/master a0d05f8ea -> bb8d32c04


https://issues.apache.org/jira/browse/AMQ-6387

Fix up the Memory Store such that it removes the references it adds to
messages when they are placed into the memory durable topic subscription
store.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bb8d32c0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bb8d32c0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bb8d32c0

Branch: refs/heads/master
Commit: bb8d32c04aa06735d0036963685a4bc41fcbaad7
Parents: a0d05f8
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Sep 1 16:26:03 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Sep 1 16:26:03 2016 -0400

----------------------------------------------------------------------
 .../store/memory/MemoryMessageStore.java        |  53 +---
 .../store/memory/MemoryPersistenceAdapter.java  |  21 +-
 .../store/memory/MemoryTopicMessageStore.java   |  63 +++--
 .../activemq/store/memory/MemoryTopicSub.java   |  37 ++-
 .../store/memory/MemoryTransactionStore.java    |  36 ++-
 .../cursors/MemoryPendingMessageCursorTest.java |   2 +-
 .../org/apache/activemq/bugs/AMQ6387Test.java   | 268 +++++++++++++++++++
 7 files changed, 374 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 736d912..8e72c48 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,26 +18,21 @@ package org.apache.activemq.store.memory;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageStoreStatistics;
 
 /**
- * An implementation of {@link org.apache.activemq.store.MessageStore} which
- * uses a
- *
- *
+ * An implementation of {@link org.apache.activemq.store.MessageStore}
  */
 public class MemoryMessageStore extends AbstractMessageStore {
 
@@ -67,23 +62,11 @@ public class MemoryMessageStore extends AbstractMessageStore {
         }
     }
 
-    // public void addMessageReference(ConnectionContext context,MessageId
-    // messageId,long expirationTime,String messageRef)
-    // throws IOException{
-    // synchronized(messageTable){
-    // messageTable.put(messageId,messageRef);
-    // }
-    // }
-
     @Override
     public Message getMessage(MessageId identity) throws IOException {
         return messageTable.get(identity);
     }
 
-    // public String getMessageReference(MessageId identity) throws IOException{
-    // return (String)messageTable.get(identity);
-    // }
-
     @Override
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
         removeMessage(ack.getLastMessageId());
@@ -92,7 +75,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
     public void removeMessage(MessageId msgId) throws IOException {
         synchronized (messageTable) {
             Message removed = messageTable.remove(msgId);
-            if( removed !=null ) {
+            if (removed != null) {
                 removed.decrementReferenceCount();
                 decMessageStoreStatistics(getMessageStoreStatistics(), removed);
             }
@@ -104,12 +87,10 @@ public class MemoryMessageStore extends AbstractMessageStore {
 
     @Override
     public void recover(MessageRecoveryListener listener) throws Exception {
-        // the message table is a synchronizedMap - so just have to synchronize
-        // here
+        // the message table is a synchronizedMap - so just have to synchronize here
         synchronized (messageTable) {
-            for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
-                Message msg = iter.next();
-                listener.recoverMessage(msg);
+            for (Message message : messageTable.values()) {
+                listener.recoverMessage(message);
             }
         }
     }
@@ -133,17 +114,14 @@ public class MemoryMessageStore extends AbstractMessageStore {
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         synchronized (messageTable) {
             boolean pastLackBatch = lastBatchId == null;
-            int count = 0;
-            for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
-                Map.Entry entry = (Entry)iter.next();
+            for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
                 if (pastLackBatch) {
-                    count++;
                     Object msg = entry.getValue();
-                    lastBatchId = (MessageId)entry.getKey();
+                    lastBatchId = entry.getKey();
                     if (msg.getClass() == MessageId.class) {
-                        listener.recoverMessageReference((MessageId)msg);
+                        listener.recoverMessageReference((MessageId) msg);
                     } else {
-                        listener.recoverMessage((Message)msg);
+                        listener.recoverMessage((Message) msg);
                     }
                 } else {
                     pastLackBatch = entry.getKey().equals(lastBatchId);
@@ -167,7 +145,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
         synchronized (messageTable) {
             Message original = messageTable.get(message.getMessageId());
 
-            //if can't be found then increment count, else remove old size
+            // if can't be found then increment count, else remove old size
             if (original == null) {
                 getMessageStoreStatistics().getMessageCount().increment();
             } else {
@@ -183,10 +161,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
         synchronized (messageTable) {
             long size = 0;
             int count = 0;
-            for (Iterator<Message> iter = messageTable.values().iterator(); iter
-                    .hasNext();) {
-                Message msg = iter.next();
-                size += msg.getSize();
+            for (Message message : messageTable.values()) {
+                size += message.getSize();
             }
 
             getMessageStoreStatistics().reset();
@@ -208,5 +184,4 @@ public class MemoryMessageStore extends AbstractMessageStore {
             stats.getMessageSize().addSize(-message.getSize());
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 5655a48..c16ea14 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * @org.apache.xbean.XBean
- *
  */
 public class MemoryPersistenceAdapter implements PersistenceAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
@@ -96,7 +95,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     /**
      * Cleanup method to remove any state associated with the given destination
      *
-     * @param destination Destination to forget
+     * @param destination
+     *        Destination to forget
      */
     @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
@@ -106,7 +106,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     /**
      * Cleanup method to remove any state associated with the given destination
      *
-     * @param destination Destination to forget
+     * @param destination
+     *        Destination to forget
      */
     @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
@@ -176,10 +177,10 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
 
     protected MemoryMessageStore asMemoryMessageStore(Object value) {
         if (value instanceof MemoryMessageStore) {
-            return (MemoryMessageStore)value;
+            return (MemoryMessageStore) value;
         }
         if (value instanceof ProxyMessageStore) {
-            MessageStore delegate = ((ProxyMessageStore)value).getDelegate();
+            MessageStore delegate = ((ProxyMessageStore) value).getDelegate();
             if (delegate instanceof MemoryMessageStore) {
                 return (MemoryMessageStore) delegate;
             }
@@ -189,8 +190,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the broker's
-     *                memory usage.
+     * @param usageManager
+     *        The UsageManager that is controlling the broker's memory usage.
      */
     @Override
     public void setUsageManager(SystemUsage usageManager) {
@@ -210,7 +211,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     }
 
     @Override
-    public File getDirectory(){
+    public File getDirectory() {
         return null;
     }
 
@@ -219,7 +220,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     }
 
     @Override
-    public long size(){
+    public long size() {
         return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 308ca59..dd8be2b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,9 +19,8 @@ package org.apache.activemq.store.memory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -36,9 +35,6 @@ import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.SubscriptionKey;
 
-/**
- *
- */
 public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore
{
 
     private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
@@ -48,18 +44,20 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
     public MemoryTopicMessageStore(ActiveMQDestination destination) {
         this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
 
-        //Set the messageStoreStatistics after the super class is initialized so that the stats can be
-        //properly updated on cache eviction
+        // Set the messageStoreStatistics after the super class is initialized
+        // so that the stats can be properly updated on cache eviction
         MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
         cache.setMessageStoreStatistics(messageStoreStatistics);
     }
 
-    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message>
messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
+    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message>
messageTable,
+        Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
         super(destination, messageTable);
         this.subscriberDatabase = subscriberDatabase;
         this.topicSubMap = makeSubMap();
-        //this is only necessary so that messageStoreStatistics can be set if necessary
-        //We need the original reference since messageTable is wrapped in a synchronized
map in the parent class
+        // this is only necessary so that messageStoreStatistics can be set if
+        // necessary We need the original reference since messageTable is wrapped
+        // in a synchronized map in the parent class
         this.originalMessageTable = messageTable;
     }
 
@@ -74,15 +72,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
     @Override
     public synchronized void addMessage(ConnectionContext context, Message message) throws
IOException {
         super.addMessage(context, message);
-        for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();)
{
-            MemoryTopicSub sub = i.next();
+        for (MemoryTopicSub sub : topicSubMap.values()) {
             sub.addMessage(message.getMessageId(), message);
         }
     }
 
     @Override
-    public synchronized void acknowledge(ConnectionContext context, String clientId, String
subscriptionName,
-                                         MessageId messageId, MessageAck ack) throws IOException
{
+    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
+        super.removeMessage(messageId);
         SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
         MemoryTopicSub sub = topicSubMap.get(key);
         if (sub != null) {
@@ -98,12 +95,11 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
     @Override
     public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive)
throws IOException {
         SubscriptionKey key = new SubscriptionKey(info);
-        MemoryTopicSub sub = new MemoryTopicSub();
+        MemoryTopicSub sub = new MemoryTopicSub(key);
         topicSubMap.put(key, sub);
         if (retroactive) {
-            for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
-                Map.Entry entry = (Entry)i.next();
-                sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
+            for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
+                sub.addMessage(entry.getKey(), entry.getValue());
             }
         }
         subscriberDatabase.put(key, info);
@@ -111,7 +107,19 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
 
     @Override
     public synchronized void deleteSubscription(String clientId, String subscriptionName)
{
-        org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+        subscriberDatabase.remove(key);
+        MemoryTopicSub subscription = topicSubMap.get(key);
+        if (subscription != null) {
+            List<Message> storedMessages = subscription.getStoredMessages();
+            for (Message message : storedMessages) {
+                try {
+                    acknowledge(null, key.getClientId(), key.getSubscriptionName(), message.getMessageId(), null);
+                } catch (IOException e) {
+                }
+            }
+        }
+
         subscriberDatabase.remove(key);
         topicSubMap.remove(key);
     }
@@ -172,7 +180,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
         }
     }
 
-    //Disabled for the memory store, can be enabled later if necessary
+    // Disabled for the memory store, can be enabled later if necessary
     private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
 
     @Override
@@ -181,27 +189,28 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements
Topic
     }
 
     /**
-     * Since we initialize the store with a LRUCache in some cases, we need to account for
cache evictions
-     * when computing the message store statistics.
+     * Since we initialize the store with a LRUCache in some cases, we need to
+     * account for cache evictions when computing the message store statistics.
      *
      */
     private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message>
{
         private static final long serialVersionUID = -342098639681884413L;
         private MessageStoreStatistics messageStoreStatistics;
 
-        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
-                float loadFactor, boolean accessOrder) {
+        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
float loadFactor, boolean accessOrder) {
             super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
         }
 
-        public void setMessageStoreStatistics(
-                MessageStoreStatistics messageStoreStatistics) {
+        public void setMessageStoreStatistics(MessageStoreStatistics messageStoreStatistics)
{
             this.messageStoreStatistics = messageStoreStatistics;
         }
 
         @Override
         protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
             decMessageStoreStatistics(messageStoreStatistics, eldest.getValue());
+
+            // We aren't tracking this anymore so remove our reference to it.
+            eldest.getValue().decrementReferenceCount();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
index 3adf7b8..67f7546 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,26 +16,34 @@
  */
 package org.apache.activemq.store.memory;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.util.SubscriptionKey;
 
 /**
  * A holder for a durable subscriber
- *
- *
  */
 class MemoryTopicSub {
 
-    private Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
+    private final Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
+    private final SubscriptionKey subscriptionKey;
+
     private MessageId lastBatch;
 
+    public MemoryTopicSub(SubscriptionKey subscriptionKey) {
+        this.subscriptionKey = subscriptionKey;
+    }
+
     void addMessage(MessageId id, Message message) {
-        synchronized(this) {
+        synchronized (this) {
             map.put(id, message);
         }
         message.incrementReferenceCount();
@@ -43,17 +51,23 @@ class MemoryTopicSub {
 
     void removeMessage(MessageId id) {
         Message removed;
-        synchronized(this) {
+        synchronized (this) {
             removed = map.remove(id);
             if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
                 resetBatching();
             }
         }
-        if( removed!=null ) {
+        if (removed != null) {
             removed.decrementReferenceCount();
         }
     }
 
+    List<Message> getStoredMessages() {
+        synchronized (this) {
+            return new ArrayList<Message>(map.values());
+        }
+    }
+
     synchronized int size() {
         return map.size();
     }
@@ -80,8 +94,7 @@ class MemoryTopicSub {
     synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
throws Exception {
         boolean pastLackBatch = lastBatch == null;
         MessageId lastId = null;
-        // the message table is a synchronizedMap - so just have to synchronize
-        // here
+        // the message table is a synchronizedMap - so just have to synchronize here
         int count = 0;
         for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
             Entry<MessageId, Message> entry = iter.next();
@@ -94,13 +107,17 @@ class MemoryTopicSub {
                 pastLackBatch = entry.getKey().equals(lastBatch);
             }
         }
+
         if (lastId != null) {
             lastBatch = lastId;
         }
-
     }
 
     synchronized void resetBatching() {
         lastBatch = null;
     }
+
+    SubscriptionKey getSubscriptionKey() {
+        return subscriptionKey;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index e540bdc..30cd5b5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -44,8 +44,6 @@ import org.apache.activemq.store.TransactionStore;
 /**
  * Provides a TransactionStore implementation that can create transaction aware
  * MessageStore objects from non transaction aware MessageStore objects.
- *
- *
  */
 public class MemoryTransactionStore implements TransactionStore {
 
@@ -56,6 +54,7 @@ public class MemoryTransactionStore implements TransactionStore {
     private boolean doingRecover;
 
     public class Tx {
+
         public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
 
         public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
@@ -107,7 +106,7 @@ public class MemoryTransactionStore implements TransactionStore {
                     cmd.run(ctx);
                 }
 
-            } catch ( IOException e ) {
+            } catch (IOException e) {
                 persistenceAdapter.rollbackTransaction(ctx);
                 throw e;
             }
@@ -134,7 +133,7 @@ public class MemoryTransactionStore implements TransactionStore {
     }
 
     public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
-        this.persistenceAdapter=persistenceAdapter;
+        this.persistenceAdapter = persistenceAdapter;
     }
 
     public MessageStore proxy(MessageStore messageStore) {
@@ -153,13 +152,13 @@ public class MemoryTransactionStore implements TransactionStore {
             public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext
context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
                 return new InlineListenableFuture();
-             }
+            }
 
             @Override
             public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext
context, Message message, boolean canoptimize) throws IOException {
                 MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
                 return new InlineListenableFuture();
-             }
+            }
 
             @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws
IOException {
@@ -194,13 +193,13 @@ public class MemoryTransactionStore implements TransactionStore {
             public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext
context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
                 return new InlineListenableFuture();
-             }
+            }
 
             @Override
             public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext
context, Message message, boolean canOptimize) throws IOException {
                 MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
                 return new InlineListenableFuture();
-             }
+            }
 
             @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws
IOException {
@@ -213,10 +212,9 @@ public class MemoryTransactionStore implements TransactionStore {
             }
 
             @Override
-            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                            MessageId messageId, MessageAck ack) throws IOException {
-                MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(),
clientId,
-                        subscriptionName, messageId, ack);
+            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack)
+                throws IOException {
+                MemoryTransactionStore.this.acknowledge((TopicMessageStore) getDelegate(),
clientId, subscriptionName, messageId, ack);
             }
         };
         onProxyTopicStore(proxyTopicMessageStore);
@@ -257,7 +255,7 @@ public class MemoryTransactionStore implements TransactionStore {
     }
 
     @Override
-    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable
postCommit) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable
postCommit) throws IOException {
         if (preCommit != null) {
             preCommit.run();
         }
@@ -302,7 +300,7 @@ public class MemoryTransactionStore implements TransactionStore {
             for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator();
iter.hasNext();) {
                 Object txid = iter.next();
                 Tx tx = preparedTransactions.get(txid);
-                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
+                listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
                 onRecovered(tx);
             }
         } finally {
@@ -326,7 +324,9 @@ public class MemoryTransactionStore implements TransactionStore {
         if (message.getTransactionId() != null) {
             Tx tx = getTx(message.getTransactionId());
             tx.add(new AddMessageCommand() {
+                @SuppressWarnings("unused")
                 MessageStore messageStore = destination;
+
                 @Override
                 public Message getMessage() {
                     return message;
@@ -385,8 +385,8 @@ public class MemoryTransactionStore implements TransactionStore {
         }
     }
 
-    public void acknowledge(final TopicMessageStore destination, final String clientId, final
String subscriptionName,
-                           final MessageId messageId, final MessageAck ack) throws IOException
{
+    public void acknowledge(final TopicMessageStore destination, final String clientId, final
String subscriptionName, final MessageId messageId,
+        final MessageAck ack) throws IOException {
         if (doingRecover) {
             return;
         }
@@ -414,11 +414,9 @@ public class MemoryTransactionStore implements TransactionStore {
         }
     }
 
-
     public void delete() {
         inflightTransactions.clear();
         preparedTransactions.clear();
         doingRecover = false;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
index 985fb94..8bd7b39 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
@@ -123,7 +123,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor
 
         //The expected value is only 100 because for durables a LRUCache is being used
         //with a max size of 100
-        verifyStoreStats(dest, 100, publishedMessageSize.get());
+        verifyStoreStats(dest, 0, publishedMessageSize.get());
 
         connection.stop();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
new file mode 100644
index 0000000..0db9561
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.usage.MemoryUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ6387Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6387Test.class);
+
+    private final String QUEUE_NAME = "testQueue";
+    private final String TOPIC_NAME = "testTopic";
+    private final String SUBSCRIPTION_NAME = "subscriberId";
+    private final String CLIENT_ID = "client1";
+    private final int MSG_COUNT = 150;
+
+    private ActiveMQConnectionFactory connectionFactory;
+    private BrokerService brokerService;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void setUp() throws Exception {
+
+        LOG.info("=============== Starting test: {} ====================", testName.getMethodName());
+
+        brokerService = new BrokerService();
+        brokerService.setAdvisorySupport(false);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        brokerService.setKeepDurableSubsActive(false);
+        brokerService.start();
+        connectionFactory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+
+        LOG.info("=============== Finished test: {} ====================", testName.getMethodName());
+    }
+
+    @Test
+    public void testQueueMessagesKeptAfterDelivery() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Queue.class);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiveMessages(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Queue.class));
+    }
+
+    @Test
+    public void testQueueMessagesKeptAfterPurge() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Queue.class);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        getProxyToQueue(QUEUE_NAME).purge();
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Queue.class));
+    }
+
+    @Test
+    public void testDurableTopicSubscriptionMessagesKeptAfterDelivery() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Topic.class);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        receiveMessages(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Topic.class));
+    }
+
+    @Test
+    public void testDurableTopicSubscriptionMessagesKeptAfterUnsubscribe() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Topic.class);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        unsubscribeDurableSubscription();
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Topic.class));
+    }
+
+    private void createDurableSubscription() throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        connection.setClientID(CLIENT_ID);
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Topic topic = session.createTopic(TOPIC_NAME);
+        connection.start();
+
+        session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, null, false);
+        LOG.info("Created durable subscription.");
+
+        connection.stop();
+        connection.close();
+    }
+
+    private void receiveMessages(Class<? extends Destination> destType) throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        connection.setClientID(CLIENT_ID);
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination destination;
+        if (destType.equals(Queue.class)) {
+            destination = session.createQueue(QUEUE_NAME);
+        } else {
+            destination = session.createTopic(TOPIC_NAME);
+        }
+
+        final MessageConsumer consumer;
+        if (destType.equals(Queue.class)) {
+            consumer = session.createConsumer(destination);
+        } else {
+            consumer = session.createDurableSubscriber((Topic) destination, SUBSCRIPTION_NAME, null, false);
+        }
+
+        connection.start();
+
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            assertNotNull(consumer.receive(5000));
+        }
+
+        connection.close();
+    }
+
+    private void sendBytesMessage(Class<? extends Destination> destType) throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination destination;
+        if (destType.equals(Queue.class)) {
+            destination = session.createQueue(QUEUE_NAME);
+        } else {
+            destination = session.createTopic(TOPIC_NAME);
+        }
+        final MessageProducer producer = session.createProducer(destination);
+        final BytesMessage bytesMessage = session.createBytesMessage();
+
+        bytesMessage.writeBytes(new byte[1024 * 1024]);
+
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            producer.send(bytesMessage);
+        }
+
+        connection.close();
+    }
+
+    private void unsubscribeDurableSubscription() throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        connection.setClientID(CLIENT_ID);
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.unsubscribe(SUBSCRIPTION_NAME);
+        LOG.info("Unsubscribed durable subscription.");
+
+        connection.stop();
+        connection.close();
+    }
+
+    private long getCurrentMemoryUsage(Class<? extends Destination> destType) throws Exception {
+        final MemoryUsage usage;
+        if (destType.equals(Queue.class)) {
+            usage = brokerService.getDestination(ActiveMQDestination.createDestination(QUEUE_NAME, ActiveMQDestination.QUEUE_TYPE)).getMemoryUsage();
+        } else {
+            usage = brokerService.getDestination(ActiveMQDestination.createDestination(TOPIC_NAME, ActiveMQDestination.TOPIC_TYPE)).getMemoryUsage();
+        }
+
+        return usage.getUsage();
+    }
+
+    private void logBrokerMemoryUsage(Class<? extends Destination> destType) throws Exception {
+        LOG.info("Memory usage: broker={}% destination={}", brokerService.getAdminView().getMemoryPercentUsage(), getCurrentMemoryUsage(destType));
+    }
+
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+}


Mime
View raw message