activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1234066 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala RecordLog.scala
Date Fri, 20 Jan 2012 18:52:37 GMT
Author: chirino
Date: Fri Jan 20 18:52:37 2012
New Revision: 1234066

URL: http://svn.apache.org/viewvc?rev=1234066&view=rev
Log:
If the paranoid_checks option is true, double check the integrity of the index and correct
any inconsistencies.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234066&r1=1234065&r2=1234066&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Jan 20 18:52:37 2012
@@ -20,7 +20,6 @@ import java.{lang=>jl}
 import java.{util=>ju}
 
 import org.fusesource.hawtbuf.proto.PBMessageFactory
-import org.apache.activemq.apollo.broker.store.PBSupport._
 
 import org.apache.activemq.apollo.broker.store._
 import java.io._
@@ -39,8 +38,9 @@ import collection.mutable.{HashMap, List
 import org.apache.activemq.apollo.dto.JsonCodec
 import java.util.Map
 import org.iq80.leveldb._
-import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._
 import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
+import org.apache.activemq.apollo.broker.store.PBSupport._
+import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -251,10 +251,11 @@ class LevelDBClient(store: LevelDBStore)
 
     index_options = new Options();
     index_options.createIfMissing(true);
+    val paranoid_checks = config.paranoid_checks.getOrElse(false)
 
     config.index_max_open_files.foreach( index_options.maxOpenFiles(_) )
     config.index_block_restart_interval.foreach( index_options.blockRestartInterval(_) )
-    index_options.paranoidChecks(config.paranoid_checks.getOrElse(false))
+    index_options.paranoidChecks(paranoid_checks)
     Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.writeBufferSize(_) )
     Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.blockSize(_) )
     Option(config.index_compression).foreach(x => index_options.compressionType( x match {
@@ -271,7 +272,7 @@ class LevelDBClient(store: LevelDBStore)
     log = create_log
     log.sync = sync
     log.logSize = log_size
-    log.paranoidChecks = index_options.paranoidChecks()
+    log.verify_checksums = verify_checksums
     log.on_log_rotate = ()=> {
       // lets queue a request to checkpoint when
       // the logs rotate.. queue it on the GC thread since GC's lock
@@ -327,6 +328,11 @@ class LevelDBClient(store: LevelDBStore)
       try {
         load_log_refs
         index.put(dirty_index_key, TRUE)
+
+        if( paranoid_checks ) {
+          check_index_integrity(index)
+        }
+
         // Update the index /w what was stored on the logs..
         var pos = last_index_snapshot_pos;
 
@@ -421,6 +427,85 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
+  def check_index_integrity(index:RichDB) = {
+    val actual_log_refs = HashMap[Long, LongCounter]()
+    var referenced_queues = Set[Long]()
+
+    // Lets find out what the queue entries are..
+    var fixed_records = 0 
+    index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=>
+      try {
+        val (_, queue_key, seq_key) = decode_long_long_key(key)
+        val record: QueueEntryRecord = value
+        if (record.getQueueKey != queue_key) {
+          throw new IOException("key missmatch")
+        }
+        if (record.getQueueSeq != seq_key) {
+          throw new IOException("key missmatch")
+        }
+        val pos = decode_locator(record.getMessageLocator)._1
+        log.log_info(pos).foreach {
+          log_info =>
+            actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+        }
+        referenced_queues += queue_key
+      } catch {
+        case _ =>
+          fixed_records += 1
+          // Invalid record.
+          index.delete(key)
+      }
+      true
+    }
+    
+    // Lets cross check the queues.
+    index.cursor_prefixed(queue_prefix_array) { (key, value)=>
+      try {
+        val (_, queue_key) = decode_long_key(key)
+        val record: QueueRecord = value
+        if (record.getKey != queue_key) {
+          throw new IOException("key missmatch")
+        }
+        referenced_queues -= queue_key
+      } catch {
+        case _ =>
+          fixed_records += 1
+          // Invalid record.
+          index.delete(key)
+      }
+      true
+    }
+
+    referenced_queues.foreach { queue_key=>
+      // We have queue entries for a queue that does not exist..
+      index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key)) { (key, value)=>
+
+        fixed_records += 1
+        index.delete(key)
+        val entry_record:QueueEntryRecord = value
+        val pos = decode_locator(entry_record.getMessageLocator)._1
+        log.log_info(pos).foreach { log_info =>
+          actual_log_refs.get(log_info.position).foreach { counter =>
+            if (counter.decrementAndGet() == 0) {
+              actual_log_refs.remove(log_info.position)
+            }
+          }
+        }
+        true
+      }
+    }
+
+    if( actual_log_refs != log_refs ) {
+      log_refs.clear()
+      log_refs ++= actual_log_refs
+    }
+    
+    if( fixed_records > 0 ) {
+      warn("Fixed %d invalid index enties in the leveldb store", fixed_records)
+    }
+  }
+
+
   private def store_log_refs = {
     index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
   }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1234066&r1=1234065&r2=1234066&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Fri Jan 20 18:52:37 2012
@@ -72,7 +72,7 @@ case class RecordLog(directory: File, lo
 
   var logSize = 1024 * 1024 * 100
   var current_appender:LogAppender = _
-  var paranoidChecks = false
+  var verify_checksums = false
   var sync = false
 
 
@@ -222,7 +222,7 @@ case class RecordLog(directory: File, lo
       val offset = (record_position-position).toInt
       check_read_flush(offset+LOG_HEADER_SIZE+length)
       
-      if(paranoidChecks) {
+      if(verify_checksums) {
 
         val record = new Buffer(LOG_HEADER_SIZE+length)
 
@@ -277,7 +277,7 @@ case class RecordLog(directory: File, lo
         throw new IOException("short record")
       }
 
-      if(paranoidChecks) {
+      if(verify_checksums) {
         if( expectedChecksum != checksum(data) ) {
           throw new IOException("checksum does not match")
         }



Mime
View raw message