activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r661295 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Queue.java cursors/FilePendingMessageCursor.java cursors/StoreQueueCursor.java
Date Thu, 29 May 2008 11:27:34 GMT
Author: rajdavies
Date: Thu May 29 04:27:33 2008
New Revision: 661295

URL: http://svn.apache.org/viewvc?rev=661295&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1748

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=661295&r1=661294&r2=661295&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu May 29 04:27:33 2008
@@ -1062,7 +1062,12 @@
     }
 
     final void sendMessage(final ConnectionContext context, Message msg) throws Exception
{
-        messages.addMessageLast(msg);
+        if (!msg.isPersistent() && messages.getSystemUsage() != null) {
+            messages.getSystemUsage().getTempUsage().waitForSpace();
+        }
+        synchronized(messages) {
+            messages.addMessageLast(msg);
+        }
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         messageDelivered(context, msg);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=661295&r1=661294&r2=661295&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Thu May 29 04:27:33 2008
@@ -21,7 +21,6 @@
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -60,7 +59,6 @@
     private boolean flushRequired;
     private AtomicBoolean started = new AtomicBoolean();
     private MessageReference last = null;
-    private ReentrantLock lock = new ReentrantLock(true);
 
     /**
      * @param name
@@ -95,25 +93,20 @@
     /**
      * @return true if there are no pending messages
      */
-    public boolean isEmpty() {
-        lock.lock();
-        try {
-            if(memoryList.isEmpty() && isDiskListEmpty()){
-                return true;
-            }
-            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();)
{
-                MessageReference node = iterator.next();
-                if (node== QueueMessageReference.NULL_MESSAGE){
-                    continue;
-                }
-                if (!node.isDropped()) {
-                    return false;
-                }
-                // We can remove dropped references.
-                iterator.remove();
+    public synchronized boolean isEmpty() {
+        if(memoryList.isEmpty() && isDiskListEmpty()){
+            return true;
+        }
+        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();)
{
+            MessageReference node = iterator.next();
+            if (node== QueueMessageReference.NULL_MESSAGE){
+                continue;
             }
-        } finally {
-            lock.unlock();
+            if (!node.isDropped()) {
+                return false;
+            }
+            // We can remove dropped references.
+            iterator.remove();
         }
         return isDiskListEmpty();
     }
@@ -123,71 +116,48 @@
     /**
      * reset the cursor
      */
-    public void reset() {
-        lock.lock();
-        try {
-            iterating = true;
-            last = null;
-            iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
-        } finally {
-            lock.unlock();
-        }
+    public synchronized void reset() {
+        iterating = true;
+        last = null;
+        iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
     }
 
-    public void release() {
-        lock.lock();
-        try {
-            synchronized(this) {
-                iterating = false;
-                this.notifyAll();
-            }
-            if (flushRequired) {
-                flushRequired = false;
-                flushToDisk();
-            }
-        } finally {
-            lock.unlock();
+    public synchronized void release() {
+        iterating = false;
+        if (flushRequired) {
+            flushRequired = false;
+            flushToDisk();
         }
     }
 
-    public void destroy() throws Exception {
-        lock.lock();
-        try {
-            stop();
-            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();)
{
-                Message node = (Message)i.next();
-                node.decrementReferenceCount();
-            }
-            memoryList.clear();
-            if (!isDiskListEmpty()) {
-                getDiskList().clear();
-            }
-        } finally {
-            lock.unlock();
+    public synchronized void destroy() throws Exception {
+        stop();
+        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
+            Message node = (Message)i.next();
+            node.decrementReferenceCount();
+        }
+        memoryList.clear();
+        if (!isDiskListEmpty()) {
+            getDiskList().clear();
         }
     }
 
-    public LinkedList<MessageReference> pageInList(int maxItems) {
-        int count = 0;
+    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
         LinkedList<MessageReference> result = new LinkedList<MessageReference>();
-        lock.lock();
-        try {
-            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext()
&& count < maxItems;) {
-                result.add(i.next());
+        int count = 0;
+        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() &&
count < maxItems;) {
+            result.add(i.next());
+            count++;
+        }
+        if (count < maxItems && !isDiskListEmpty()) {
+            for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext()
&& count < maxItems;) {
+                Message message = (Message)i.next();
+                message.setRegionDestination(regionDestination);
+                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+                message.incrementReferenceCount();
+                result.add(message);
                 count++;
             }
-            if (count < maxItems && !isDiskListEmpty()) {
-                for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext()
&& count < maxItems;) {
-                    Message message = (Message)i.next();
-                    message.setRegionDestination(regionDestination);
-                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-                    message.incrementReferenceCount();
-                    result.add(message);
-                    count++;
-                }
-            }
-        } finally {
-            lock.unlock();
         }
         return result;
     }
@@ -197,52 +167,35 @@
      * 
      * @param node
      */
-    public void addMessageLast(MessageReference node) {
+    public synchronized void addMessageLast(MessageReference node) {
         if (!node.isExpired()) {
             try {
-                lock.lock();
-                try {
-                    while (iterating) {
-                        lock.unlock();
-                        synchronized(this) {
-                            try {
-                                this.wait();
-                            } catch (InterruptedException ie) {}
-                        }
-                        lock.lock();
+                regionDestination = node.getMessage().getRegionDestination();
+                if (isDiskListEmpty()) {
+                    if (hasSpace() || this.store==null) {
+                        memoryList.add(node);
+                        node.incrementReferenceCount();
+                        return;
                     }
-                    regionDestination = node.getMessage().getRegionDestination();
+                }
+                if (!hasSpace()) {
                     if (isDiskListEmpty()) {
-                        if (hasSpace() || this.store==null) {
+                        expireOldMessages();
+                        if (hasSpace()) {
                             memoryList.add(node);
                             node.incrementReferenceCount();
                             return;
+                        } else {
+                            flushToDisk();
                         }
                     }
-                    if (!hasSpace()) {
-                        if (isDiskListEmpty()) {
-                            expireOldMessages();
-                            if (hasSpace()) {
-                                memoryList.add(node);
-                                node.incrementReferenceCount();
-                                return;
-                            } else {
-                                flushToDisk();
-                            }
-                        }
-                    }
-                    if (systemUsage.getTempUsage().isFull()) {
-                        lock.unlock();
-                        systemUsage.getTempUsage().waitForSpace();
-                        lock.lock();
-                    }
-                    getDiskList().add(node);
-                } finally {
-                    lock.unlock();
                 }
+                systemUsage.getTempUsage().waitForSpace();
+                getDiskList().add(node);
+
             } catch (Exception e) {
                 LOG.error("Caught an Exception adding a message: " + node
-                        + " last to FilePendingMessageCursor ", e);
+                        + " first to FilePendingMessageCursor ", e);
                 throw new RuntimeException(e);
             }
         } else {
@@ -255,50 +208,32 @@
      * 
      * @param node
      */
-    public void addMessageFirst(MessageReference node) {
+    public synchronized void addMessageFirst(MessageReference node) {
         if (!node.isExpired()) {
             try {
-                lock.lock();
-                try {
-                    while (iterating) {
-                        lock.unlock();
-                        synchronized(this) {
-                            try {
-                                this.wait();
-                            } catch (InterruptedException ie) {}
-                        }
-                        lock.lock();
+                regionDestination = node.getMessage().getRegionDestination();
+                if (isDiskListEmpty()) {
+                    if (hasSpace()) {
+                        memoryList.addFirst(node);
+                        node.incrementReferenceCount();
+                        return;
                     }
-                    regionDestination = node.getMessage().getRegionDestination();
+                }
+                if (!hasSpace()) {
                     if (isDiskListEmpty()) {
+                        expireOldMessages();
                         if (hasSpace()) {
                             memoryList.addFirst(node);
                             node.incrementReferenceCount();
                             return;
+                        } else {
+                            flushToDisk();
                         }
                     }
-                    if (!hasSpace()) {
-                        if (isDiskListEmpty()) {
-                            expireOldMessages();
-                            if (hasSpace()) {
-                                memoryList.addFirst(node);
-                                node.incrementReferenceCount();
-                                return;
-                            } else {
-                                flushToDisk();
-                            }
-                        }
-                    }
-                    if (systemUsage.getTempUsage().isFull()) {
-                        lock.unlock();
-                        systemUsage.getTempUsage().waitForSpace();
-                        lock.lock();
-                    }
-                    node.decrementReferenceCount();
-                    getDiskList().addFirst(node);
-                } finally {
-                    lock.unlock();
                 }
+                systemUsage.getTempUsage().waitForSpace();
+                node.decrementReferenceCount();
+                getDiskList().addFirst(node);
 
             } catch (Exception e) {
                 LOG.error("Caught an Exception adding a message: " + node
@@ -309,38 +244,25 @@
             discard(node);
         }
     }
-    
+
     /**
      * @return true if there pending messages to dispatch
      */
-    public boolean hasNext() {
-        boolean result;
-        lock.lock();
-        try {
-            result = iter.hasNext();
-        } finally {
-            lock.unlock();
-        }
-        return result;
+    public synchronized boolean hasNext() {
+        return iter.hasNext();
     }
 
     /**
      * @return the next pending message
      */
-    public MessageReference next() {
-        Message message;
-        lock.lock();
-        try {
-            message = (Message)iter.next();
-            last = message;
-            if (!isDiskListEmpty()) {
-                // got from disk
-                message.setRegionDestination(regionDestination);
-                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-                message.incrementReferenceCount();
-            }
-        } finally {
-            lock.unlock();
+    public synchronized MessageReference next() {
+        Message message = (Message)iter.next();
+        last = message;
+        if (!isDiskListEmpty()) {
+            // got from disk
+            message.setRegionDestination(regionDestination);
+            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+            message.incrementReferenceCount();
         }
         return message;
     }
@@ -348,15 +270,10 @@
     /**
      * remove the message at the cursor position
      */
-    public void remove() {
-        lock.lock();
-        try {
-            iter.remove();
-            if (last != null) {
-                last.decrementReferenceCount();
-            }
-        } finally {
-            lock.unlock();
+    public synchronized void remove() {
+        iter.remove();
+        if (last != null) {
+        	last.decrementReferenceCount();
         }
     }
 
@@ -364,61 +281,36 @@
      * @param node
      * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
      */
-    public void remove(MessageReference node) {
-        lock.lock();
-        try {
-            if (memoryList.remove(node)) {
-                node.decrementReferenceCount();
-            }
-            if (!isDiskListEmpty()) {
-                getDiskList().remove(node);
-            }
-        } finally {
-            lock.unlock();
+    public synchronized void remove(MessageReference node) {
+        if (memoryList.remove(node)) {
+        	node.decrementReferenceCount();
+        }
+        if (!isDiskListEmpty()) {
+            getDiskList().remove(node);
         }
     }
 
     /**
      * @return the number of pending messages
      */
-    public int size() {
-        int result;
-        lock.lock();
-        try {
-            result = memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
-        } finally {
-            lock.unlock();
-        }
-        return result;
+    public synchronized int size() {
+        return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
     }
 
     /**
      * clear all pending messages
      */
-    public void clear() {
-        lock.lock();
-        try {
-            memoryList.clear();
-            if (!isDiskListEmpty()) {
-                getDiskList().clear();
-            }
-            last=null;
-        } finally {
-            lock.unlock();
+    public synchronized void clear() {
+        memoryList.clear();
+        if (!isDiskListEmpty()) {
+            getDiskList().clear();
         }
+        last=null;
     }
 
-    public boolean isFull() {
-        boolean result;
-        lock.lock();
-        try {
-            // we always have space - as we can persist to disk
-            // TODO: not necessarily true.
-            result = false;
-        } finally {
-            lock.unlock();
-        }
-        return result;
+    public synchronized boolean isFull() {
+        // we always have space - as we can persist to disk
+        return false;
     }
 
     public boolean hasMessagesBufferedToDeliver() {
@@ -432,8 +324,7 @@
     public void onUsageChanged(Usage usage, int oldPercentUsage,
             int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
-            lock.lock();
-            try {
+            synchronized (this) {
                 flushRequired = true;
                 if (!iterating) {
                     expireOldMessages();
@@ -442,8 +333,6 @@
                         flushRequired = false;
                     }
                 }
-            } finally {
-                lock.unlock();
             }
         }
     }
@@ -456,39 +345,31 @@
         return hasSpace() && isDiskListEmpty();
     }
     
-    protected void expireOldMessages() {
-        lock.lock();
-        try {
-            if (!memoryList.isEmpty()) {
-                LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
-                this.memoryList = new LinkedList<MessageReference>();
-                while (!tmpList.isEmpty()) {
-                    MessageReference node = tmpList.removeFirst();
-                    if (node.isExpired()) {
-                        discard(node);
-                    }else {
-                        memoryList.add(node);
-                    }               
-                }
+    protected synchronized void expireOldMessages() {
+        if (!memoryList.isEmpty()) {
+            LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
+            this.memoryList = new LinkedList<MessageReference>();
+            while (!tmpList.isEmpty()) {
+                MessageReference node = tmpList.removeFirst();
+                if (node.isExpired()) {
+                    discard(node);
+                }else {
+                    memoryList.add(node);
+                }               
             }
-        } finally {
-            lock.unlock();
         }
+
     }
 
-    protected void flushToDisk() {
-        lock.lock();
-        try {
-            if (!memoryList.isEmpty()) {
-                while (!memoryList.isEmpty()) {
-                    MessageReference node = memoryList.removeFirst();
-                    node.decrementReferenceCount();
-                    getDiskList().addLast(node);
-                }
-                memoryList.clear();
+    protected synchronized void flushToDisk() {
+       
+        if (!memoryList.isEmpty()) {
+            while (!memoryList.isEmpty()) {
+                MessageReference node = memoryList.removeFirst();
+                node.decrementReferenceCount();
+                getDiskList().addLast(node);
             }
-        } finally {
-            lock.unlock();
+            memoryList.clear();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=661295&r1=661294&r2=661295&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Thu May 29 04:27:33 2008
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.Store;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -87,7 +89,7 @@
         pendingCount = 0;
     }
 
-    public void addMessageLast(MessageReference node) throws Exception {
+    public synchronized void addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
             if (started) {
@@ -102,7 +104,7 @@
         }
     }
 
-    public void addMessageFirst(MessageReference node) throws Exception {
+    public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
             if (started) {
@@ -140,11 +142,6 @@
         MessageReference result = currentCursor != null ? currentCursor.next() : null;
         return result;
     }
-    
-    public synchronized void release() {
-    	nonPersistent.release();
-    	persistent.release();
-    }
 
     public synchronized void remove() {
         if (currentCursor != null) {
@@ -162,7 +159,7 @@
         pendingCount--;
     }
 
-    public void reset() {
+    public synchronized void reset() {
         nonPersistent.reset();
         persistent.reset();
     }



Mime
View raw message