activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1480709 - in /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb: LevelDBClient.scala RecordLog.scala replicated/ReplicationSupport.scala
Date Thu, 09 May 2013 16:11:27 GMT
Author: chirino
Date: Thu May  9 16:11:26 2013
New Revision: 1480709

URL: http://svn.apache.org/r1480709
Log:
Fixes bug identified in APLO-284.  Ported fix over from Apollo.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1480709&r1=1480708&r2=1480709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
Thu May  9 16:11:26 2013
@@ -64,6 +64,8 @@ object LevelDBClient extends Log {
 
   final val DIRTY_INDEX_KEY = bytes(":dirty")
   final val LOG_REF_INDEX_KEY = bytes(":log-refs")
+  final val LOGS_INDEX_KEY = bytes(":logs")
+
   final val COLLECTION_META_KEY = bytes(":collection-meta")
   final val TRUE = bytes("true")
   final val FALSE = bytes("false")
@@ -386,15 +388,17 @@ object LevelDBClient extends Log {
     var last_key:Array[Byte] = _
   }
 
-  def copy_index(from:File, to:File) = {
+  def copyIndex(from:File, to:File) = {
     for( file <- from.list_files ) {
-      val name = file.getName
-      if( name == "CURRENT" || name.startsWith("MANIFEST-") ) {
-        /// These might not be append only files, so avoid hard linking just to be safe.
-        file.copyTo(to / file.getName)
+      val name: String = file.getName
+      if( name.endsWith(".sst") ) {
+        // SST files don't change once created, safe to hard link.
+        file.linkTo(to / name)
+      } else if(name == "LOCK")  {
+        // No need to copy the lock file.
       } else {
-        // These are append only files, so they are safe to hard link.
-        file.linkTo(to / file.getName)
+        /// These might not be append only files, so avoid hard linking just to be safe.
+        file.copyTo(to / name)
       }
     }
   }
@@ -441,7 +445,8 @@ class LevelDBClient(store: LevelDBStore)
 
   var factory:DBFactory = _
   val logRefs = HashMap[Long, LongCounter]()
-  
+  var recoveryLogs:java.util.TreeMap[Long, Void] = _
+
   val collectionMeta = HashMap[Long, CollectionMeta]()
 
   def plistIndexFile = directory / ("plist"+INDEX_SUFFIX)
@@ -574,12 +579,9 @@ class LevelDBClient(store: LevelDBStore)
       dirtyIndexFile.recursiveDelete
       dirtyIndexFile.mkdirs()
 
-      lastSnapshotIndex.foreach { case (id, file) =>
-        // Resume log replay from a snapshot of the index..
+      for( (id, file)<- lastSnapshotIndex ) {
         try {
-          for( file <- file.list_files) {
-            file.linkTo(dirtyIndexFile / file.getName)
-          }
+          copyIndex(file, dirtyIndexFile)
         } catch {
           case e:Exception =>
             warn(e, "Could not recover snapshot of the index: "+e)
@@ -587,8 +589,15 @@ class LevelDBClient(store: LevelDBStore)
         }
       }
       index = new RichDB(factory.open(dirtyIndexFile, indexOptions));
-      loadCounters
+      if ( store.paranoidChecks ) {
+        for(value <- index.get(DIRTY_INDEX_KEY) ) {
+          if( java.util.Arrays.equals(value, TRUE) ) {
+            warn("Recovering from a dirty index.")
+          }
+        }
+      }
       index.put(DIRTY_INDEX_KEY, TRUE)
+      loadCounters
     }
   }
 
@@ -694,23 +703,41 @@ class LevelDBClient(store: LevelDBStore)
           // replay failed.. good thing we are in a retry block...
           index.close
           throw e;
+      } finally {
+        recoveryLogs = null
       }
     }
   }
 
   private def logRefDecrement(pos: Long) {
-    log.log_info(pos).foreach { logInfo =>
-      logRefs.get(logInfo.position).foreach { counter =>
+    for( key <- logRefKey(pos) ) {
+      logRefs.get(key).foreach { counter =>
         if (counter.decrementAndGet() == 0) {
-          logRefs.remove(logInfo.position)
+          logRefs.remove(key)
         }
       }
     }
   }
 
   private def logRefIncrement(pos: Long) {
-    log.log_info(pos).foreach { logInfo =>
-      logRefs.getOrElseUpdate(logInfo.position, new LongCounter()).incrementAndGet()
+    for( key <- logRefKey(pos) ) {
+      logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
+    }
+  }
+
+  def logRefKey(pos: Long, log_info: RecordLog.LogInfo=null): Option[Long] = {
+    if( log_info!=null ) {
+      Some(log_info.position)
+    } else {
+      val rc = if( recoveryLogs !=null ) {
+        Option(recoveryLogs.floorKey(pos))
+      } else {
+        log.log_info(pos).map(_.position)
+      }
+      if( !rc.isDefined ) {
+        warn("Invalid log position: " + pos)
+      }
+      rc
     }
   }
 
@@ -741,8 +768,25 @@ class LevelDBClient(store: LevelDBStore)
         case e => throw e
       }
     }
+    def storeList[T <: AnyRef](key:Array[Byte], list:Array[Long]) {
+      val baos = new ByteArrayOutputStream()
+      val os = new ObjectOutputStream(baos);
+      os.writeInt(list.size);
+      for( k <- list ) {
+        os.writeLong(k)
+      }
+      os.close()
+      try {
+        index.put(key, baos.toByteArray)
+      }
+      catch {
+        case e => throw e
+      }
+    }
+
     storeMap(LOG_REF_INDEX_KEY, logRefs)
     storeMap(COLLECTION_META_KEY, collectionMeta)
+    storeList(LOGS_INDEX_KEY, log.log_file_positions)
   }
 
   private def loadCounters = {
@@ -758,8 +802,28 @@ class LevelDBClient(store: LevelDBStore)
         }
       }
     }
