Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 77870 invoked from network); 22 Feb 2010 17:04:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Feb 2010 17:04:00 -0000 Received: (qmail 52223 invoked by uid 500); 22 Feb 2010 17:04:00 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 52191 invoked by uid 500); 22 Feb 2010 17:04:00 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 52182 invoked by uid 99); 22 Feb 2010 17:04:00 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2010 17:04:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2010 17:03:57 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id F1A82238890B; Mon, 22 Feb 2010 17:03:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r912656 - in /activemq/branches/activemq-5.3: ./ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core/src/main/java/org/apache/activemq/transport... Date: Mon, 22 Feb 2010 17:03:36 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100222170336.F1A82238890B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Mon Feb 22 17:03:36 2010 New Revision: 912656 URL: http://svn.apache.org/viewvc?rev=912656&view=rev Log: Fix for https://issues.apache.org/activemq/browse/AMQ-2616 Added: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java - copied unchanged from r912652, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/ - copied from r912652, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java - copied unchanged from r912652, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java - copied unchanged from r912652, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java Modified: activemq/branches/activemq-5.3/ (props changed) activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (props changed) activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (props changed) Propchange: activemq/branches/activemq-5.3/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Feb 22 17:03:36 2010 @@ -1 +1 @@ -/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115,824807,825009-825084,881278-881313,911812-912087,912496 +/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115,824807,825009-825084,881278-881313,911812-912087,912496-912652 Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=912656&r1=912655&r2=912656&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Mon Feb 22 17:03:36 2010 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; @@ -23,6 +24,8 @@ import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * The Queue is a List of MessageEntry objects that are dispatched to matching @@ -31,6 +34,7 @@ * @version $Revision: 1.28 $ */ public class TempQueue extends Queue{ + private static final Log LOG = LogFactory.getLog(TempQueue.class); private final ActiveMQTempDestination tempDest; @@ -50,6 +54,7 @@ this.tempDest = (ActiveMQTempDestination) destination; } + @Override public void initialize() throws Exception { this.messages=new VMPendingMessageCursor(); this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); @@ -58,6 +63,7 @@ this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName()); } + @Override public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { // Only consumers on the same connection can consume from // the temporary destination @@ -74,4 +80,14 @@ } super.addSubscription(context, sub); } + + @Override + public void dispose(ConnectionContext context) throws IOException { + try { + purge(); + } catch (Exception e) { + LOG.warn("Caught an exception purging Queue: " + destination); + } + super.dispose(context); + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=912656&r1=912655&r2=912656&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Mon Feb 22 17:03:36 2010 @@ -86,7 +86,12 @@ clearIterator(true); recovered = true; } else { - LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message); + /* + * we should expect to get these - as the message is recorded as it before it goes into + * the cache. If subsequently, we pull out that message from the store (before its deleted) + * it will be a duplicate - but should be ignored + */ + //LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message); storeHasMessages = true; } return recovered; Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=912656&r1=912655&r2=912656&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Mon Feb 22 17:03:36 2010 @@ -20,7 +20,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -33,38 +32,39 @@ * @version $Revision$ */ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { - private LinkedList list = new LinkedList(); + private final LinkedList list = new LinkedList(); private Iterator iter; - public VMPendingMessageCursor(){ - this.useCache=false; + public VMPendingMessageCursor() { + this.useCache = false; } - @Override - public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { - List rc = new ArrayList(); + public synchronized List remove(ConnectionContext context, Destination destination) + throws Exception { + List rc = new ArrayList(); for (Iterator iterator = list.iterator(); iterator.hasNext();) { MessageReference r = iterator.next(); - if( r.getRegionDestination()==destination ) { + if (r.getRegionDestination() == destination) { r.decrementReferenceCount(); rc.add(r); iterator.remove(); } } - return rc ; + return rc; } - + /** * @return true if there are no pending messages */ + @Override public synchronized boolean isEmpty() { if (list.isEmpty()) { return true; } else { for (Iterator iterator = list.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); - if (node== QueueMessageReference.NULL_MESSAGE){ - continue; + if (node == QueueMessageReference.NULL_MESSAGE) { + continue; } if (!node.isDropped()) { return false; @@ -79,6 +79,7 @@ /** * reset the cursor */ + @Override public synchronized void reset() { iter = list.listIterator(); last = null; @@ -89,6 +90,7 @@ * * @param node */ + @Override public synchronized void addMessageLast(MessageReference node) { node.incrementReferenceCount(); list.addLast(node); @@ -100,6 +102,7 @@ * @param position * @param node */ + @Override public synchronized void addMessageFirst(MessageReference node) { node.incrementReferenceCount(); list.addFirst(node); @@ -108,6 +111,7 @@ /** * @return true if there pending messages to dispatch */ + @Override public synchronized boolean hasNext() { return iter.hasNext(); } @@ -115,8 +119,9 @@ /** * @return the next pending message */ + @Override public synchronized MessageReference next() { - last = (MessageReference)iter.next(); + last = iter.next(); if (last != null) { last.incrementReferenceCount(); } @@ -126,6 +131,7 @@ /** * remove the message at the cursor position */ + @Override public synchronized void remove() { if (last != null) { last.decrementReferenceCount(); @@ -136,6 +142,7 @@ /** * @return the number of pending messages */ + @Override public synchronized int size() { return list.size(); } @@ -143,10 +150,16 @@ /** * clear all pending messages */ + @Override public synchronized void clear() { + for (Iterator i = list.iterator(); i.hasNext();) { + MessageReference ref = i.next(); + ref.decrementReferenceCount(); + } list.clear(); } + @Override public synchronized void remove(MessageReference node) { for (Iterator i = list.iterator(); i.hasNext();) { MessageReference ref = i.next(); @@ -164,11 +177,19 @@ * @param maxItems * @return a list of paged in messages */ + @Override public LinkedList pageInList(int maxItems) { return list; } - + + @Override public boolean isTransient() { return true; } + + @Override + public void destroy() throws Exception { + super.destroy(); + clear(); + } } Propchange: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Feb 22 17:03:36 2010 @@ -1 +1 @@ -/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115,824807,825009-825084,881278-881313,905769,911812-912087,912496 +/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115,824807,825009-825084,881278-881313,905769,911812-912087,912496-912652 Propchange: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Feb 22 17:03:36 2010 @@ -1 +1 @@ -/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java:502054-818935,818937-819035,820031,820713-820714,820764,821090,821103,821115,825009-825084,881278-881313,911812-912087,912496 +/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java:502054-818935,818937-819035,820031,820713-820714,820764,821090,821103,821115,825009-825084,881278-881313,911812-912087,912496-912652