activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r799733 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/main/resources/ activemq-core/src/test/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/apache/kahadb/index/ kahadb...
Date Fri, 31 Jul 2009 20:10:05 GMT
Author: chirino
Date: Fri Jul 31 20:10:05 2009
New Revision: 799733

URL: http://svn.apache.org/viewvc?rev=799733&view=rev
Log:
Implemented:
 https://issues.apache.org/activemq/browse/AMQ-2338
 https://issues.apache.org/activemq/browse/AMQ-2337


Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/resources/activemq.xsd
    activemq/trunk/activemq-core/src/main/resources/activemq.xsd.html
    activemq/trunk/activemq-core/src/main/resources/activemq.xsd.wiki
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=799733&r1=799732&r2=799733&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Jul 31 20:10:05 2009
@@ -63,14 +63,7 @@
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LockFile;
-import org.apache.kahadb.util.LongMarshaller;
-import org.apache.kahadb.util.Marshaller;
-import org.apache.kahadb.util.StringMarshaller;
-import org.apache.kahadb.util.VariableMarshaller;
+import org.apache.kahadb.util.*;
 
 public class MessageDatabase {
 
@@ -155,6 +148,8 @@
     protected AtomicBoolean started = new AtomicBoolean();
     protected AtomicBoolean opened = new AtomicBoolean();
     private LockFile lockFile;
+    private boolean ignoreMissingJournalfiles = false;
+    private int indexCacheSize = 100;
 
     public MessageDatabase() {
     }
@@ -218,24 +213,6 @@
 	 * @throws IOException
 	 */
 	public void open() throws IOException {
-		File lockFileName = new File(directory, "lock");
-		lockFile = new LockFile(lockFileName, true);
-		if (failIfDatabaseIsLocked) {
-		    lockFile.lock();
-		} else {
-		    while (true) {
-		        try {
-		            lockFile.lock();
-		            break;
-		        } catch (IOException e) {
-		            LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY
/ 1000) + " seconds for the database to be unlocked. Reason: " + e);
-		            try {
-		                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
-		            } catch (InterruptedException e1) {
-		            }
-		        }
-		    }
-		}
 		if( opened.compareAndSet(false, true) ) {
             getJournal().start();
             
@@ -271,24 +248,45 @@
             recover();
 		}
 	}
-	
+
+    private void lock() throws IOException {
+        if( lockFile == null ) {
+            File lockFileName = new File(directory, "lock");
+            lockFile = new LockFile(lockFileName, true);
+            if (failIfDatabaseIsLocked) {
+                lockFile.lock();
+            } else {
+                while (true) {
+                    try {
+                        lockFile.lock();
+                        break;
+                    } catch (IOException e) {
+                        LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY
/ 1000) + " seconds for the database to be unlocked. Reason: " + e);
+                        try {
+                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
+                        } catch (InterruptedException e1) {
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     public void load() throws IOException {
     	
         synchronized (indexMutex) {
+            lock();
+            if (deleteAllMessages) {
+                getJournal().start();
+                getJournal().delete();
+                getJournal().close();
+                journal = null;
+                getPageFile().delete();
+                LOG.info("Persistence store purged.");
+                deleteAllMessages = false;
+            }
+
 	    	open();
-	    	
-	        if (deleteAllMessages) {
-	            journal.delete();
-	
-	            pageFile.unload();
-	            pageFile.delete();
-	            metadata = new Metadata();
-	            
-	            LOG.info("Persistence store purged.");
-	            deleteAllMessages = false;
-	            
-	            loadPageFile();
-	        }
 	        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
 
         }
@@ -348,7 +346,6 @@
      * 
      * @throws IOException
      * @throws IOException
-     * @throws InvalidLocationException
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
@@ -406,6 +403,75 @@
                 // TODO: do we need to modify the ack positions for the pub sub case?
 			}
         }
+
+
+        // Lets be extra paranoid here and verify that all the datafiles being referenced
+        // by the indexes still exists.
+
+        final SequenceSet ss = new SequenceSet();
+        for (StoredDestination sd : storedDestinations.values()) {
+            // Use a visitor to cut down the number of pages that we load
+            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+                int last=-1;
+
+                public boolean isInterestedInKeysBetween(Location first, Location second)
{
+                    if( first==null ) {
+                        return !ss.contains(0, second.getDataFileId());
+                    } else if( second==null ) {
+                        return true;
+                    } else {
+                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
+                    }
+                }
+
+                public void visit(List<Location> keys, List<Long> values) {
+                    for (Location l : keys) {
+                        int fileId = l.getDataFileId();
+                        if( last != fileId ) {
+                            ss.add(fileId);
+                            last = fileId;
+                        }
+                    }
+                }
+
+            });
+        }
+        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
+        while( !ss.isEmpty() ) {
+            missingJournalFiles.add( (int)ss.removeFirst() );
+        }
+        missingJournalFiles.removeAll( journal.getFileMap().keySet() );
+
+        if( !missingJournalFiles.isEmpty() ) {
+            if( ignoreMissingJournalfiles ) {
+
+                for (StoredDestination sd : storedDestinations.values()) {
+
+                    final ArrayList<Long> matches = new ArrayList<Long>();
+                    for (Integer missing : missingJournalFiles) {
+                        sd.locationIndex.visit(tx, new BTreeVisitor.BetweenVisitor<Location,
Long>(new Location(missing,0), new Location(missing+1,0)) {
+                            @Override
+                            protected void matched(Location key, Long value) {
+                                matches.add(value);
+                            }
+                        });
+                    }
+
+
+                    for (Long sequenceId : matches) {
+                        MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+                        sd.locationIndex.remove(tx, keys.location);
+                        sd.messageIdIndex.remove(tx, keys.messageId);
+                        undoCounter++;
+                        // TODO: do we need to modify the ack positions for the pub sub case?
+                    }
+                }
+                
+            } else {
+                throw new IOException("Detected missing journal files: "+missingJournalFiles);
+            }
+        }
+
         long end = System.currentTimeMillis();
         if( undoCounter > 0 ) {
         	// The rolledback operations are basically in flight journal writes.  To avoid getting
these the end user
@@ -1263,6 +1329,7 @@
         PageFile index = new PageFile(directory, "db");
         index.setEnableWriteThread(isEnableIndexWriteAsync());
         index.setWriteBatchSize(getIndexWriteBatchSize());
+        index.setPageCacheSize(indexCacheSize);
         return index;
     }
 
@@ -1358,4 +1425,20 @@
     public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
         this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
     }
