activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1442638 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/ apollo-util/src/main/scala/org/apache/activemq/apo...
Date Tue, 05 Feb 2013 15:59:00 GMT
Author: chirino
Date: Tue Feb  5 15:59:00 2013
New Revision: 1442638

URL: http://svn.apache.org/viewvc?rev=1442638&view=rev
Log:
APLO-293: Improve failure handling in the case of a corrupted leveldb index.

We now attempt an index repair if the store fails to start up correctly.  If it still fails
after that, the broker completes startup but the virtual host is put into a stopped status.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1442638&r1=1442637&r2=1442638&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Tue Feb  5 15:59:00 2013
@@ -186,7 +186,7 @@ class VirtualHost(val broker: Broker, va
       val task = tracker.task("store startup")
       console_log.info("Starting store: "+store)
       store.start {
-        {
+        if( store.service_failure ==null) {
           val task = tracker.task("store get last queue key")
           store.get_last_queue_key{ key=>
             key match {
@@ -204,29 +204,35 @@ class VirtualHost(val broker: Broker, va
               task.run
             }
           }
+        } else {
+          _service_failure = store.service_failure
+          store = null
         }
         task.run
       }
     }
 
     tracker.callback {
-
       val tracker = new LoggingTracker("virtual host startup", console_log)
+      if( _service_failure==null ) {
 
-      // The default host handles persisting the connection id counter.
-      if(store!=null) {
-        if(session_counter.get == 0) {
-          val task = tracker.task("load session counter")
-          session_counter.init(store) {
-            task.run()
+        // The default host handles persisting the connection id counter.
+        if(store!=null) {
+          if(session_counter.get == 0) {
+            val task = tracker.task("load session counter")
+            session_counter.init(store) {
+              task.run()
+            }
+          } else {
+            session_counter.connect(store)
           }
-        } else {
-          session_counter.connect(store)
         }
+        tracker.start(router)
       }
-
-      tracker.start(router)
       tracker.callback(on_completed)
+      if( _service_failure!=null ) {
+        stop(NOOP)
+      }
     }
 
   }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1442638&r1=1442637&r2=1442638&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
Tue Feb  5 15:59:00 2013
@@ -330,6 +330,7 @@ class LevelDBClient(store: LevelDBStore)
     snapshots.filterNot(_._1 == last_index_snapshot_pos).foreach(_._2.recursive_delete)
     temp_index_file.recursive_delete // usually does not exist.
 
+    var reportedFailure:Throwable = null
     retry {
 
       // Delete the dirty indexes
@@ -351,16 +352,8 @@ class LevelDBClient(store: LevelDBStore)
           }
       }
 
-      try {
+      def recover = {
         index = new RichDB(factory.open(dirty_index_file, index_options))
-      } catch {
-        case e:DBException =>
-          // lets try to recover by repairing the index file.
-          factory.repair(dirty_index_file, index_options)
-          index = new RichDB(factory.open(dirty_index_file, index_options))
-      }
-
-      try {
         load_log_refs
         index.put(dirty_index_key, TRUE)
 
@@ -480,14 +473,32 @@ class LevelDBClient(store: LevelDBStore)
         }
         // delete obsolete files..
         gc
+      }
+
+      try {
+        recover
 
       } catch {
         case e: Throwable =>
-          // replay failed.. good thing we are in a retry block...
-          index.close
-          throw e;
+          if( reportedFailure == null ) {
+            reportedFailure = e
+            // replay failed.. good thing we are in a retry block...
+            if( index!=null) {
+              index.close
+              index = null
+            }
+            // Lets do a repair for shits and giggles..
+            factory.repair(dirty_index_file, index_options)
+            throw e;
+          } else {
+            reportedFailure = e
+          }
       }
     }
+
+    if( reportedFailure!=null ) {
+      throw reportedFailure;
+    }
   }
 
   def check_index_integrity(index: RichDB) = {
@@ -1013,7 +1024,7 @@ class LevelDBClient(store: LevelDBStore)
 
   def list_queues: Seq[Long] = {
     val rc = ListBuffer[Long]()
-    retry_using_index {
+    using_index {
       val ro = new ReadOptions
       ro.verifyChecksums(verify_checksums)
       ro.fillCache(false)

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala?rev=1442638&r1=1442637&r2=1442638&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
Tue Feb  5 15:59:00 2013
@@ -117,11 +117,12 @@ class LevelDBStore(val config: LevelDBSt
           client.start()
           next_msg_key.set(client.getLastMessageKey + 1)
           next_queue_key.set(client.get_last_queue_key + 1)
-          on_completed.run
         } catch {
           case e: Throwable =>
-            e.printStackTrace()
-            LevelDBStore.error(e, "Store client startup failure: " + e)
+            _service_failure = e
+            LevelDBStore.error(e, "Store startup failure: " + e)
+        } finally {
+          on_completed.run
         }
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1442638&r1=1442637&r2=1442638&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
Tue Feb  5 15:59:00 2013
@@ -65,8 +65,8 @@ trait BaseService extends Service with D
   def service_state = _service_state
 
   @volatile
-  protected var _serviceFailure:Exception = null
-  def serviceFailure = _serviceFailure
+  protected var _service_failure:Throwable = null
+  def service_failure = _service_failure
 
   private val pending_actions = ListBuffer[Task]()
 
@@ -93,7 +93,7 @@ trait BaseService extends Service with D
         catch {
           case e:Exception =>
             error(e, "Start failed due to %s", e)
-            _serviceFailure = e
+            _service_failure = e
             _service_state = new FAILED
             done
         }
@@ -142,7 +142,7 @@ trait BaseService extends Service with D
           catch {
             case e:Exception =>
               error(e, "Stop failed due to: %s", e)
-              _serviceFailure = e
+              _service_failure = e
               _service_state = new FAILED
               done
           }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java?rev=1442638&r1=1442637&r2=1442638&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java
Tue Feb  5 15:59:00 2013
@@ -40,4 +40,9 @@ public interface Service {
      */
     void stop(Task onComplete) throws Exception;
 
+    /**
+     * @return the error that caused the service to not start.
+     */
+    public Throwable service_failure();
+
 }



Mime
View raw message