activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1233765 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala RecordLog.scala
Date Fri, 20 Jan 2012 04:53:58 GMT
Author: chirino
Date: Fri Jan 20 04:53:58 2012
New Revision: 1233765

URL: http://svn.apache.org/viewvc?rev=1233765&view=rev
Log:
Log duration and number of records that we are replaying during store recovery in case a long
startup occurs.  Also record an end of unit of work record so that we don't try to recover
partial UoWs.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1233765&r1=1233764&r2=1233765&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Fri Jan 20 04:53:58 2012
@@ -280,9 +280,18 @@ class LevelDBClient(store: LevelDBStore)
       }
     }
 
-    retry {
-      log.open
+    def time[T](func: =>T):Long = {
+      val start = System.nanoTime()
+      func  
+      System.nanoTime() - start 
+    }
+    
+    val log_open_duration = time {
+      retry {
+        log.open
+      }
     }
+    info("Opening the log file took: %.2f ms", (log_open_duration/TimeUnit.MILLISECONDS.toNanos(1).toFloat))
 
     // Find out what was the last snapshot.
     val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
@@ -313,21 +322,25 @@ class LevelDBClient(store: LevelDBStore)
       }
 
       index = new RichDB(factory.open(dirty_index_file, index_options));
+
       try {
         load_log_refs
         index.put(dirty_index_key, TRUE)
         // Update the index /w what was stored on the logs..
         var pos = last_index_snapshot_pos;
 
-        try {
+        var replay_operations = 0
+        val log_replay_duration = time {
           while (pos < log.appender_limit) {
             log.read(pos).map {
               case (kind, data, next_pos) =>
                 kind match {
                   case LOG_ADD_MESSAGE =>
+                    replay_operations+=1
                     val record: MessageRecord = data
                     index.put(encode_key(message_prefix, record.key), encode_locator(pos,
data.length))
                   case LOG_ADD_QUEUE_ENTRY =>
+                    replay_operations+=1
                     val record: QueueEntryRecord = data
                     index.put(encode_key(queue_entry_prefix, record.queue_key, record.entry_seq),
data)
                     
@@ -341,7 +354,7 @@ class LevelDBClient(store: LevelDBStore)
                     // Increment it.
                     pos.foreach(log_ref_increment(_))
                   case LOG_REMOVE_QUEUE_ENTRY =>
-
+                    replay_operations+=1
                     index.get(data, new ReadOptions).foreach { value=>
                       val record: QueueEntryRecord = value
   
@@ -357,9 +370,11 @@ class LevelDBClient(store: LevelDBStore)
                     }
                     
                   case LOG_ADD_QUEUE =>
+                    replay_operations+=1
                     val record: QueueRecord = data
                     index.put(encode_key(queue_prefix, record.key), data)
                   case LOG_REMOVE_QUEUE =>
+                    replay_operations+=1
                     val ro = new ReadOptions
                     ro.fillCache(false)
                     ro.verifyChecksums(verify_checksums)
@@ -371,6 +386,7 @@ class LevelDBClient(store: LevelDBStore)
                         true
                     }
                   case LOG_MAP_ENTRY =>
+                    replay_operations+=1
                     val entry = MapEntryPB.FACTORY.parseUnframed(data)
                     if (entry.getValue == null) {
                       index.delete(encode_key(map_prefix, entry.getKey))
@@ -378,16 +394,13 @@ class LevelDBClient(store: LevelDBStore)
                       index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray)
                     }
                   case _ =>
-                  // Skip unknown records like the RecordLog headers.
+                  // Skip unknown records
                 }
                 pos = next_pos
             }
           }
         }
-        catch {
-          case e:Throwable => e.printStackTrace()
-        }
-
+        info("Took %.2f second(s) to recover %d operations in the log file.", (log_replay_duration/TimeUnit.SECONDS.toNanos(1).toFloat),
replay_operations)
 
       } catch {
         case e:Throwable =>

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1233765&r1=1233764&r2=1233765&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Fri Jan 20 04:53:58 2012
@@ -26,8 +26,8 @@ import java.util.concurrent.atomic.Atomi
 import java.io._
 import org.apache.activemq.apollo.util.FileSupport._
 import org.apache.activemq.apollo.util.{Log, LRUCache}
-import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
 import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
+import org.fusesource.hawtbuf.{AbstractVarIntSupport, DataByteArrayInputStream, DataByteArrayOutputStream,
Buffer}
 
 object RecordLog extends Log {
 
@@ -41,6 +41,8 @@ object RecordLog extends Log {
   //   length   : uint32     // the length the the data
 
   val LOG_HEADER_PREFIX = '*'.toByte
+  val UOW_END_RECORD = -1.toByte
+  
   val LOG_HEADER_SIZE = 10
 
   val BUFFER_SIZE = 1024*512
@@ -49,6 +51,18 @@ object RecordLog extends Log {
   case class LogInfo(file:File, position:Long, length:Long) {
     def limit = position+length
   }
+  
+  def encode_long(a1:Long) = {
+    val out = new DataByteArrayOutputStream(8)
+    out.writeLong(a1)
+    out.toBuffer
+  }
+
+  def decode_long(value:Buffer):Long = {
+    val in = new DataByteArrayInputStream(value)
+    in.readLong()
+  }
+  
 }
 
 case class RecordLog(directory: File, logSuffix:String) {
@@ -271,7 +285,7 @@ case class RecordLog(directory: File, lo
       (id, data, record_position+LOG_HEADER_SIZE+length)
     }
 
-    def check(record_position:Long):Option[Long] = {
+    def check(record_position:Long):Option[(Long, Option[Long])] = {
       var offset = (record_position-position).toInt
       val header = new Buffer(LOG_HEADER_SIZE)
       channel.read(header.toByteBuffer, offset)
@@ -280,7 +294,7 @@ case class RecordLog(directory: File, lo
       if( prefix != LOG_HEADER_PREFIX ) {
         return None // Does not look like a record.
       }
-      val id = is.readByte()
+      val kind = is.readByte()
       val expectedChecksum = is.readInt()
       val length = is.readInt()
 
@@ -310,19 +324,30 @@ case class RecordLog(directory: File, lo
       if( expectedChecksum !=  checksum ) {
         return None
       }
-      return Some(record_position+LOG_HEADER_SIZE+length)
+      val uow_start_pos = if(kind == UOW_END_RECORD && length==8) Some(decode_long(chunk))
else None
+      return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos)
     }
 
     def verifyAndGetEndPosition:Long = {
       var pos = position;
+      var current_uow_start = pos
       val limit = position+channel.size()
       while(pos < limit) {
         check(pos) match {
-          case Some(next) => pos = next
-          case None => return pos
+          case Some((next, uow_start_pos)) =>
+            uow_start_pos.foreach { uow_start_pos => 
+              if( uow_start_pos == current_uow_start ) {
+                current_uow_start = next
+              } else {
+                return current_uow_start
+              }
+            }
+            pos = next
+          case None =>
+            return current_uow_start
         }
       }
-      pos
+      return current_uow_start
     }
   }
 
@@ -381,8 +406,14 @@ case class RecordLog(directory: File, lo
   def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix)
 
   def appender[T](func: (LogAppender)=>T):T= {
+    val intial_position = current_appender.append_position
     try {
-      func(current_appender)
+      val rc = func(current_appender)
+      if( current_appender.append_position != intial_position ) {
+        // Record a UOW_END_RECORD so that on recovery we only replay full units of work.
+        current_appender.append(UOW_END_RECORD,encode_long(intial_position))
+      }
+      rc
     } finally {
       current_appender.flush
       log_mutex.synchronized {



Mime
View raw message