activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1213743 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/apache/kahadb/page/
Date Tue, 13 Dec 2011 15:43:53 GMT
Author: dejanb
Date: Tue Dec 13 15:43:53 2011
New Revision: 1213743

URL: http://svn.apache.org/viewvc?rev=1213743&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3634 - corrupted index recovery

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1213743&r1=1213742&r2=1213743&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Tue Dec 13 15:43:53 2011
@@ -16,20 +16,11 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Set;
 import org.apache.activeio.journal.Journal;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.command.*;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -40,6 +31,10 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.usage.SystemUsage;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
 /**
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
@@ -500,6 +495,14 @@ public class KahaDBPersistenceAdapter im
         letter.setForceRecoverIndex(forceRecoverIndex);
     }
 
+    public boolean isArchiveCorruptedIndex() {
+        return letter.isArchiveCorruptedIndex();
+    }
+
+    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
+        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
+    }
+
     /**
      * When true, persist the redelivery status such that the message redelivery flag can
survive a broker failure
      * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)
 true

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1213743&r1=1213742&r2=1213743&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Tue Dec 13 15:43:53 2011
@@ -16,38 +16,6 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.Stack;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -55,18 +23,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
-import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
-import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.*;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
@@ -80,20 +37,19 @@ import org.apache.kahadb.journal.Locatio
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LocationMarshaller;
-import org.apache.kahadb.util.LockFile;
-import org.apache.kahadb.util.LongMarshaller;
-import org.apache.kahadb.util.Marshaller;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
-import org.apache.kahadb.util.StringMarshaller;
-import org.apache.kahadb.util.VariableMarshaller;
+import org.apache.kahadb.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.*;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware
{
 
     protected BrokerService brokerService;
@@ -225,6 +181,7 @@ public abstract class MessageDatabase ex
     protected boolean forceRecoverIndex = false;
     private final Object checkpointThreadLock = new Object();
     private boolean rewriteOnRedelivery = false;
+    private boolean archiveCorruptedIndex = false;
 
     public MessageDatabase() {
     }
@@ -333,7 +290,21 @@ public abstract class MessageDatabase ex
     public void open() throws IOException {
         if( opened.compareAndSet(false, true) ) {
             getJournal().start();
-            loadPageFile();
+            try {
+                loadPageFile();
+            } catch (IOException ioe) {
+                LOG.warn("Index corrupted, trying to recover ...", ioe);
+                // try to recover index
+                try {
+                    pageFile.unload();
+                } catch (Exception ignore) {}
+                if (archiveCorruptedIndex) {
+                    pageFile.archive();
+                } else {
+                    pageFile.delete();
+                }
+                loadPageFile();
+            }
             startCheckpoint();
             recover();
         }
@@ -2295,6 +2266,14 @@ public abstract class MessageDatabase ex
         this.rewriteOnRedelivery = rewriteOnRedelivery;
     }
 
+    public boolean isArchiveCorruptedIndex() {
+        return archiveCorruptedIndex;
+    }
+
+    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
+        this.archiveCorruptedIndex = archiveCorruptedIndex;
+    }
+
     // /////////////////////////////////////////////////////////////////
     // Internal conversion methods.
     // /////////////////////////////////////////////////////////////////

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java?rev=1213743&r1=1213742&r2=1213743&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
Tue Dec 13 15:43:53 2011
@@ -16,24 +16,15 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-import java.net.URI;
-import java.util.ArrayList;
-
 import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.RecoveryBrokerTest;
 import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.*;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
 
 
 /**
@@ -53,8 +44,20 @@ public class KahaDBStoreRecoveryBrokerTe
     }
     
     protected BrokerService createRestartedBroker() throws Exception {
+
+        // corrupting index
+        File index = new File("target/activemq-data/kahadb/db.data");
+        index.delete();
+        RandomAccessFile raf = new RandomAccessFile(index, "rw");
+        raf.seek(index.length());
+        raf.writeBytes("corrupt");
+        raf.close();
+
+        // starting broker
         BrokerService broker = new BrokerService();
         KahaDBStore kaha = new KahaDBStore();
+        // uncomment if you want to test archiving
+        //kaha.setArchiveCorruptedIndex(true);
         kaha.setDirectory(new File("target/activemq-data/kahadb"));
         broker.setPersistenceAdapter(kaha);
         return broker;

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1213743&r1=1213742&r2=1213743&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Tue Dec 13 15:43:53
2011
@@ -16,16 +16,13 @@
  */
 package org.apache.kahadb.page;
 
+import org.apache.kahadb.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
@@ -34,16 +31,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.IOExceptionSupport;
-import org.apache.kahadb.util.IOHelper;
-import org.apache.kahadb.util.IntrospectionSupport;
-import org.apache.kahadb.util.LRUCache;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
-
 /**
  * A PageFile provides you random access to fixed sized disk pages. This object is not thread
safe and therefore access to it should 
  * be externally synchronized.
@@ -310,6 +297,16 @@ public class PageFile {
         delete(getFreeFile());
         delete(getRecoveryFile());
     }
+    
+    public void archive() throws IOException {
+        if( loaded.get() ) {
+            throw new IllegalStateException("Cannot delete page file data when the page file
is loaded");
+        }
+        long timestamp = System.currentTimeMillis();
+        archive(getMainPageFile(), String.valueOf(timestamp));
+        archive(getFreeFile(), String.valueOf(timestamp));
+        archive(getRecoveryFile(), String.valueOf(timestamp));
+    }
 
     /**
      * @param file
@@ -323,6 +320,15 @@ public class PageFile {
         }
     }
     
+    private void archive(File file, String suffix) throws IOException {
+        if( file.exists() ) {
+            File archive = new File(file.getPath() + "-" + suffix);
+            if( !file.renameTo(archive) ) {
+                throw new IOException("Could not archive: " + file.getPath() + " to " + file.getPath());
+            }
+        }
+    }
+    
     /**
      * Loads the page file so that it can be accessed for read/write purposes.  This allocates
OS resources.  If this is the 
      * first time the page file is loaded, then this creates the page file in the file system.



Mime
View raw message