+
+    public boolean isIgnoreMissingJournalfiles() {
+        return ignoreMissingJournalfiles;
+    }
+    
+    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
+        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
+    }
+
+    public int getIndexCacheSize() {
+        return indexCacheSize;
+    }
+
+    public void setIndexCacheSize(int indexCacheSize) {
+        this.indexCacheSize = indexCacheSize;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/resources/activemq.xsd
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/activemq.xsd?rev=799733&r1=799732&r2=799733&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/activemq.xsd (original)
+++ activemq/trunk/activemq-core/src/main/resources/activemq.xsd Fri Jul 31 20:10:05 2009
@@ -2883,6 +2883,8 @@
       <xs:attribute name='journalMaxFileLength' type='xs:long'/>
       <xs:attribute name='enableIndexWriteAsync' type='xs:boolean'/>
       <xs:attribute name='enableJournalDiskSyncs' type='xs:boolean'/>
+      <xs:attribute name='ignoreMissingJournalfiles' type='xs:boolean'/>
+      <xs:attribute name='indexCacheSize' type='xs:integer'/>
       <xs:attribute name='size' type='xs:string'/>
       <xs:attribute name='usageManager' type='xs:string'/>
       <xs:attribute name='id' type='xs:ID'/>

Modified: activemq/trunk/activemq-core/src/main/resources/activemq.xsd.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/activemq.xsd.html?rev=799733&r1=799732&r2=799733&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/activemq.xsd.html (original)
+++ activemq/trunk/activemq-core/src/main/resources/activemq.xsd.html Fri Jul 31 20:10:05
2009
@@ -976,6 +976,8 @@
   <tr><td>indexWriteBatchSize</td><td>xs:integer</td><td></td></tr>
   <tr><td>enableIndexWriteAsync</td><td>xs:boolean</td><td></td></tr>
   <tr><td>enableJournalDiskSyncs</td><td>xs:boolean</td><td></td></tr>
+  <tr><td>ignoreMissingJournalfiles</td><td>xs:boolean</td><td></td></tr>
+  <tr><td>indexCacheSize</td><td>xs:integer</td><td></td></tr>
 </table>
 <table>
   <tr><th>Element</th><th>Type</th><th>Description</th>

Modified: activemq/trunk/activemq-core/src/main/resources/activemq.xsd.wiki
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/activemq.xsd.wiki?rev=799733&r1=799732&r2=799733&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/activemq.xsd.wiki (original)
+++ activemq/trunk/activemq-core/src/main/resources/activemq.xsd.wiki Fri Jul 31 20:10:05
2009
@@ -1247,6 +1247,8 @@
     | indexWriteBatchSize | _int_ | {html}{html} |
     | enableIndexWriteAsync | _boolean_ | {html}{html} |
     | enableJournalDiskSyncs | _boolean_ | {html}{html} |
+    | ignoreMissingJournalfiles | _boolean_ | {html}{html} |
+    | indexCacheSize | _int_ | {html}{html} |
     | size | _java.util.concurrent.atomic.AtomicLong_ | {html}{html} |
     | usageManager | _[org.apache.activemq.usage.SystemUsage|#org.apache.activemq.usage.SystemUsage-types]_
| {html}{html} |
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java?rev=799733&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
Fri Jul 31 20:10:05 2009
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+import javax.jms.*;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * @author chirino
+ */
+public class KahaDBTest extends TestCase {
+
+    protected BrokerService createBroker(KahaDBStore kaha) throws Exception {
+
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistenceAdapter(kaha);
+        broker.start();
+        return broker;
+
+    }
+
+    private KahaDBStore createStore(boolean delete) throws IOException {
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        if( delete ) {
+            kaha.deleteAllMessages();
+        }
+        return kaha;
+    }
+
+    public void testIgnoreMissingJournalfilesOptionSetFalse() throws Exception {
+        KahaDBStore kaha = createStore(true);
+        kaha.setJournalMaxFileLength(1024*100);
+        assertFalse(kaha.isIgnoreMissingJournalfiles());
+        BrokerService broker = createBroker(kaha);
+        sendMessages(1000);
+        broker.stop();
+
+        // Delete some journal files..
+        assertExistsAndDelete(new File(kaha.getDirectory(), "db-4.log"));
+        assertExistsAndDelete(new File(kaha.getDirectory(), "db-8.log"));
+
+        kaha = createStore(false);
+        kaha.setJournalMaxFileLength(1024*100);
+        assertFalse(kaha.isIgnoreMissingJournalfiles());
+        try {
+            broker = createBroker(kaha);
+            fail("expected IOException");
+        } catch (IOException e) {
+            assertTrue( e.getMessage().startsWith("Detected missing journal files") );
+        }
+
+    }
+
+
+    public void testIgnoreMissingJournalfilesOptionSetTrue() throws Exception {
+        KahaDBStore kaha = createStore(true);
+        kaha.setJournalMaxFileLength(1024*100);
+        assertFalse(kaha.isIgnoreMissingJournalfiles());
+        BrokerService broker = createBroker(kaha);
+        sendMessages(1000);
+        broker.stop();
+
+        // Delete some journal files..
+        assertExistsAndDelete(new File(kaha.getDirectory(), "db-4.log"));
+        assertExistsAndDelete(new File(kaha.getDirectory(), "db-8.log"));
+
+        kaha = createStore(false);
+        kaha.setIgnoreMissingJournalfiles(true);
+        kaha.setJournalMaxFileLength(1024*100);
+        broker = createBroker(kaha);
+
+        // We know we won't get all the messages but we should get most of them.
+        int count = receiveMessages();
+        assertTrue( count > 800 ); 
+        assertTrue( count < 1000 );
+
+        broker.stop();
+    }
+
+    private void assertExistsAndDelete(File file) {
+        assertTrue(file.exists());
+        file.delete();
+        assertFalse(file.exists());
+    }
+
+    private void sendMessages(int count) throws JMSException {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+            for (int i = 0; i < count; i++) {
+                producer.send(session.createTextMessage(createContent(i)));
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    private int receiveMessages() throws JMSException {
+        int rc=0;
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue("TEST"));
+            while ( messageConsumer.receive(1000) !=null ) {
+                rc++;
+            }
+            return rc;
+        } finally {
+            connection.close();
+        }
+    }
+
+    private String createContent(int i) {
+        StringBuilder sb = new StringBuilder(i+":");
+        while( sb.length() < 1024 ) {
+            sb.append("*");
+        }
+        return sb.toString();
+    }
+
+}
\ No newline at end of file

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=799733&r1=799732&r2=799733&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Fri Jul
31 20:10:05 2009
@@ -67,6 +67,32 @@
 		abstract protected void matched(Key key, Value value);
     }
     
+    abstract class BetweenVisitor<Key extends Comparable<Key>, Value> implements
BTreeVisitor<Key, Value>{
+		private final Key first;
+        private final Key last;
+
+        public BetweenVisitor(Key first, Key last) {
+			this.first = first;
+            this.last = last;
+        }
+
+		public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return (second==null || second.compareTo(this.first)>=0)
+                   && (first==null || first.compareTo(last)<0);
+		}
+
+		public void visit(List<Key> keys, List<Value> values) {
+			for( int i=0; i < keys.size(); i++) {
+				Key key = keys.get(i);
+				if( key.compareTo(first)>=0 && key.compareTo(last)<0 ) {
+					matched(key, values.get(i));
+				}
+			}
+		}
+
+		abstract protected void matched(Key key, Value value);
+    }
+
     abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements
BTreeVisitor<Key, Value>{
 		final private Key value;
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=799733&r1=799732&r2=799733&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Fri Jul 31
20:10:05 2009
@@ -31,7 +31,7 @@
  * @author chirino
  */
 public class SequenceSet extends LinkedNodeList<Sequence> {
-    
+
     public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet>
{
 
         public static final Marshaller INSTANCE = new Marshaller();
@@ -254,5 +254,19 @@
         }
         return rc;
     }
-   
+
+    public boolean contains(int first, int last) {
+        if (isEmpty()) {
+            return false;
+        }
+        Sequence sequence = getHead();
+        while (sequence != null) {
+            if (sequence.first <= first ) {
+                return last <= sequence.last ;
+            }
+            sequence = sequence.getNext();
+        }
+        return false;
+    }
+
 }
\ No newline at end of file



Mime
View raw message