activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1236661 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/store/kahadb/
Date Fri, 27 Jan 2012 12:47:39 GMT
Author: gtully
Date: Fri Jan 27 12:47:39 2012
New Revision: 1236661

URL: http://svn.apache.org/viewvc?rev=1236661&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3634 - ensure full recovery of the index, irrespective
of the load failure reason, with additional tests. Shut down the scheduler early to ensure
no ugly errors from timer tasks during shutdown

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1236661&r1=1236660&r2=1236661&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Jan 27 12:47:39 2012
@@ -588,6 +588,10 @@ public class BrokerService implements Se
 
         LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is
shutting down");
         removeShutdownHook();
+        if (this.scheduler != null) {
+            this.scheduler.stop();
+            this.scheduler = null;
+        }
         ServiceStopper stopper = new ServiceStopper();
         if (services != null) {
             for (Service service : services) {
@@ -645,10 +649,6 @@ public class BrokerService implements Se
             this.taskRunnerFactory.shutdown();
             this.taskRunnerFactory = null;
         }
-        if (this.scheduler != null) {
-            this.scheduler.stop();
-            this.scheduler = null;
-        }
         if (this.executor != null) {
             this.executor.shutdownNow();
             this.executor = null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1236661&r1=1236660&r2=1236661&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Jan 27 12:47:39 2012
@@ -300,8 +300,11 @@ public abstract class MessageDatabase ex
             getJournal().start();
             try {
                 loadPageFile();
-            } catch (IOException ioe) {
-                LOG.warn("Index corrupted, trying to recover ...", ioe);
+            } catch (Throwable t) {
+                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:"
+ t);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Index load failure", t);
+                }
                 // try to recover index
                 try {
                     pageFile.unload();
@@ -311,6 +314,8 @@ public abstract class MessageDatabase ex
                 } else {
                     pageFile.delete();
                 }
+                metadata = new Metadata();
+                pageFile = null;
                 loadPageFile();
             }
             startCheckpoint();
@@ -383,11 +388,13 @@ public abstract class MessageDatabase ex
             try {
                 this.indexLock.writeLock().lock();
                 try {
-                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                        public void execute(Transaction tx) throws IOException {
-                            checkpointUpdate(tx, true);
-                        }
-                    });
+                    if (metadata.page != null) {
+                        pageFile.tx().execute(new Transaction.Closure<IOException>()
{
+                            public void execute(Transaction tx) throws IOException {
+                                checkpointUpdate(tx, true);
+                            }
+                        });
+                    }
                     pageFile.unload();
                     metadata = new Metadata();
                 } finally {
@@ -413,11 +420,13 @@ public abstract class MessageDatabase ex
                 metadata.state = CLOSED_STATE;
                 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
 
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        tx.store(metadata.page, metadataMarshaller, true);
-                    }
-                });
+                if (metadata.page != null) {
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
+                            tx.store(metadata.page, metadataMarshaller, true);
+                        }
+                    });
+                }
             }
         } finally {
             this.indexLock.writeLock().unlock();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java?rev=1236661&r1=1236660&r2=1236661&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
Fri Jan 27 12:47:39 2012
@@ -21,6 +21,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.RecoveryBrokerTest;
 import org.apache.activemq.broker.StubConnection;
 import org.apache.activemq.command.*;
+import org.apache.kahadb.page.PageFile;
 
 import java.io.File;
 import java.io.RandomAccessFile;
@@ -34,6 +35,9 @@ import java.util.ArrayList;
  */
 public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
 
+    enum CorruptionType { None, FailToLoad, LoadInvalid, LoadCorrupt };
+    public CorruptionType  failTest = CorruptionType.None;
+
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
         KahaDBStore kaha = new KahaDBStore();
@@ -47,10 +51,27 @@ public class KahaDBStoreRecoveryBrokerTe
 
         // corrupting index
         File index = new File("target/activemq-data/kahadb/db.data");
-        index.delete();
         RandomAccessFile raf = new RandomAccessFile(index, "rw");
-        raf.seek(index.length());
-        raf.writeBytes("corrupt");
+        switch (failTest) {
+            case FailToLoad:
+                index.delete();
+                raf = new RandomAccessFile(index, "rw");
+                raf.seek(index.length());
+                raf.writeBytes("corrupt");
+                break;
+            case LoadInvalid:
+                // page size 0
+                raf.seek(0);
+                raf.writeBytes("corrupt and cannot load metadata");
+                break;
+            case LoadCorrupt:
+                // loadable but invalid metadata
+                // location of order index low priority index for first destination...
+                raf.seek(8*1024 + 57);
+                raf.writeLong(Integer.MAX_VALUE-10);
+                break;
+            default:
+        }
         raf.close();
 
         // starting broker
@@ -71,7 +92,10 @@ public class KahaDBStoreRecoveryBrokerTe
         junit.textui.TestRunner.run(suite());
     }
 
-    
+    public void initCombosForTestLargeQueuePersistentMessagesNotLostOnRestart() {
+        this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad,
CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt} );
+    }
+
     public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {
 
         ActiveMQDestination destination = new ActiveMQQueue("TEST");



Mime
View raw message