Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 37219 invoked from network); 29 May 2008 11:27:59 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 29 May 2008 11:27:59 -0000 Received: (qmail 10006 invoked by uid 500); 29 May 2008 11:28:01 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 9970 invoked by uid 500); 29 May 2008 11:28:01 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 9961 invoked by uid 99); 29 May 2008 11:28:01 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 May 2008 04:28:01 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 May 2008 11:27:13 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A10B02388A0A; Thu, 29 May 2008 04:27:34 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r661295 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Queue.java cursors/FilePendingMessageCursor.java cursors/StoreQueueCursor.java Date: Thu, 29 May 2008 11:27:34 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080529112734.A10B02388A0A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Thu May 29 04:27:33 2008 New Revision: 661295 URL: http://svn.apache.org/viewvc?rev=661295&view=rev Log: Fix for https://issues.apache.org/activemq/browse/AMQ-1748 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=661295&r1=661294&r2=661295&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu May 29 04:27:33 2008 @@ -1062,7 +1062,12 @@ } final void sendMessage(final ConnectionContext context, Message msg) throws Exception { - messages.addMessageLast(msg); + if (!msg.isPersistent() && messages.getSystemUsage() != null) { + messages.getSystemUsage().getTempUsage().waitForSpace(); + } + synchronized(messages) { + messages.addMessageLast(msg); + } destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); messageDelivered(context, msg); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=661295&r1=661294&r2=661295&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu May 29 04:27:33 2008 @@ -21,7 +21,6 @@ import java.util.LinkedList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -60,7 +59,6 @@ private boolean flushRequired; private AtomicBoolean started = new AtomicBoolean(); private MessageReference last = null; - private ReentrantLock lock = new ReentrantLock(true); /** * @param name @@ -95,25 +93,20 @@ /** * @return true if there are no pending messages */ - public boolean isEmpty() { - lock.lock(); - try { - if(memoryList.isEmpty() && isDiskListEmpty()){ - return true; - } - for (Iterator iterator = memoryList.iterator(); iterator.hasNext();) { - MessageReference node = iterator.next(); - if (node== QueueMessageReference.NULL_MESSAGE){ - continue; - } - if (!node.isDropped()) { - return false; - } - // We can remove dropped references. - iterator.remove(); + public synchronized boolean isEmpty() { + if(memoryList.isEmpty() && isDiskListEmpty()){ + return true; + } + for (Iterator iterator = memoryList.iterator(); iterator.hasNext();) { + MessageReference node = iterator.next(); + if (node== QueueMessageReference.NULL_MESSAGE){ + continue; } - } finally { - lock.unlock(); + if (!node.isDropped()) { + return false; + } + // We can remove dropped references. + iterator.remove(); } return isDiskListEmpty(); } @@ -123,71 +116,48 @@ /** * reset the cursor */ - public void reset() { - lock.lock(); - try { - iterating = true; - last = null; - iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator(); - } finally { - lock.unlock(); - } + public synchronized void reset() { + iterating = true; + last = null; + iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator(); } - public void release() { - lock.lock(); - try { - synchronized(this) { - iterating = false; - this.notifyAll(); - } - if (flushRequired) { - flushRequired = false; - flushToDisk(); - } - } finally { - lock.unlock(); + public synchronized void release() { + iterating = false; + if (flushRequired) { + flushRequired = false; + flushToDisk(); } } - public void destroy() throws Exception { - lock.lock(); - try { - stop(); - for (Iterator i = memoryList.iterator(); i.hasNext();) { - Message node = (Message)i.next(); - node.decrementReferenceCount(); - } - memoryList.clear(); - if (!isDiskListEmpty()) { - getDiskList().clear(); - } - } finally { - lock.unlock(); + public synchronized void destroy() throws Exception { + stop(); + for (Iterator i = memoryList.iterator(); i.hasNext();) { + Message node = (Message)i.next(); + node.decrementReferenceCount(); + } + memoryList.clear(); + if (!isDiskListEmpty()) { + getDiskList().clear(); } } - public LinkedList pageInList(int maxItems) { - int count = 0; + public synchronized LinkedList pageInList(int maxItems) { LinkedList result = new LinkedList(); - lock.lock(); - try { - for (Iterator i = memoryList.iterator(); i.hasNext() && count < maxItems;) { - result.add(i.next()); + int count = 0; + for (Iterator i = memoryList.iterator(); i.hasNext() && count < maxItems;) { + result.add(i.next()); + count++; + } + if (count < maxItems && !isDiskListEmpty()) { + for (Iterator i = getDiskList().iterator(); i.hasNext() && count < maxItems;) { + Message message = (Message)i.next(); + message.setRegionDestination(regionDestination); + message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); + message.incrementReferenceCount(); + result.add(message); count++; } - if (count < maxItems && !isDiskListEmpty()) { - for (Iterator i = getDiskList().iterator(); i.hasNext() && count < maxItems;) { - Message message = (Message)i.next(); - message.setRegionDestination(regionDestination); - message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); - message.incrementReferenceCount(); - result.add(message); - count++; - } - } - } finally { - lock.unlock(); } return result; } @@ -197,52 +167,35 @@ * * @param node */ - public void addMessageLast(MessageReference node) { + public synchronized void addMessageLast(MessageReference node) { if (!node.isExpired()) { try { - lock.lock(); - try { - while (iterating) { - lock.unlock(); - synchronized(this) { - try { - this.wait(); - } catch (InterruptedException ie) {} - } - lock.lock(); + regionDestination = node.getMessage().getRegionDestination(); + if (isDiskListEmpty()) { + if (hasSpace() || this.store==null) { + memoryList.add(node); + node.incrementReferenceCount(); + return; } - regionDestination = node.getMessage().getRegionDestination(); + } + if (!hasSpace()) { if (isDiskListEmpty()) { - if (hasSpace() || this.store==null) { + expireOldMessages(); + if (hasSpace()) { memoryList.add(node); node.incrementReferenceCount(); return; + } else { + flushToDisk(); } } - if (!hasSpace()) { - if (isDiskListEmpty()) { - expireOldMessages(); - if (hasSpace()) { - memoryList.add(node); - node.incrementReferenceCount(); - return; - } else { - flushToDisk(); - } - } - } - if (systemUsage.getTempUsage().isFull()) { - lock.unlock(); - systemUsage.getTempUsage().waitForSpace(); - lock.lock(); - } - getDiskList().add(node); - } finally { - lock.unlock(); } + systemUsage.getTempUsage().waitForSpace(); + getDiskList().add(node); + } catch (Exception e) { LOG.error("Caught an Exception adding a message: " + node - + " last to FilePendingMessageCursor ", e); + + " first to FilePendingMessageCursor ", e); throw new RuntimeException(e); } } else { @@ -255,50 +208,32 @@ * * @param node */ - public void addMessageFirst(MessageReference node) { + public synchronized void addMessageFirst(MessageReference node) { if (!node.isExpired()) { try { - lock.lock(); - try { - while (iterating) { - lock.unlock(); - synchronized(this) { - try { - this.wait(); - } catch (InterruptedException ie) {} - } - lock.lock(); + regionDestination = node.getMessage().getRegionDestination(); + if (isDiskListEmpty()) { + if (hasSpace()) { + memoryList.addFirst(node); + node.incrementReferenceCount(); + return; } - regionDestination = node.getMessage().getRegionDestination(); + } + if (!hasSpace()) { if (isDiskListEmpty()) { + expireOldMessages(); if (hasSpace()) { memoryList.addFirst(node); node.incrementReferenceCount(); return; + } else { + flushToDisk(); } } - if (!hasSpace()) { - if (isDiskListEmpty()) { - expireOldMessages(); - if (hasSpace()) { - memoryList.addFirst(node); - node.incrementReferenceCount(); - return; - } else { - flushToDisk(); - } - } - } - if (systemUsage.getTempUsage().isFull()) { - lock.unlock(); - systemUsage.getTempUsage().waitForSpace(); - lock.lock(); - } - node.decrementReferenceCount(); - getDiskList().addFirst(node); - } finally { - lock.unlock(); } + systemUsage.getTempUsage().waitForSpace(); + node.decrementReferenceCount(); + getDiskList().addFirst(node); } catch (Exception e) { LOG.error("Caught an Exception adding a message: " + node @@ -309,38 +244,25 @@ discard(node); } } - + /** * @return true if there pending messages to dispatch */ - public boolean hasNext() { - boolean result; - lock.lock(); - try { - result = iter.hasNext(); - } finally { - lock.unlock(); - } - return result; + public synchronized boolean hasNext() { + return iter.hasNext(); } /** * @return the next pending message */ - public MessageReference next() { - Message message; - lock.lock(); - try { - message = (Message)iter.next(); - last = message; - if (!isDiskListEmpty()) { - // got from disk - message.setRegionDestination(regionDestination); - message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); - message.incrementReferenceCount(); - } - } finally { - lock.unlock(); + public synchronized MessageReference next() { + Message message = (Message)iter.next(); + last = message; + if (!isDiskListEmpty()) { + // got from disk + message.setRegionDestination(regionDestination); + message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); + message.incrementReferenceCount(); } return message; } @@ -348,15 +270,10 @@ /** * remove the message at the cursor position */ - public void remove() { - lock.lock(); - try { - iter.remove(); - if (last != null) { - last.decrementReferenceCount(); - } - } finally { - lock.unlock(); + public synchronized void remove() { + iter.remove(); + if (last != null) { + last.decrementReferenceCount(); } } @@ -364,61 +281,36 @@ * @param node * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) */ - public void remove(MessageReference node) { - lock.lock(); - try { - if (memoryList.remove(node)) { - node.decrementReferenceCount(); - } - if (!isDiskListEmpty()) { - getDiskList().remove(node); - } - } finally { - lock.unlock(); + public synchronized void remove(MessageReference node) { + if (memoryList.remove(node)) { + node.decrementReferenceCount(); + } + if (!isDiskListEmpty()) { + getDiskList().remove(node); } } /** * @return the number of pending messages */ - public int size() { - int result; - lock.lock(); - try { - result = memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size()); - } finally { - lock.unlock(); - } - return result; + public synchronized int size() { + return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size()); } /** * clear all pending messages */ - public void clear() { - lock.lock(); - try { - memoryList.clear(); - if (!isDiskListEmpty()) { - getDiskList().clear(); - } - last=null; - } finally { - lock.unlock(); + public synchronized void clear() { + memoryList.clear(); + if (!isDiskListEmpty()) { + getDiskList().clear(); } + last=null; } - public boolean isFull() { - boolean result; - lock.lock(); - try { - // we always have space - as we can persist to disk - // TODO: not necessarily true. - result = false; - } finally { - lock.unlock(); - } - return result; + public synchronized boolean isFull() { + // we always have space - as we can persist to disk + return false; } public boolean hasMessagesBufferedToDeliver() { @@ -432,8 +324,7 @@ public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { if (newPercentUsage >= getMemoryUsageHighWaterMark()) { - lock.lock(); - try { + synchronized (this) { flushRequired = true; if (!iterating) { expireOldMessages(); @@ -442,8 +333,6 @@ flushRequired = false; } } - } finally { - lock.unlock(); } } } @@ -456,39 +345,31 @@ return hasSpace() && isDiskListEmpty(); } - protected void expireOldMessages() { - lock.lock(); - try { - if (!memoryList.isEmpty()) { - LinkedList tmpList = new LinkedList(this.memoryList); - this.memoryList = new LinkedList(); - while (!tmpList.isEmpty()) { - MessageReference node = tmpList.removeFirst(); - if (node.isExpired()) { - discard(node); - }else { - memoryList.add(node); - } - } + protected synchronized void expireOldMessages() { + if (!memoryList.isEmpty()) { + LinkedList tmpList = new LinkedList(this.memoryList); + this.memoryList = new LinkedList(); + while (!tmpList.isEmpty()) { + MessageReference node = tmpList.removeFirst(); + if (node.isExpired()) { + discard(node); + }else { + memoryList.add(node); + } } - } finally { - lock.unlock(); } + } - protected void flushToDisk() { - lock.lock(); - try { - if (!memoryList.isEmpty()) { - while (!memoryList.isEmpty()) { - MessageReference node = memoryList.removeFirst(); - node.decrementReferenceCount(); - getDiskList().addLast(node); - } - memoryList.clear(); + protected synchronized void flushToDisk() { + + if (!memoryList.isEmpty()) { + while (!memoryList.isEmpty()) { + MessageReference node = memoryList.removeFirst(); + node.decrementReferenceCount(); + getDiskList().addLast(node); } - } finally { - lock.unlock(); + memoryList.clear(); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=661295&r1=661294&r2=661295&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Thu May 29 04:27:33 2008 @@ -16,10 +16,12 @@ */ package org.apache.activemq.broker.region.cursors; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; +import org.apache.activemq.kaha.Store; import org.apache.activemq.usage.SystemUsage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,7 +89,7 @@ pendingCount = 0; } - public void addMessageLast(MessageReference node) throws Exception { + public synchronized void addMessageLast(MessageReference node) throws Exception { if (node != null) { Message msg = node.getMessage(); if (started) { @@ -102,7 +104,7 @@ } } - public void addMessageFirst(MessageReference node) throws Exception { + public synchronized void addMessageFirst(MessageReference node) throws Exception { if (node != null) { Message msg = node.getMessage(); if (started) { @@ -140,11 +142,6 @@ MessageReference result = currentCursor != null ? currentCursor.next() : null; return result; } - - public synchronized void release() { - nonPersistent.release(); - persistent.release(); - } public synchronized void remove() { if (currentCursor != null) { @@ -162,7 +159,7 @@ pendingCount--; } - public void reset() { + public synchronized void reset() { nonPersistent.reset(); persistent.reset(); }