activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r578877 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Date Mon, 24 Sep 2007 17:00:09 GMT
Author: rajdavies
Date: Mon Sep 24 10:00:08 2007
New Revision: 578877

URL: http://svn.apache.org/viewvc?rev=578877&view=rev
Log:
fix reference counting

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=578877&r1=578876&r2=578877&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Mon Sep 24 10:00:08 2007
@@ -28,11 +28,9 @@
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
@@ -42,7 +40,6 @@
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
 
-    private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
     private static final AtomicLong NAME_COUNT = new AtomicLong();
     
     private Store store;
@@ -54,6 +51,7 @@
     private boolean iterating;
     private boolean flushRequired;
     private AtomicBoolean started = new AtomicBoolean();
+    private MessageReference last = null;
 
     /**
      * @param name
@@ -85,7 +83,8 @@
      * @return true if there are no pending messages
      */
     public synchronized boolean isEmpty() {
-        return memoryList.isEmpty() && isDiskListEmpty();
+        boolean result =  memoryList.isEmpty() && isDiskListEmpty();
+        return result;
     }
 
     /**
@@ -93,6 +92,7 @@
      */
     public synchronized void reset() {
         iterating = true;
+        last = null;
         iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
     }
 
@@ -145,6 +145,7 @@
             regionDestination = node.getMessage().getRegionDestination();
             if (isSpaceInMemoryList()) {
                 memoryList.add(node);
+                node.incrementReferenceCount();
             } else {
                 flushToDisk();
                 node.decrementReferenceCount();
@@ -166,6 +167,7 @@
             regionDestination = node.getMessage().getRegionDestination();
             if (isSpaceInMemoryList()) {
                 memoryList.addFirst(node);
+                node.incrementReferenceCount();
             } else {
                 flushToDisk();
                 systemUsage.getTempUsage().waitForSpace();
@@ -189,6 +191,7 @@
      */
     public synchronized MessageReference next() {
         Message message = (Message)iter.next();
+        last = message;
         if (!isDiskListEmpty()) {
             // got from disk
             message.setRegionDestination(regionDestination);
@@ -202,6 +205,9 @@
      */
     public synchronized void remove() {
         iter.remove();
+        if (last != null) {
+        	last.decrementReferenceCount();
+        }
     }
 
     /**
@@ -209,7 +215,9 @@
      * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
      */
     public synchronized void remove(MessageReference node) {
-        memoryList.remove(node);
+        if (memoryList.remove(node)) {
+        	node.decrementReferenceCount();
+        }
         if (!isDiskListEmpty()) {
             getDiskList().remove(node);
         }
@@ -230,6 +238,7 @@
         if (!isDiskListEmpty()) {
             getDiskList().clear();
         }
+        last=null;
     }
 
     public synchronized boolean isFull() {



Mime
View raw message