activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r902999 - in /activemq/branches/activemq-5.3: activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core/src/main/java/org/apache/activemq/store/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ active...
Date Mon, 25 Jan 2010 22:22:45 GMT
Author: rajdavies
Date: Mon Jan 25 22:22:44 2010
New Revision: 902999

URL: http://svn.apache.org/viewvc?rev=902999&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2512

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
      - copied unchanged from r901300, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
    activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Jan 25 22:22:44 2010
@@ -19,7 +19,6 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map.Entry;
-
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
@@ -47,6 +46,7 @@
         this.regionDestination=destination;
     }
     
+    @Override
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
             super.start();
@@ -60,6 +60,7 @@
         } 
     }
     
+    @Override
     public final synchronized void stop() throws Exception {
         resetBatch();
         super.stop();
@@ -91,6 +92,7 @@
         return recovered;
     }
     
+    @Override
     public final void reset() {
         if (batchList.isEmpty()) {
             try {
@@ -104,6 +106,7 @@
         size();
     }
     
+    @Override
     public synchronized void release() {
         clearIterator(false);
     }
@@ -127,6 +130,7 @@
     public final void finished() {
     }
         
+    @Override
     public final synchronized boolean hasNext() {
         if (batchList.isEmpty()) {
             try {
@@ -140,6 +144,7 @@
         return this.iterator.hasNext();
     }
     
+    @Override
     public final synchronized MessageReference next() {
         MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@@ -149,6 +154,7 @@
         return result;
     }
     
+    @Override
     public final synchronized void addMessageLast(MessageReference node) throws Exception
{
         if (cacheEnabled && hasSpace()) {
             recoverMessage(node.getMessage(),true);
@@ -171,11 +177,13 @@
     protected void setBatch(MessageId messageId) throws Exception {
     }
 
+    @Override
     public final synchronized void addMessageFirst(MessageReference node) throws Exception
{
         cacheEnabled=false;
         size++;
     }
 
+    @Override
     public final synchronized void remove() {
         size--;
         if (iterator!=null) {
@@ -184,7 +192,7 @@
         if (last != null) {
             last.decrementReferenceCount();
         }
-        if (size==0 && isStarted() && useCache && hasSpace() &&
getStoreSize() == 0) {
+        if (size==0 && isStarted() && useCache && hasSpace() &&
isStoreEmpty()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" enabling cache on last remove");
             }
@@ -192,16 +200,19 @@
         }
     }
 
+    @Override
     public final synchronized void remove(MessageReference node) {
         size--;
         cacheEnabled=false;
         batchList.remove(node.getMessageId());
     }
     
+    @Override
     public final synchronized void clear() {
         gc();
     }
     
+    @Override
     public final synchronized void gc() {
         for (Message msg : batchList.values()) {
             rollback(msg.getMessageId());
@@ -218,6 +229,7 @@
         }
     }
     
+    @Override
     protected final synchronized void fillBatch() {
         if (batchResetNeeded) {
             resetBatch();
@@ -237,15 +249,18 @@
         }
     }
     
+    @Override
     public final synchronized boolean isEmpty() {
         // negative means more messages added to store through queue.send since last reset
         return size == 0;
     }
 
+    @Override
     public final synchronized boolean hasMessagesBufferedToDeliver() {
         return !batchList.isEmpty();
     }
 
+    @Override
     public final synchronized int size() {
         if (size < 0) {
             this.size = getStoreSize();
@@ -259,4 +274,6 @@
     protected abstract void resetBatch();
     
     protected abstract int getStoreSize();
+    
+    protected abstract boolean isStoreEmpty();
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Mon Jan 25 22:22:44 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
-
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
@@ -33,7 +32,7 @@
  */
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Log LOG = LogFactory.getLog(QueueStorePrefetch.class);
-    private MessageStore store;
+    private final MessageStore store;
    
     /**
      * Construct it
@@ -41,7 +40,7 @@
      */
     public QueueStorePrefetch(Queue queue) {
         super(queue);
-        this.store = (MessageStore)queue.getMessageStore();
+        this.store = queue.getMessageStore();
 
     }
 
@@ -58,29 +57,47 @@
 
    
         
+    @Override
     protected synchronized int getStoreSize() {
         try {
-            return this.store.getMessageCount();
+            int result = this.store.getMessageCount();
+            return result;
+            
         } catch (IOException e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
     }
     
+    @Override
+    protected synchronized boolean isStoreEmpty() {
+        try {
+            return this.store.isEmpty();
+            
+        } catch (Exception e) {
+            LOG.error("Failed to get message count", e);
+            throw new RuntimeException(e);
+        }
+    }
+    
+    @Override
     protected void resetBatch() {
         this.store.resetBatching();
     }
     
+    @Override
     protected void setBatch(MessageId messageId) throws Exception {
         store.setBatch(messageId);
         batchResetNeeded = false;
     }
 
     
+    @Override
     protected void doFillBatch() throws Exception {
         this.store.recoverNextMessages(this.maxBatchSize, this);
     }
 
+    @Override
     public String toString() {
         return "QueueStorePrefetch" + System.identityHashCode(this);
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Mon Jan 25 22:22:44 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
-
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
@@ -36,10 +35,10 @@
  */
 class TopicStorePrefetch extends AbstractStoreCursor {
     private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
-    private TopicMessageStore store;
-    private String clientId;
-    private String subscriberName;
-    private Subscription subscription;
+    private final TopicMessageStore store;
+    private final String clientId;
+    private final String subscriberName;
+    private final Subscription subscription;
     
     /**
      * @param topic
@@ -62,6 +61,7 @@
     }
     
         
+    @Override
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception
{
         MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
         messageEvaluationContext.setMessageReference(message);
@@ -73,6 +73,7 @@
     }
 
    
+    @Override
     protected synchronized int getStoreSize() {
         try {
             return store.getMessageCount(clientId, subscriberName);
@@ -81,17 +82,31 @@
             throw new RuntimeException(e);
         }
     }
+    
+    @Override
+    protected synchronized boolean isStoreEmpty() {
+        try {
+            return this.store.isEmpty();
+            
+        } catch (Exception e) {
+            LOG.error("Failed to get message count", e);
+            throw new RuntimeException(e);
+        }
+    }
 
             
+    @Override
     protected void resetBatch() {
         this.store.resetBatching(clientId, subscriberName);
     }
     
+    @Override
     protected void doFillBatch() throws Exception {
         this.store.recoverNextMessages(clientId, subscriberName,
                 maxBatchSize, this);
     }
 
+    @Override
     public String toString() {
         return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + ","
+ subscriberName + ")";
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
Mon Jan 25 22:22:44 2010
@@ -17,10 +17,9 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.usage.MemoryUsage;
 
 abstract public class AbstractMessageStore implements MessageStore {
@@ -48,4 +47,13 @@
     
     public void setBatch(MessageId messageId) throws IOException, Exception {
     }
+    
+    /**
+     * flag to indicate if the store is empty
+     * @return true if the message count is 0
+     * @throws Exception 
+     */
+     public boolean isEmpty() throws Exception{
+         return getMessageCount()==0;
+     }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
Mon Jan 25 22:22:44 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -25,7 +24,6 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 
 /**
  * Represents a message store which is used by the persistent implementations
@@ -114,7 +112,15 @@
     /**
      * allow caching cursors to set the current batch offset when cache is exhausted
      * @param messageId
+     * @throws Exception 
      */
     void setBatch(MessageId messageId) throws Exception;
     
+    /**
+     * flag to indicate if the store is empty
+     * @return true if the message count is 0
+     * @throws Exception 
+     */
+    boolean isEmpty() throws Exception;
+    
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
Mon Jan 25 22:22:44 2010
@@ -96,4 +96,8 @@
     public void setBatch(MessageId messageId) throws Exception {
         delegate.setBatch(messageId);
     }
+
+    public boolean isEmpty() throws Exception {
+       return delegate.isEmpty();
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Mon Jan 25 22:22:44 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -25,7 +24,6 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 
 /**
  * A simple proxy that delegates to another MessageStore.
@@ -138,4 +136,8 @@
     public void setBatch(MessageId messageId) throws Exception {
         delegate.setBatch(messageId);
     }
+    
+    public boolean isEmpty() throws Exception {
+        return delegate.isEmpty();
+     }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Mon Jan 25 22:22:44 2010
@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -69,7 +68,7 @@
 
 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
-    private WireFormat wireFormat = new OpenWireFormat();
+    private final WireFormat wireFormat = new OpenWireFormat();
 
     public void setBrokerName(String brokerName) {
     }
@@ -128,6 +127,7 @@
             this.dest = convert( destination );
         }
 
+        @Override
         public ActiveMQDestination getDestination() {
             return destination;
         }
@@ -200,6 +200,19 @@
                 });
             }
         }
+        
+        public boolean isEmpty() throws IOException {
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean,
IOException>(){
+                    public Boolean execute(Transaction tx) throws IOException {
+                        // Iterate through all index entries to get a count of messages in
the destination.
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        return sd.locationIndex.isEmpty(tx);
+                    }
+                });
+            }
+        }
+
 
         public void recover(final MessageRecoveryListener listener) throws Exception {
             synchronized(indexMutex) {
@@ -266,10 +279,13 @@
             
         }
 
+        @Override
         public void setMemoryUsage(MemoryUsage memoeyUSage) {
         }
+        @Override
         public void start() throws Exception {
         }
+        @Override
         public void stop() throws Exception {
         }
         

Modified: activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
(original)
+++ activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
Mon Jan 25 22:22:44 2010
@@ -220,6 +220,10 @@
         pw.flush();
     }
 
+    synchronized public boolean isEmpty(final Transaction tx) throws IOException {
+        return getRoot(tx).isEmpty(tx);
+    }
+
     synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction
tx) throws IOException {
         return getRoot(tx).iterator(tx);
     }

Modified: activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=902999&r1=902998&r2=902999&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
(original)
+++ activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
Mon Jan 25 22:22:44 2010
@@ -507,6 +507,10 @@
         }
     }
     
+    public boolean isEmpty(final Transaction tx) throws IOException {
+        return keys.length==0;
+    }
+
     public void visit(Transaction tx, BTreeVisitor<Key, Value> visitor) throws IOException
{
         if (visitor == null) {
             throw new IllegalArgumentException("Visitor cannot be null");



Mime
View raw message