From commits-return-8278-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Tue Mar 25 16:45:51 2008 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 29497 invoked from network); 25 Mar 2008 16:45:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 25 Mar 2008 16:45:51 -0000 Received: (qmail 78520 invoked by uid 500); 25 Mar 2008 16:45:41 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 78468 invoked by uid 500); 25 Mar 2008 16:45:41 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 78434 invoked by uid 99); 25 Mar 2008 16:45:41 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Mar 2008 09:45:41 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Mar 2008 16:44:59 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A18351A9832; Tue, 25 Mar 2008 09:45:15 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r640890 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ cursors/ Date: Tue, 25 Mar 2008 16:45:10 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080325164515.A18351A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Tue Mar 25 09:45:00 2008 New Revision: 640890 URL: http://svn.apache.org/viewvc?rev=640890&view=rev Log: In the queue case, when a 
consumer was closed it was not properly re-delivering messages to other available consumers. This was causing messages to look like they got dropped. - When we shut a queue sub down we now get its pending+dispatched list and re-dispatch that to the other available subscriptions. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=640890&r1=640889&r2=640890&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Tue Mar 25 09:45:00 2008 @@ -17,6 +17,9 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.ArrayList; 
+import java.util.Collections; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import javax.jms.InvalidSelectorException; @@ -104,8 +107,9 @@ destinations.add(destination); } - public void remove(ConnectionContext context, Destination destination) throws Exception { + public List remove(ConnectionContext context, Destination destination) throws Exception { destinations.remove(destination); + return Collections.EMPTY_LIST; } public ConsumerInfo getConsumerInfo() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=640890&r1=640889&r2=640890&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Mar 25 09:45:00 2008 @@ -436,11 +436,18 @@ } } - public void remove(ConnectionContext context, Destination destination) throws Exception { + public List remove(ConnectionContext context, Destination destination) throws Exception { + List rc = new ArrayList(); synchronized(pendingLock) { super.remove(context, destination); - pending.remove(context, destination); + for (MessageReference r : dispatched) { + if( r.getRegionDestination() == destination ) { + rc.add((QueueMessageReference)r); + } + } + rc.addAll(pending.remove(context, destination)); } + return rc; } protected void dispatchPending() throws IOException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=640890&r1=640889&r2=640890&view=diff 
============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Mar 25 09:45:00 2008 @@ -292,25 +292,20 @@ ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); MessageGroupSet ownedGroups = getMessageGroupOwners() .removeConsumer(consumerId); + // redeliver inflight messages - sub.remove(context, this); - List list = new ArrayList(); - List inFlight = null; - synchronized(pagedInMessages) { - inFlight = new ArrayList(pagedInMessages.values()); - } - - for (QueueMessageReference node:inFlight){ - if (!node.isDropped() && !node.isAcked() - && node.getLockOwner() == sub) { - if (node.unlock()) { - node.incrementRedeliveryCounter(); - list.add(node); - } + for (MessageReference ref : sub.remove(context, this)) { + QueueMessageReference qmr = (QueueMessageReference)ref; + qmr.incrementRedeliveryCounter(); + if( qmr.getLockOwner()==sub ) { + qmr.unlock(); + qmr.incrementRedeliveryCounter(); } + list.add(qmr); } - if (list != null && !consumers.isEmpty()) { + + if (!list.isEmpty() && !consumers.isEmpty()) { doDispatch(list); } } @@ -938,6 +933,7 @@ if( rd.subscription instanceof QueueBrowserSubscription ) { ((QueueBrowserSubscription)rd.subscription).decrementQueueRef(); } + } catch (Exception e) { e.printStackTrace(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=640890&r1=640889&r2=640890&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original) +++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Tue Mar 25 09:45:00 2008 @@ -87,8 +87,9 @@ * The subscription will be no longer be receiving messages from the destination. * @param context * @param destination + * @return a list of un-acked messages that were added to the subscription. */ - void remove(ConnectionContext context, Destination destination) throws Exception; + List remove(ConnectionContext context, Destination destination) throws Exception; /** * The ConsumerInfo object that created the subscription. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Tue Mar 25 09:45:00 2008 @@ -16,12 +16,15 @@ */ package org.apache.activemq.broker.region.cursors; +import java.util.Collections; import java.util.LinkedList; +import java.util.List; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.SystemUsage; @@ -59,7 +62,9 @@ public void add(ConnectionContext context, Destination destination) throws Exception { } - public void remove(ConnectionContext context, Destination destination) throws Exception { + 
@SuppressWarnings("unchecked") + public List remove(ConnectionContext context, Destination destination) throws Exception { + return Collections.EMPTY_LIST; } public boolean isRecoveryRequired() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Mar 25 09:45:00 2008 @@ -110,6 +110,8 @@ } return isDiskListEmpty(); } + + /** * reset the cursor Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Tue Mar 25 09:45:00 2008 @@ -18,12 +18,14 @@ import java.io.IOException; import java.util.LinkedList; +import java.util.List; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import 
org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.SystemUsage; @@ -51,7 +53,7 @@ * @param destination * @throws Exception */ - void remove(ConnectionContext context, Destination destination) throws Exception; + List remove(ConnectionContext context, Destination destination) throws Exception; /** * @return true if there are no pending messages Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=640890&r1=640889&r2=640890&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Mar 25 09:45:00 2008 @@ -17,9 +17,11 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.activemq.advisory.AdvisorySupport; @@ -27,6 +29,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; @@ -128,11 +131,12 @@ * @param destination * @throws Exception */ - public synchronized void remove(ConnectionContext context, Destination destination) throws Exception { + 
public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { Object tsp = topics.remove(destination); if (tsp != null) { storePrefetches.remove(tsp); } + return Collections.EMPTY_LIST; } /** Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=640890&r1=640889&r2=640890&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Tue Mar 25 09:45:00 2008 @@ -16,8 +16,13 @@ */ package org.apache.activemq.broker.region.cursors; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.QueueMessageReference; @@ -32,6 +37,18 @@ private Iterator iter; private MessageReference last; + + @Override + public List remove(ConnectionContext context, Destination destination) throws Exception { + List rc = new ArrayList(); + for (MessageReference r : list) { + if( r.getRegionDestination()==destination ) { + rc.add(r); + } + } + return rc ; + } + /** * @return true if there are no pending messages */