From commits-return-10146-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Fri Feb 06 18:20:02 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 11667 invoked from network); 6 Feb 2009 18:20:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 6 Feb 2009 18:20:02 -0000 Received: (qmail 3084 invoked by uid 500); 6 Feb 2009 18:20:02 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 3032 invoked by uid 500); 6 Feb 2009 18:20:02 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 3023 invoked by uid 99); 6 Feb 2009 18:20:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Feb 2009 10:20:02 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Feb 2009 18:19:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4FC9A2388896; Fri, 6 Feb 2009 18:19:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r741659 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Date: Fri, 06 Feb 2009 18:19:32 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090206181932.4FC9A2388896@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Feb 6 18:19:31 2009 New Revision: 741659 URL: 
http://svn.apache.org/viewvc?rev=741659&view=rev Log: - added some handy generic visitors to the BTreeVisitor class. - Updated the recovery process so it now rolls back changes applied to the index which did not get synced to the journal. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741659&r1=741658&r2=741659&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb 6 18:19:31 2009 @@ -360,25 +360,64 @@ long start = System.currentTimeMillis(); Location recoveryPosition = getRecoveryPosition(); - if( recoveryPosition ==null ) { - return; + if( recoveryPosition!=null ) { + int redoCounter = 0; + while (recoveryPosition != null) { + JournalCommand message = load(recoveryPosition); + metadata.lastUpdate = recoveryPosition; + process(message, recoveryPosition); + redoCounter++; + recoveryPosition = journal.getNextLocation(recoveryPosition); + } + long end = System.currentTimeMillis(); + LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); } - - int redoCounter = 0; - LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset()); - - while (recoveryPosition != null) { - JournalCommand message = load(recoveryPosition); - metadata.lastUpdate = recoveryPosition; - process(message, recoveryPosition); - redoCounter++; - 
recoveryPosition = journal.getNextLocation(recoveryPosition); - } - long end = System.currentTimeMillis(); - LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds."); + + // We may have to undo some index updates. + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + recoverIndex(tx); + } + }); } } + protected void recoverIndex(Transaction tx) throws IOException { + long start = System.currentTimeMillis(); + // It is possible index updates got applied before the journal updates.. + // in that case we need to removed references to messages that are not in the journal + final Location lastAppendLocation = journal.getLastAppendLocation(); + long undoCounter=0; + + // Go through all the destinations to see if they have messages past the lastAppendLocation + for (StoredDestination sd : storedDestinations.values()) { + + final ArrayList matches = new ArrayList(); + // Find all the Locations that are >= than the last Append Location. + sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor(lastAppendLocation) { + @Override + protected void matched(Location key, Long value) { + matches.add(value); + } + }); + + + for (Long sequenceId : matches) { + MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); + sd.locationIndex.remove(tx, keys.location); + sd.messageIdIndex.remove(tx, keys.messageId); + undoCounter++; + // TODO: do we need to modify the ack positions for the pub sub case? + } + } + long end = System.currentTimeMillis(); + if( undoCounter > 0 ) { + // The rolledback operations are basically in flight journal writes. To avoid getting these the end user + // should do sync writes to the journal. 
+ LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds."); + } + } + private Location nextRecoveryPosition; private Location lastRecoveryPosition; Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=741659&r1=741658&r2=741659&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Fri Feb 6 18:19:31 2009 @@ -43,4 +43,96 @@ */ void visit(List keys, List values); + + abstract class GTVisitor, Value> implements BTreeVisitor{ + final private Key value; + + public GTVisitor(Key value) { + this.value = value; + } + + public boolean isInterestedInKeysBetween(Key first, Key second) { + return second==null || second.compareTo(value)>0; + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(value)>0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } + + abstract class GTEVisitor, Value> implements BTreeVisitor{ + final private Key value; + + public GTEVisitor(Key value) { + this.value = value; + } + + public boolean isInterestedInKeysBetween(Key first, Key second) { + return second==null || second.compareTo(value)>=0; + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(value)>=0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } + + abstract class LTVisitor, Value> implements BTreeVisitor{ + final private Key value; + + public LTVisitor(Key value) { + this.value = value; + } + + 
public boolean isInterestedInKeysBetween(Key first, Key second) { + return first==null || first.compareTo(value)<0; + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(value)<0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } + + abstract class LTEVisitor, Value> implements BTreeVisitor{ + final private Key value; + + public LTEVisitor(Key value) { + this.value = value; + } + + public boolean isInterestedInKeysBetween(Key first, Key second) { + return first==null || first.compareTo(value)<=0; + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(value)<=0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } } \ No newline at end of file