activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r660555 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Queue.java cursors/FilePendingMessageCursor.java cursors/StoreQueueCursor.java
Date Tue, 27 May 2008 15:20:46 GMT
Author: rajdavies
Date: Tue May 27 08:20:41 2008
New Revision: 660555

URL: http://svn.apache.org/viewvc?rev=660555&view=rev
Log:
Apply patch for https://issues.apache.org/activemq/browse/AMQ-1748

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=660555&r1=660554&r2=660555&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue May 27 08:20:41 2008
@@ -1062,9 +1062,7 @@
     }
 
     final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
-        synchronized (messages) {
-            messages.addMessageLast(msg);
-        }
+        messages.addMessageLast(msg);
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         messageDelivered(context, msg);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=660555&r1=660554&r2=660555&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue May 27 08:20:41 2008
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -59,6 +60,7 @@
     private boolean flushRequired;
     private AtomicBoolean started = new AtomicBoolean();
     private MessageReference last = null;
+    private ReentrantLock lock = new ReentrantLock(true);
 
     /**
      * @param name
@@ -93,20 +95,25 @@
     /**
      * @return true if there are no pending messages
      */
-    public synchronized boolean isEmpty() {
-        if(memoryList.isEmpty() && isDiskListEmpty()){
-            return true;
-        }
-        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
-            MessageReference node = iterator.next();
-            if (node== QueueMessageReference.NULL_MESSAGE){
-                continue;
-            }
-            if (!node.isDropped()) {
-                return false;
+    public boolean isEmpty() {
+        lock.lock();
+        try {
+            if(memoryList.isEmpty() && isDiskListEmpty()){
+                return true;
+            }
+            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
+                MessageReference node = iterator.next();
+                if (node== QueueMessageReference.NULL_MESSAGE){
+                    continue;
+                }
+                if (!node.isDropped()) {
+                    return false;
+                }
+                // We can remove dropped references.
+                iterator.remove();
             }
-            // We can remove dropped references.
-            iterator.remove();
+        } finally {
+            lock.unlock();
         }
         return isDiskListEmpty();
     }
@@ -116,48 +123,71 @@
     /**
      * reset the cursor
      */
-    public synchronized void reset() {
-        iterating = true;
-        last = null;
-        iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
+    public void reset() {
+        lock.lock();
+        try {
+            iterating = true;
+            last = null;
+            iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized void release() {
-        iterating = false;
-        if (flushRequired) {
-            flushRequired = false;
-            flushToDisk();
+    public void release() {
+        lock.lock();
+        try {
+            synchronized(this) {
+                iterating = false;
+                this.notifyAll();
+            }
+            if (flushRequired) {
+                flushRequired = false;
+                flushToDisk();
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
-    public synchronized void destroy() throws Exception {
-        stop();
-        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
-            Message node = (Message)i.next();
-            node.decrementReferenceCount();
-        }
-        memoryList.clear();
-        if (!isDiskListEmpty()) {
-            getDiskList().clear();
+    public void destroy() throws Exception {
+        lock.lock();
+        try {
+            stop();
+            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
+                Message node = (Message)i.next();
+                node.decrementReferenceCount();
+            }
+            memoryList.clear();
+            if (!isDiskListEmpty()) {
+                getDiskList().clear();
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
-    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
-        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
+    public LinkedList<MessageReference> pageInList(int maxItems) {
         int count = 0;
-        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
-            result.add(i.next());
-            count++;
-        }
-        if (count < maxItems && !isDiskListEmpty()) {
-            for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
-                Message message = (Message)i.next();
-                message.setRegionDestination(regionDestination);
-                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-                message.incrementReferenceCount();
-                result.add(message);
+        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
+        lock.lock();
+        try {
+            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
+                result.add(i.next());
                 count++;
             }
+            if (count < maxItems && !isDiskListEmpty()) {
+                for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
+                    Message message = (Message)i.next();
+                    message.setRegionDestination(regionDestination);
+                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+                    message.incrementReferenceCount();
+                    result.add(message);
+                    count++;
+                }
+            }
+        } finally {
+            lock.unlock();
         }
         return result;
     }
@@ -167,35 +197,52 @@
      * 
      * @param node
      */
-    public synchronized void addMessageLast(MessageReference node) {
+    public void addMessageLast(MessageReference node) {
         if (!node.isExpired()) {
             try {
-                regionDestination = node.getMessage().getRegionDestination();
-                if (isDiskListEmpty()) {
-                    if (hasSpace() || this.store==null) {
-                        memoryList.add(node);
-                        node.incrementReferenceCount();
-                        return;
+                lock.lock();
+                try {
+                    while (iterating) {
+                        lock.unlock();
+                        synchronized(this) {
+                            try {
+                                this.wait();
+                            } catch (InterruptedException ie) {}
+                        }
+                        lock.lock();
                     }
-                }
-                if (!hasSpace()) {
+                    regionDestination = node.getMessage().getRegionDestination();
                     if (isDiskListEmpty()) {
-                        expireOldMessages();
-                        if (hasSpace()) {
+                        if (hasSpace() || this.store==null) {
                             memoryList.add(node);
                             node.incrementReferenceCount();
                             return;
-                        } else {
-                            flushToDisk();
                         }
                     }
+                    if (!hasSpace()) {
+                        if (isDiskListEmpty()) {
+                            expireOldMessages();
+                            if (hasSpace()) {
+                                memoryList.add(node);
+                                node.incrementReferenceCount();
+                                return;
+                            } else {
+                                flushToDisk();
+                            }
+                        }
+                    }
+                    if (systemUsage.getTempUsage().isFull()) {
+                        lock.unlock();
+                        systemUsage.getTempUsage().waitForSpace();
+                        lock.lock();
+                    }
+                    getDiskList().add(node);
+                } finally {
+                    lock.unlock();
                 }
-                systemUsage.getTempUsage().waitForSpace();
-                getDiskList().add(node);
-
             } catch (Exception e) {
                 LOG.error("Caught an Exception adding a message: " + node
-                        + " first to FilePendingMessageCursor ", e);
+                        + " last to FilePendingMessageCursor ", e);
                 throw new RuntimeException(e);
             }
         } else {
@@ -208,32 +255,50 @@
      * 
      * @param node
      */
-    public synchronized void addMessageFirst(MessageReference node) {
+    public void addMessageFirst(MessageReference node) {
         if (!node.isExpired()) {
             try {
-                regionDestination = node.getMessage().getRegionDestination();
-                if (isDiskListEmpty()) {
-                    if (hasSpace()) {
-                        memoryList.addFirst(node);
-                        node.incrementReferenceCount();
-                        return;
+                lock.lock();
+                try {
+                    while (iterating) {
+                        lock.unlock();
+                        synchronized(this) {
+                            try {
+                                this.wait();
+                            } catch (InterruptedException ie) {}
+                        }
+                        lock.lock();
                     }
-                }
-                if (!hasSpace()) {
+                    regionDestination = node.getMessage().getRegionDestination();
                     if (isDiskListEmpty()) {
-                        expireOldMessages();
                         if (hasSpace()) {
                             memoryList.addFirst(node);
                             node.incrementReferenceCount();
                             return;
-                        } else {
-                            flushToDisk();
                         }
                     }
+                    if (!hasSpace()) {
+                        if (isDiskListEmpty()) {
+                            expireOldMessages();
+                            if (hasSpace()) {
+                                memoryList.addFirst(node);
+                                node.incrementReferenceCount();
+                                return;
+                            } else {
+                                flushToDisk();
+                            }
+                        }
+                    }
+                    if (systemUsage.getTempUsage().isFull()) {
+                        lock.unlock();
+                        systemUsage.getTempUsage().waitForSpace();
+                        lock.lock();
+                    }
+                    node.decrementReferenceCount();
+                    getDiskList().addFirst(node);
+                } finally {
+                    lock.unlock();
                 }
-                systemUsage.getTempUsage().waitForSpace();
-                node.decrementReferenceCount();
-                getDiskList().addFirst(node);
 
             } catch (Exception e) {
                 LOG.error("Caught an Exception adding a message: " + node
@@ -244,25 +309,38 @@
             discard(node);
         }
     }
-
+    
     /**
      * @return true if there pending messages to dispatch
      */
-    public synchronized boolean hasNext() {
-        return iter.hasNext();
+    public boolean hasNext() {
+        boolean result;
+        lock.lock();
+        try {
+            result = iter.hasNext();
+        } finally {
+            lock.unlock();
+        }
+        return result;
     }
 
     /**
      * @return the next pending message
      */
-    public synchronized MessageReference next() {
-        Message message = (Message)iter.next();
-        last = message;
-        if (!isDiskListEmpty()) {
-            // got from disk
-            message.setRegionDestination(regionDestination);
-            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-            message.incrementReferenceCount();
+    public MessageReference next() {
+        Message message;
+        lock.lock();
+        try {
+            message = (Message)iter.next();
+            last = message;
+            if (!isDiskListEmpty()) {
+                // got from disk
+                message.setRegionDestination(regionDestination);
+                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+                message.incrementReferenceCount();
+            }
+        } finally {
+            lock.unlock();
         }
         return message;
     }
@@ -270,10 +348,15 @@
     /**
      * remove the message at the cursor position
      */
-    public synchronized void remove() {
-        iter.remove();
-        if (last != null) {
-        	last.decrementReferenceCount();
+    public void remove() {
+        lock.lock();
+        try {
+            iter.remove();
+            if (last != null) {
+                last.decrementReferenceCount();
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -281,36 +364,61 @@
      * @param node
      * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
      */
-    public synchronized void remove(MessageReference node) {
-        if (memoryList.remove(node)) {
-        	node.decrementReferenceCount();
-        }
-        if (!isDiskListEmpty()) {
-            getDiskList().remove(node);
+    public void remove(MessageReference node) {
+        lock.lock();
+        try {
+            if (memoryList.remove(node)) {
+                node.decrementReferenceCount();
+            }
+            if (!isDiskListEmpty()) {
+                getDiskList().remove(node);
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
     /**
      * @return the number of pending messages
      */
-    public synchronized int size() {
-        return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
+    public int size() {
+        int result;
+        lock.lock();
+        try {
+            result = memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
+        } finally {
+            lock.unlock();
+        }
+        return result;
     }
 
     /**
      * clear all pending messages
      */
-    public synchronized void clear() {
-        memoryList.clear();
-        if (!isDiskListEmpty()) {
-            getDiskList().clear();
+    public void clear() {
+        lock.lock();
+        try {
+            memoryList.clear();
+            if (!isDiskListEmpty()) {
+                getDiskList().clear();
+            }
+            last=null;
+        } finally {
+            lock.unlock();
         }
-        last=null;
     }
 
-    public synchronized boolean isFull() {
-        // we always have space - as we can persist to disk
-        return false;
+    public boolean isFull() {
+        boolean result;
+        lock.lock();
+        try {
+            // we always have space - as we can persist to disk
+            // TODO: not necessarily true.
+            result = false;
+        } finally {
+            lock.unlock();
+        }
+        return result;
     }
 
     public boolean hasMessagesBufferedToDeliver() {
@@ -324,7 +432,8 @@
     public void onUsageChanged(Usage usage, int oldPercentUsage,
             int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 flushRequired = true;
                 if (!iterating) {
                     expireOldMessages();
@@ -333,6 +442,8 @@
                         flushRequired = false;
                     }
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
@@ -345,31 +456,39 @@
         return hasSpace() && isDiskListEmpty();
     }
     
-    protected synchronized void expireOldMessages() {
-        if (!memoryList.isEmpty()) {
-            LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
-            this.memoryList = new LinkedList<MessageReference>();
-            while (!tmpList.isEmpty()) {
-                MessageReference node = tmpList.removeFirst();
-                if (node.isExpired()) {
-                    discard(node);
-                }else {
-                    memoryList.add(node);
-                }               
+    protected void expireOldMessages() {
+        lock.lock();
+        try {
+            if (!memoryList.isEmpty()) {
+                LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
+                this.memoryList = new LinkedList<MessageReference>();
+                while (!tmpList.isEmpty()) {
+                    MessageReference node = tmpList.removeFirst();
+                    if (node.isExpired()) {
+                        discard(node);
+                    }else {
+                        memoryList.add(node);
+                    }               
+                }
             }
+        } finally {
+            lock.unlock();
         }
-
     }
 
-    protected synchronized void flushToDisk() {
-       
-        if (!memoryList.isEmpty()) {
-            while (!memoryList.isEmpty()) {
-                MessageReference node = memoryList.removeFirst();
-                node.decrementReferenceCount();
-                getDiskList().addLast(node);
+    protected void flushToDisk() {
+        lock.lock();
+        try {
+            if (!memoryList.isEmpty()) {
+                while (!memoryList.isEmpty()) {
+                    MessageReference node = memoryList.removeFirst();
+                    node.decrementReferenceCount();
+                    getDiskList().addLast(node);
+                }
+                memoryList.clear();
             }
-            memoryList.clear();
+        } finally {
+            lock.unlock();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=660555&r1=660554&r2=660555&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Tue May 27 08:20:41 2008
@@ -16,12 +16,10 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
-import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -89,7 +87,7 @@
         pendingCount = 0;
     }
 
-    public synchronized void addMessageLast(MessageReference node) throws Exception {
+    public void addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
             if (started) {
@@ -104,7 +102,7 @@
         }
     }
 
-    public synchronized void addMessageFirst(MessageReference node) throws Exception {
+    public void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
             if (started) {
@@ -142,6 +140,11 @@
         MessageReference result = currentCursor != null ? currentCursor.next() : null;
         return result;
     }
+    
+    public synchronized void release() {
+    	nonPersistent.release();
+    	persistent.release();
+    }
 
     public synchronized void remove() {
         if (currentCursor != null) {
@@ -159,7 +162,7 @@
         pendingCount--;
     }
 
-    public synchronized void reset() {
+    public void reset() {
         nonPersistent.reset();
         persistent.reset();
     }



Mime
View raw message