Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 86525 invoked from network); 16 Jun 2007 11:28:11 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 16 Jun 2007 11:28:11 -0000 Received: (qmail 82390 invoked by uid 500); 16 Jun 2007 11:28:14 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 82361 invoked by uid 500); 16 Jun 2007 11:28:14 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 82352 invoked by uid 99); 16 Jun 2007 11:28:14 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Jun 2007 04:28:14 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Jun 2007 04:28:09 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id BA4581A981A; Sat, 16 Jun 2007 04:27:49 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r547906 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor: KahaMessageStore.java KahaReferenceStore.java KahaTopicMessageStore.java KahaTopicReferenceStore.java TopicSubContainer.java Date: Sat, 16 Jun 2007 11:27:49 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070616112749.BA4581A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Sat Jun 16 04:27:48 2007 New Revision: 547906 URL: http://svn.apache.org/viewvc?view=rev&rev=547906 Log: check we don't keep hold of a batch entry after its been deleted Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=547906&r1=547905&r2=547906 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Sat Jun 16 04:27:48 2007 @@ -66,8 +66,8 @@ return result; } - protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{ - listener.recoverMessage((Message)msg); + protected void recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{ + listener.recoverMessage(msg); } public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{ @@ -89,7 +89,7 @@ public synchronized void recover(MessageRecoveryListener listener) throws Exception{ for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ Message msg=(Message)messageContainer.getValue(entry); - recover(listener,msg); + recoverMessage(listener,msg); } listener.finished(); } @@ -158,9 +158,9 @@ if(entry!=null){ int count=0; do{ - Object msg=messageContainer.getValue(entry); + Message msg=messageContainer.getValue(entry); if(msg!=null){ - recover(listener,msg); + recoverMessage(listener,msg); count++; } batchEntry=entry; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=547906&r1=547905&r2=547906 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Sat Jun 16 04:27:48 2007 @@ -15,7 +15,6 @@ package org.apache.activemq.store.kahadaptor; import java.io.IOException; -import java.util.Set; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -33,6 +32,7 @@ protected final MapContainer messageContainer; protected KahaReferenceStoreAdapter adapter; private StoreEntry batchEntry=null; + private String lastBatchId=null; public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{ this.adapter = adapter; @@ -58,15 +58,14 @@ throw new RuntimeException("Use addMessageReference instead"); } - protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{ - ReferenceRecord record=(ReferenceRecord)msg; + protected final void recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{ listener.recoverMessageReference(new MessageId(record.getMessageId())); } public synchronized void recover(MessageRecoveryListener listener) throws Exception{ for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ ReferenceRecord record=messageContainer.getValue(entry); - recover(listener,new MessageId(record.getMessageId())); + recoverReference(listener,record); } listener.finished(); } @@ -77,17 +76,20 @@ entry=messageContainer.getFirst(); }else{ entry=messageContainer.refresh(entry); - if (entry != null) { - entry=messageContainer.getNext(entry); + if(entry!=null){ + entry=messageContainer.getNext(entry); } } if(entry!=null){ int count=0; do{ - Object msg=messageContainer.getValue(entry); + ReferenceRecord msg=messageContainer.getValue(entry); if(msg!=null){ - recover(listener,msg); + recoverReference(listener,msg); count++; + lastBatchId=msg.getMessageId(); + }else{ + lastBatchId=null; } batchEntry=entry; entry=messageContainer.getNext(entry); @@ -96,14 +98,14 @@ listener.finished(); } - public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data) + public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data) throws IOException{ ReferenceRecord record=new ReferenceRecord(messageId.toString(),data); messageContainer.put(messageId,record); addInterest(record); } - public ReferenceData getMessageReference(MessageId identity) throws IOException{ + public synchronized ReferenceData getMessageReference(MessageId identity) throws IOException{ ReferenceRecord result=messageContainer.get(identity); if(result==null) return null; @@ -127,7 +129,8 @@ ReferenceRecord rr=messageContainer.remove(msgId); if(rr!=null){ removeInterest(rr); - if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){ + if(messageContainer.isEmpty()||(lastBatchId!=null&&lastBatchId.equals(msgId.toString())) + ||(batchEntry!=null&&batchEntry.equals(entry))){ resetBatching(); } } @@ -148,6 +151,7 @@ public void resetBatching(){ batchEntry=null; + lastBatchId=null; } public int getMessageCount(){ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=547906&r1=547905&r2=547906 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Sat Jun 16 04:27:48 2007 @@ -146,9 +146,9 @@ if(container!=null){ for(Iterator i=container.iterator();i.hasNext();){ ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); - Object msg=messageContainer.get(ref.getMessageEntry()); + Message msg=messageContainer.get(ref.getMessageEntry()); if(msg!=null){ - recover(listener, msg); + recoverMessage(listener, msg); } } } @@ -173,12 +173,15 @@ if(entry!=null){ do{ ConsumerMessageRef consumerRef=container.get(entry); - Object msg=messageContainer.getValue(consumerRef.getMessageEntry()); + Message msg=messageContainer.getValue(consumerRef.getMessageEntry()); if(msg!=null){ - recover(listener, msg); + recoverMessage(listener, msg); count++; + container.setBatchEntry(msg.getMessageId().toString(),entry); + }else { + container.reset(); } - container.setBatchEntry(entry); + entry=container.getNextEntry(entry); }while(entry!=null&&count