+    def loadList[T <: AnyRef](key:Array[Byte]) = {
+      index.get(key, new ReadOptions).map { value=>
+        val rc = ListBuffer[Long]()
+        val bais = new ByteArrayInputStream(value)
+        val is = new ObjectInputStream(bais);
+        var remaining = is.readInt()
+        while(remaining > 0 ) {
+          rc.append(is.readLong())
+          remaining-=1
+        }
+        rc
+      }
+    }
+
     loadMap(LOG_REF_INDEX_KEY, logRefs)
     loadMap(COLLECTION_META_KEY, collectionMeta)
+    for( list <- loadList(LOGS_INDEX_KEY) ) {
+      recoveryLogs = new java.util.TreeMap[Long, Void]()
+      for( k <- list ) {
+        recoveryLogs.put(k, null)
+      }
+    }
   }
 
   var wal_append_position = 0L
@@ -846,10 +910,8 @@ class LevelDBClient(store: LevelDBStore)
 
     try {
 
-      // Hard link all the index files.
-      for( file <- dirtyIndexFile.list_files) {
-        file.linkTo(tmpDir / file.getName)
-      }
+      // Copy the index to the tmp dir.
+      copyIndex(dirtyIndexFile, tmpDir)
 
       // Rename to signal that the snapshot is complete.
       tmpDir.renameTo(snapshotIndexFile(walPosition))
@@ -1191,8 +1253,8 @@ class LevelDBClient(store: LevelDBStore)
                 appender.append(LOG_ADD_ENTRY, log_data)
                 batch.put(key, index_data)
 
-                Option(log_info).orElse(log.log_info(dataLocator._1)).foreach { logInfo =>
-                  logRefs.getOrElseUpdate(logInfo.position, new LongCounter()).incrementAndGet()
+                for( key <- logRefKey(pos, log_info) ) {
+                  logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
                 }
 
                 collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
@@ -1331,7 +1393,7 @@ class LevelDBClient(store: LevelDBStore)
     // the the
 
     var limit = oldest_retained_snapshot
-    val deleteLimit = log.log_info(limit).map(_.position).getOrElse(limit).min(log.appender_start)
+    val deleteLimit = logRefKey(limit).getOrElse(limit).min(log.appender_start)
 
     emptyJournals.foreach { id =>
       if ( id < deleteLimit ) {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala?rev=1480709&r1=1480708&r2=1480709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
Thu May  9 16:11:26 2013
@@ -492,6 +492,11 @@ case class RecordLog(directory: File, lo
 
   def log_info(pos:Long) = log_mutex.synchronized { Option(log_infos.floorEntry(pos)).map(_.getValue)
}
 
+  def log_file_positions = log_mutex.synchronized {
+    import collection.JavaConversions._
+    log_infos.map(_._2.position).toArray
+  }
+
   private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
 
     val lookup = log_mutex.synchronized {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala?rev=1480709&r1=1480708&r2=1480709&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
Thu May  9 16:11:26 2013
@@ -87,7 +87,7 @@ object ReplicationSupport {
       val index_file = index_dirs.last._2
       var target = to / index_file.getName
       target.mkdirs()
-      LevelDBClient.copy_index(index_file, target)
+      LevelDBClient.copyIndex(index_file, target)
     }
   }
 



Mime
View raw message