activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1480600 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb: LevelDBClient.scala RecordLog.scala
Date Thu, 09 May 2013 12:23:11 GMT
Author: chirino
Date: Thu May  9 12:23:11 2013
New Revision: 1480600

URL: http://svn.apache.org/r1480600
Log:
Fixes APLO-320: Occasionally on restart 'Invalid log position:' warning messages get logged

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1480600&r1=1480599&r2=1480600&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Thu May  9 12:23:11 2013
@@ -59,6 +59,7 @@ object LevelDBClient extends Log {
 
   final val dirty_index_key = bytes(":dirty")
   final val log_refs_index_key = bytes(":log-refs")
+  final val logs_index_key = bytes(":logs")
   final val TRUE = bytes("true")
   final val FALSE = bytes("false")
 
@@ -210,6 +211,7 @@ class LevelDBClient(store: LevelDBStore)
 
   var factory: DBFactory = _
   val log_refs = HashMap[Long, LongCounter]()
+  var recovery_logs:java.util.TreeMap[Long, Void] = _
 
   def dirty_index_file = directory / ("dirty" + INDEX_SUFFIX)
 
@@ -341,26 +343,23 @@ class LevelDBClient(store: LevelDBStore)
       dirty_index_file.recursive_delete
       dirty_index_file.mkdirs()
 
-      last_snapshot_index.foreach {
-        case (id, file) =>
-          // Resume log replay from a snapshot of the index..
-          try {
-            file.list_files.foreach {
-              file =>
-                link(file, dirty_index_file / file.getName)
-            }
-          } catch {
-            case e: Exception =>
-              warn(e, "Could not recover snapshot of the index: " + e)
-              last_snapshot_index = None
-          }
+      for( (id, file) <- last_snapshot_index) {
+        copy_index(file, dirty_index_file)
       }
 
       def recover = {
         index = new RichDB(factory.open(dirty_index_file, index_options))
-        load_log_refs
+        if (paranoid_checks) {
+          for(value <- index.get(dirty_index_key) ) {
+            if( java.util.Arrays.equals(value, TRUE) ) {
+              warn("Recovering from a dirty index.")
+            }
+          }
+        }
         index.put(dirty_index_key, TRUE)
 
+        load_log_refs
+
         if (paranoid_checks) {
           check_index_integrity(index)
         }
@@ -497,6 +496,8 @@ class LevelDBClient(store: LevelDBStore)
           } else {
             reportedFailure = e
           }
+      } finally {
+        recovery_logs = null
       }
     }
 
@@ -516,8 +517,8 @@ class LevelDBClient(store: LevelDBStore)
         val (_, queue_key, seq_key) = decode_long_long_key(key)
         val record = QueueEntryPB.FACTORY.parseUnframed(value)
         val (pos, len) = decode_locator(record.getMessageLocator)
