activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1233998 - /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Date Fri, 20 Jan 2012 16:53:31 GMT
Author: chirino
Date: Fri Jan 20 16:53:30 2012
New Revision: 1233998

URL: http://svn.apache.org/viewvc?rev=1233998&view=rev
Log:
The LevelDB store would not GC log files after a queue was deleted via the admin screen:
log file reference counts were not decremented when the queue removal was replayed on recovery.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1233998&r1=1233997&r2=1233998&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Jan 20 16:53:30 2012
@@ -200,6 +200,7 @@ class LevelDBClient(store: LevelDBStore)
   var index:RichDB = _
   var index_options:Options = _
 
+  var last_index_snapshot_ts = System.currentTimeMillis()
   var last_index_snapshot_pos:Long = _
   val snapshot_rw_lock = new ReentrantReadWriteLock(true)
 
@@ -300,7 +301,7 @@ class LevelDBClient(store: LevelDBStore)
 
     // Only keep the last snapshot..
     snapshots.filterNot(_._1 == last_index_snapshot_pos).foreach( _._2.recursive_delete )
-    temp_index_file.recursive_delete
+    temp_index_file.recursive_delete // usually does not exist.
 
     retry {
 
@@ -380,11 +381,17 @@ class LevelDBClient(store: LevelDBStore)
                     ro.verifyChecksums(verify_checksums)
                     val queue_key = decode_vlong(data)
                     index.delete(encode_key(queue_prefix, queue_key))
-                    index.cursor_keys_prefixed(encode_key(queue_entry_prefix, queue_key),
ro) {
-                      key =>
-                        index.delete(key)
-                        true
+                    index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro)
{ (key, value)=>
+                      index.delete(key)
+
+                      // Figure out which log file that message entry was in so we can
+                      // decrement the log file reference.
+                      val entry_record:QueueEntryRecord = value
+                      val pos = decode_locator(entry_record.getMessageLocator)._1
+                      log_ref_decrement(pos)
+                      true
                     }
+
                   case LOG_MAP_ENTRY =>
                     replay_operations+=1
                     val entry = MapEntryPB.FACTORY.parseUnframed(data)
@@ -399,6 +406,9 @@ class LevelDBClient(store: LevelDBStore)
                 pos = next_pos
             }
           }
+          if(replay_operations > 0) {
+            snapshot_index
+          }
         }
         info("Took %.2f second(s) to recover %d operations in the log file.", (log_replay_duration/TimeUnit.SECONDS.toNanos(1).toFloat),
replay_operations)
 
@@ -500,6 +510,7 @@ class LevelDBClient(store: LevelDBStore)
       tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos))
       snapshot_index_file(last_index_snapshot_pos).recursive_delete
       last_index_snapshot_pos = new_snapshot_index_pos
+      last_index_snapshot_ts = System.currentTimeMillis()
 
     } catch {
       case e: Exception =>
@@ -888,6 +899,11 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   def gc:Unit = {
+
+    // TODO:
+    // Perhaps we should snapshot_index if the current snapshot is old.
+    //
+
     last_index_snapshot_pos
     val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
 



Mime
View raw message