Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 65636 invoked from network); 28 Feb 2007 07:23:59 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 28 Feb 2007 07:23:59 -0000 Received: (qmail 82696 invoked by uid 500); 28 Feb 2007 07:24:08 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 82650 invoked by uid 500); 28 Feb 2007 07:24:08 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 82639 invoked by uid 99); 28 Feb 2007 07:24:08 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Feb 2007 23:24:08 -0800 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Feb 2007 23:23:58 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 81F391A981A; Tue, 27 Feb 2007 23:23:38 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r512640 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors: StoreDurableSubscriberCursor.java TopicStorePrefetch.java Date: Wed, 28 Feb 2007 07:23:38 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070228072338.81F391A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Tue Feb 27 23:23:37 2007 New Revision: 512640 URL: http://svn.apache.org/viewvc?view=rev&rev=512640 Log: tidied up the way messages are page in for a durable subscriber Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=512640&r1=512639&r2=512640 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Feb 27 23:23:37 2007 @@ -168,6 +168,10 @@ public void clear(){ pendingCount=0; + nonPersistent.clear(); + for(PendingMessageCursor tsp: storePrefetches){ + tsp.clear(); + } } public synchronized boolean hasNext(){ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=512640&r1=512639&r2=512640 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Feb 27 23:23:37 2007 @@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFactory; /** - * perist pending messages pending message (messages awaiting disptach to a consumer) cursor + * perist pendingCount messages pendingCount message (messages awaiting disptach to a consumer) cursor * * @version $Revision$ */ @@ -39,15 +39,15 @@ private String clientId; private String subscriberName; private Destination regionDestination; - boolean empty; private MessageId firstMessageId; private MessageId lastMessageId; + private int pendingCount; + private boolean started; /** * @param topic * @param clientId * @param subscriberName - * @throws IOException */ public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){ this.regionDestination=topic; @@ -56,49 +56,70 @@ this.subscriberName=subscriberName; } - public synchronized void start() throws Exception{ - if(batchList.isEmpty()){ + public synchronized void start(){ + if(!started){ + started=true; + pendingCount=getStoreSize(); try{ fillBatch(); }catch(Exception e){ log.error("Failed to fill batch",e); throw new RuntimeException(e); } - empty=batchList.isEmpty(); } } - public synchronized void stop() throws Exception{ - store.resetBatching(clientId,subscriberName); - gc(); + public synchronized void stop(){ + if(started){ + started=false; + store.resetBatching(clientId,subscriberName); + gc(); + } } /** - * @return true if there are no pending messages + * @return true if there are no pendingCount messages */ public boolean isEmpty(){ - return empty; + return pendingCount <= 0; } public synchronized int size(){ - try{ - return store.getMessageCount(clientId,subscriberName); - }catch(IOException e){ - log.error(this+" Failed to get the outstanding message count from the store",e); - throw new RuntimeException(e); - } + return getPendingCount(); } public synchronized void addMessageLast(MessageReference node) throws Exception{ if(node!=null){ - if(empty){ + if(isEmpty() && started){ firstMessageId=node.getMessageId(); - empty=false; } lastMessageId=node.getMessageId(); node.decrementReferenceCount(); + pendingCount++; } } + + public void addMessageFirst(MessageReference node) throws Exception{ + if(node!=null){ + if(started){ + firstMessageId=node.getMessageId(); + } + node.decrementReferenceCount(); + pendingCount++; + } + } + + public synchronized void remove(){ + pendingCount--; + } + + public synchronized void remove(MessageReference node){ + pendingCount--; + } + + public synchronized void clear(){ + pendingCount=0; + } public synchronized boolean hasNext(){ return !isEmpty(); @@ -106,7 +127,7 @@ public synchronized MessageReference next(){ Message result=null; - if(!empty){ + if(!isEmpty()){ if(batchList.isEmpty()){ try{ fillBatch(); @@ -120,19 +141,12 @@ } if(!batchList.isEmpty()){ result=batchList.removeFirst(); - if(firstMessageId!=null){ - // Skip messages until we get to the first message. - if(!result.getMessageId().equals(firstMessageId)) - result=null; - firstMessageId=null; - }else{ - if(lastMessageId!=null){ - if(result.getMessageId().equals(lastMessageId)){ - empty=true; - } + if(lastMessageId!=null){ + if(result.getMessageId().equals(lastMessageId)){ + //pendingCount=0; } - result.setRegionDestination(regionDestination); } + result.setRegionDestination(regionDestination); } } return result; @@ -161,8 +175,47 @@ // implementation protected synchronized void fillBatch() throws Exception{ - store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this); + if(!isEmpty()){ + store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this); + if(firstMessageId!=null){ + int pos=0; + for(Message msg:batchList){ + if(msg.getMessageId().equals(firstMessageId)){ + firstMessageId=null; + break; + } + pos++; + } + if(pos>0){ + for(int i=0;i