-        log.log_info(pos).foreach { log_info =>
-          actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+        for( key <- log_ref_key(pos) ) {
+          actual_log_refs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
         }
         referenced_queues += queue_key
       } catch {
@@ -560,14 +561,12 @@ class LevelDBClient(store: LevelDBStore)
             index.delete(key)
             val record = QueueEntryPB.FACTORY.parseUnframed(value)
             val pos = decode_vlong(record.getMessageLocator)
-            log.log_info(pos).foreach {
-              log_info =>
-                actual_log_refs.get(log_info.position).foreach {
-                  counter =>
-                    if (counter.decrementAndGet() == 0) {
-                      actual_log_refs.remove(log_info.position)
-                    }
+            for( key <- log_ref_key(pos)) {
+              for( counter <- actual_log_refs.get(key) ) {
+                if (counter.decrementAndGet() == 0) {
+                  actual_log_refs.remove(key)
                 }
+              }
             }
             true
         }
@@ -602,19 +601,27 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   private def store_log_refs = {
-    index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
+    import collection.JavaConversions.mapAsJavaMap
+    index.put(log_refs_index_key, JsonCodec.encode(mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
+    index.put(logs_index_key, JsonCodec.encode(log.log_file_positions).toByteArray)
   }
 
   private def load_log_refs = {
+    import collection.JavaConversions._
     log_refs.clear()
     index.get(log_refs_index_key, new ReadOptions).foreach {
       value =>
-        val javamap = JsonCodec.decode(new Buffer(value), classOf[java.util.Map[String, Object]])
-        collection.JavaConversions.mapAsScalaMap(javamap).foreach {
-          case (k, v) =>
+        for( (k, v) <- JsonCodec.decode(new Buffer(value), classOf[java.util.Map[String, Object]]) ) {
             log_refs.put(k.toLong, new LongCounter(v.asInstanceOf[Number].longValue()))
         }
     }
+
+    index.get(logs_index_key, new ReadOptions).map { value =>
+      recovery_logs = new java.util.TreeMap[Long, Void]()
+      for( v <- JsonCodec.decode(new Buffer(value), classOf[java.util.List[Object]]) ) {
+        recovery_logs.put(v.asInstanceOf[Number].longValue(), null)
+      }
+    }
   }
 
   def stop() = {
@@ -650,9 +657,9 @@ class LevelDBClient(store: LevelDBStore)
     // we will be closing it to create a consistent snapshot.
     snapshot_rw_lock.writeLock().lock()
 
-    // Close the index so that it's files are not changed async on us.
     store_log_refs
     index.put(dirty_index_key, FALSE, new WriteOptions().sync(true))
+    // Suspend the index so that it's files are not changed async on us.
     index.db.suspendCompactions
   }
 
@@ -669,6 +676,19 @@ class LevelDBClient(store: LevelDBStore)
     snapshot_rw_lock.writeLock().unlock()
   }
 
+  def copy_index(from:File, to:File) = {
+    for( file <- from.list_files ) {
+      val name: String = file.getName
+      if( name.endsWith(".sst") ) {
+        // SST files don't change once created, safe to hard link.
+        link(file, to / name)
+      } else {
+        /// These might not be append only files, so avoid hard linking just to be safe.
+        copyLinkStrategy(file, to / name)
+      }
+    }
+  }
+
   def copy_dirty_index_to_snapshot {
     if (log.appender_limit == last_index_snapshot_pos) {
       // no need to snapshot again...
@@ -683,30 +703,12 @@ class LevelDBClient(store: LevelDBStore)
     try {
 
       // Copy/Hard link all the index files.
-      for( file <- dirty_index_file.list_files ) {
-        val name: String = file.getName
-        if( name == "CURRENT" || name.startsWith("MANIFEST-") ) {
-          /// These might not be append only files, so avoid hard linking just to be safe.
-          copyLinkStrategy(file, tmp_dir / name)
-        } else {
-          // These are append only files, safe to hard line.
-          link(file, tmp_dir / name)
-        }
-      }
+      copy_index(dirty_index_file, tmp_dir)
 
       // Rename to signal that the snapshot is complete.
       val new_snapshot_index_pos = log.appender_limit
       tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos))
 
-      if (paranoid_checks) {
-        val tmp = new RichDB(factory.open(snapshot_index_file(new_snapshot_index_pos), index_options))
-        try {
-          check_index_integrity(tmp)
-        } finally {
-          tmp.close
-        }
-      }
-
       snapshot_index_file(last_index_snapshot_pos).recursive_delete
       last_index_snapshot_pos = new_snapshot_index_pos
       last_index_snapshot_ts = System.currentTimeMillis()
@@ -725,6 +727,9 @@ class LevelDBClient(store: LevelDBStore)
       // no need to snapshot again...
       return
     }
+//    if (paranoid_checks) {
+//      check_index_integrity(index)
+//    }
     suspend()
     try {
       copy_dirty_index_to_snapshot
@@ -795,26 +800,35 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   def log_ref_decrement(pos: Long, log_info: LogInfo = null) = this.synchronized {
-    Option(log_info).orElse(log.log_info(pos)) match {
-      case Some(log_info) =>
-        log_refs.get(log_info.position).foreach {
-          counter =>
-            val count = counter.decrementAndGet()
-            if (count == 0) {
-              log_refs.remove(log_info.position)
-            }
+    for( key <- log_ref_key(pos, log_info) ) {
+      for( counter<- log_refs.get(key) ) {
+        if (counter.decrementAndGet() == 0) {
+          log_refs.remove(key)
         }
-      case None =>
-        warn("Invalid log position: " + pos)
+      }
     }
   }
 
   def log_ref_increment(pos: Long, log_info: LogInfo = null) = this.synchronized {
-    Option(log_info).orElse(log.log_info(pos)) match {
-      case Some(log_info) =>
-        val count = log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
-      case None =>
+    for( key <- log_ref_key(pos, log_info) ) {
+      log_refs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
+    }
+  }
+
+
+  def log_ref_key(pos: Long, log_info: RecordLog.LogInfo=null): Option[Long] = {
+    if( log_info!=null ) {
+      Some(log_info.position)
+    } else {
+      val rc = if( recovery_logs !=null ) {
+        Option(recovery_logs.floorKey(pos))
+      } else {
+        log.log_info(pos).map(_.position)
+      }
+      if( !rc.isDefined ) {
         warn("Invalid log position: " + pos)
+      }
+      rc
     }
   }
 
@@ -1221,7 +1235,7 @@ class LevelDBClient(store: LevelDBStore)
 
     // We don't want to delete any journals that the index has not snapshot'ed or
     // the the
-    val delete_limit = log.log_info(last_index_snapshot_pos).map(_.position).
+    val delete_limit = log_ref_key(last_index_snapshot_pos).
       getOrElse(last_index_snapshot_pos).min(log.appender_start)
 
     empty_journals.foreach {
@@ -1450,9 +1464,8 @@ class LevelDBClient(store: LevelDBStore)
                     val (pos, len) = decode_locator(locator)
                     copy.setMessageLocator(locator)
                     index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer)
-                    log.log_info(pos).foreach {
-                      log_info =>
-                        log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+                    for(key <- log_ref_key(pos)) {
+                      log_refs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
                     }
                   case None =>
                     println("Invalid queue entry, references message that was not in the export: " + original_msg_key)

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala?rev=1480600&r1=1480599&r2=1480600&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala Thu May  9 12:23:11 2013
@@ -473,6 +473,11 @@ case class RecordLog(directory: File, lo
     Option(log_infos.floorEntry(pos)).map(_.getValue)
   }
 
+  def log_file_positions = log_mutex.synchronized {
+    import collection.JavaConversions._
+    log_infos.map(_._2.position).toArray
+  }
+
   private def get_reader[T](record_position: Long)(func: (LogReader) => T) = {
 
     val lookup = log_mutex.synchronized {



Mime
View raw message