activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1210901 - in /activemq/activemq-apollo/trunk: apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/ apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ apollo-leveldb/src/main...
Date Tue, 06 Dec 2011 13:27:22 GMT
Author: chirino
Date: Tue Dec  6 13:27:22 2011
New Revision: 1210901

URL: http://svn.apache.org/viewvc?rev=1210901&view=rev
Log:
Made the leveldb store's GC passes much more efficient by maintaining in-memory log reference
counters.

Removed the gc_interval config option since we can now run the GC frequently; it is now fixed
to run every 10 seconds.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
Tue Dec  6 13:27:22 2011
@@ -34,9 +34,6 @@ public class LevelDBStoreDTO extends Sto
     @XmlAttribute
     public File directory;
 
-    @XmlAttribute(name="gc_interval")
-    public Integer gc_interval;
-
     @XmlAttribute(name="read_threads")
     public Integer read_threads;
 
@@ -85,7 +82,6 @@ public class LevelDBStoreDTO extends Sto
         LevelDBStoreDTO that = (LevelDBStoreDTO) o;
 
         if (directory != null ? !directory.equals(that.directory) : that.directory != null)
return false;
-        if (gc_interval != null ? !gc_interval.equals(that.gc_interval) : that.gc_interval
!= null) return false;
         if (index_block_restart_interval != null ? !index_block_restart_interval.equals(that.index_block_restart_interval)
: that.index_block_restart_interval != null)
             return false;
         if (index_block_size != null ? !index_block_size.equals(that.index_block_size) :
that.index_block_size != null)
@@ -116,7 +112,6 @@ public class LevelDBStoreDTO extends Sto
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (directory != null ? directory.hashCode() : 0);
-        result = 31 * result + (gc_interval != null ? gc_interval.hashCode() : 0);
         result = 31 * result + (read_threads != null ? read_threads.hashCode() : 0);
         result = 31 * result + (index_factory != null ? index_factory.hashCode() : 0);
         result = 31 * result + (sync != null ? sync.hashCode() : 0);

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java
Tue Dec  6 13:27:22 2011
@@ -48,15 +48,6 @@ public class LevelDBStoreStatusDTO exten
     @XmlElement(name="last_checkpoint_pos")
     public long index_snapshot_pos;
 
-    @XmlElement(name="last_gc_ts")
-    public long last_gc_ts;
-
-    @XmlElement(name="in_gc")
-    public boolean in_gc;
-
-    @XmlElement(name="last_gc_duration")
-    public long last_gc_duration;
-
     @XmlElement(name="last_append_pos")
     public long log_append_pos;
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
Tue Dec  6 13:27:22 2011
@@ -32,6 +32,12 @@ object HelperTrait {
     out.getData
   }
 
+  def decode_long(bytes:Buffer):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+//    in.readVarLong()
+    in.readLong()
+  }
+
   def decode_long(bytes:Array[Byte]):Long = {
     val in = new DataByteArrayInputStream(bytes)
 //    in.readVarLong()

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Tue Dec  6 13:27:22 2011
@@ -26,17 +26,20 @@ import org.apache.activemq.apollo.broker
 import java.io._
 import java.util.concurrent.TimeUnit
 import org.apache.activemq.apollo.util._
-import collection.mutable.ListBuffer
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.util.{TreeMap=>ApolloTreeMap}
 import collection.immutable.TreeMap
-import org.iq80.leveldb._
 import org.fusesource.leveldbjni.internal.Util
 import org.fusesource.hawtbuf.{Buffer, AbstractVarIntSupport}
 import java.util.concurrent.atomic.AtomicReference
 import org.apache.activemq.apollo.broker.Broker
 import org.apache.activemq.apollo.util.ProcessSupport._
+import collection.mutable.{HashMap, ListBuffer}
+import org.apache.activemq.apollo.dto.JsonCodec
+import java.util.Map
+import org.iq80.leveldb._
+import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -54,6 +57,7 @@ object LevelDBClient extends Log {
   final val queue_entry_prefix_array = Array(queue_entry_prefix)
 
   final val dirty_index_key = bytes(":dirty")
+  final val log_refs_index_key = bytes(":log-refs")
   final val TRUE = bytes("true")
   final val FALSE = bytes("false")
 
@@ -89,15 +93,6 @@ object LevelDBClient extends Log {
     }): _* )
   }
 
-  case class UsageCounter() {
-    var count = 0L
-    var size = 0L
-    def increment(value:Int) = {
-      count += 1
-      size += value
-    }
-  }
-
   val on_windows = System.getProperty("os.name").toLowerCase().startsWith("windows")
 
   var link_strategy = 0
@@ -193,12 +188,8 @@ class LevelDBClient(store: LevelDBStore)
   var last_index_snapshot_pos:Long = _
   val snapshot_rw_lock = new ReentrantReadWriteLock(true)
 
-  var last_gc_ts = 0L
-  var last_gc_duration = 0L
-  var in_gc = false
-  var gc_detected_log_usage = Map[Long, UsageCounter]()
   var factory:DBFactory = _
-
+  val log_refs = HashMap[Long, LongCounter]()
 
   def dirty_index_file = directory / ("dirty"+INDEX_SUFFIX)
   def temp_index_file = directory / ("temp"+INDEX_SUFFIX)
@@ -259,7 +250,7 @@ class LevelDBClient(store: LevelDBStore)
       // lets queue a request to checkpoint when
       // the logs rotate.. queue it on the GC thread since GC's lock
       // the index for a long time.
-      store.gc_executor {
+      store.write_executor {
         snapshot_index
       }
     }
@@ -298,11 +289,11 @@ class LevelDBClient(store: LevelDBStore)
 
       index = new RichDB(factory.open(dirty_index_file, index_options));
       try {
+        load_log_refs
         index.put(dirty_index_key, TRUE)
         // Update the index /w what was stored on the logs..
         var pos = last_index_snapshot_pos;
 
-        // Replay the log from the last update position..
         try {
           while (pos < log.appender_limit) {
             log.read(pos).map {
@@ -314,8 +305,43 @@ class LevelDBClient(store: LevelDBStore)
                   case LOG_ADD_QUEUE_ENTRY =>
                     val record: QueueEntryRecord = data
                     index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq),
data)
+                    
+                    // Figure out which log file this message reference is pointing at..
+                    val log_key = (if(record.message_locator!=null) {
+                      Some(decode_long(record.message_locator))
+                    } else {
+                      index.get(encode(message_prefix, record.message_key)).map(decode_long(_))
+                    }).flatMap(log.log_info(_)).map(_.position)
+                    
+                    // Increment it.
+                    log_key.foreach { log_key=>
+                      log_refs.getOrElseUpdate(log_key, new LongCounter()).incrementAndGet()
+                    }
+                    
                   case LOG_REMOVE_QUEUE_ENTRY =>
-                    index.delete(data)
+
+                    index.get(data, new ReadOptions).foreach { value=>
+                      val record: QueueEntryRecord = value
+  
+                      // Figure out which log file this message reference is pointing at..
+                      val log_key = (if(record.message_locator!=null) {
+                        Some(decode_long(record.message_locator))
+                      } else {
+                        index.get(encode(message_prefix, record.message_key)).map(decode_long(_))
+                      }).flatMap(log.log_info(_)).map(_.position)
+                      
+                      // Decrement it.
+                      log_key.foreach { log_key=>
+                        log_refs.get(log_key).foreach{ counter=>
+                          if( counter.decrementAndGet() == 0 ) {
+                            log_refs.remove(log_key)
+                          }
+                        }
+                      }
+                      
+                      index.delete(data)
+                    }
+                    
                   case LOG_ADD_QUEUE =>
                     val record: QueueRecord = data
                     index.put(encode(queue_prefix, record.key), data)
@@ -358,6 +384,20 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
+  private def store_log_refs = {
+    index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
+  }
+
+  private def load_log_refs = {
+    log_refs.clear()
+    index.get(log_refs_index_key, new ReadOptions).foreach { value=>
+      val javamap = JsonCodec.decode(new Buffer(value), classOf[java.util.Map[String, Object]])
+      collection.JavaConversions.mapAsScalaMap(javamap).foreach { case (k,v)=>
+        log_refs.put(k.toLong, new LongCounter(v.asInstanceOf[Number].longValue()))
+      }
+    }
+  }
+  
   def stop() = {
     // this blocks until all io completes..
     // Suspend also deletes the index.
@@ -392,6 +432,7 @@ class LevelDBClient(store: LevelDBStore)
     snapshot_rw_lock.writeLock().lock()
 
     // Close the index so that it's files are not changed async on us.
+    store_log_refs
     index.put(dirty_index_key, FALSE, new WriteOptions().sync(true))
     index.close
   }
@@ -550,7 +591,7 @@ class LevelDBClient(store: LevelDBStore)
 
             uow.actions.foreach { case (msg, action) =>
               val message_record = action.message_record
-              var pos = 0L
+              var pos = -1L
               var pos_buffer:Buffer = null
 
               if (message_record != null) {
@@ -566,10 +607,19 @@ class LevelDBClient(store: LevelDBStore)
               action.dequeues.foreach { entry =>
                 if( pos_buffer==null && entry.message_locator!=null ) {
                   pos_buffer = entry.message_locator
+                  pos = decode_long(pos_buffer)
                 }
                 val key = encode(queue_entry_prefix, entry.queue_key, entry.entry_seq)
                 appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
                 batch.delete(key)
+
+                log.log_info(pos).foreach { log_info=>
+                  log_refs.get(log_info.position).foreach{ counter=>
+                    if( counter.decrementAndGet() == 0 ) {
+                      log_refs.remove(log_info.position)
+                    }
+                  }
+                }
               }
 
               action.enqueues.foreach { entry =>
@@ -577,6 +627,12 @@ class LevelDBClient(store: LevelDBStore)
                 val encoded:Array[Byte] = entry
                 appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
                 batch.put(encode(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
+                
+                // Increment it.
+                log.log_info(pos).foreach { log_info=>
+                  log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+                }
+                
               }
             }
             if( !uow.complete_listeners.isEmpty ) {
@@ -779,32 +835,48 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   def gc:Unit = {
-    var active_counter = 0
-    var delete_counter = 0
-    val latency_counter = new TimeCounter
+    last_index_snapshot_pos
+    val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
 
-    val ro = new ReadOptions()
-    ro.fillCache(false)
-    ro.verifyChecksums(verify_checksums)
+    // We don't want to delete any journals that the index has not snapshot'ed or
+    // the the
+    val delete_limit = log.log_info(last_index_snapshot_pos).map(_.position).
+          getOrElse(last_index_snapshot_pos).min(log.appender_start)
 
-    //
-    // This journal_usage will let us get a picture of which queues are using how much of
each
-    // log file.  It will help folks figure out why a log file is not getting deleted.
-    //
-    val journal_usage = new ApolloTreeMap[Long,(RecordLog#LogInfo , UsageCounter)]()
-    var append_journal = 0L
+    empty_journals.foreach { id =>
+      if ( id < delete_limit ) {
+        log.delete(id)
+      }
+    }
+  }
+  
+  case class UsageCounter(info:RecordLog#LogInfo) {
+    var count = 0L
+    var size = 0L
+    var first_reference_queue:QueueRecord = _
+    
+    def increment(value:Int) = {
+      count += 1
+      size += value
+    }
+  }
 
+  //
+  // Collects detailed usage information about the journal like who's referencing it.
+  //
+  def get_log_usage_details = {
+
+    val usage_map = new ApolloTreeMap[Long,UsageCounter]()
     log.log_mutex.synchronized {
-      append_journal = log.log_infos.last._1
-      log.log_infos.foreach(entry=> journal_usage.put(entry._1, (entry._2, UsageCounter()))
)
+      log.log_infos.foreach(entry=> usage_map.put(entry._1, UsageCounter(entry._2)) )
     }
 
-    def find_journal(pos: Long) = {
-      var entry = journal_usage.floorEntry(pos)
+    def lookup_usage(pos: Long) = {
+      var entry = usage_map.floorEntry(pos)
       if (entry != null) {
-        val (info, usageCounter) = entry.getValue()
-        if (pos < info.limit) {
-          Some(entry.getKey -> usageCounter)
+        val usage = entry.getValue()
+        if (pos < usage.info.limit) {
+          Some(usage)
         } else {
           None
         }
@@ -813,88 +885,38 @@ class LevelDBClient(store: LevelDBStore)
       }
     }
 
-    in_gc = true
-    val now = System.currentTimeMillis()
-    debug(store.store_kind+" gc starting")
-    latency_counter.time {
-
-      retry_using_index {
-        index.snapshot { snapshot =>
-          ro.snapshot(snapshot)
+    val ro = new ReadOptions()
+    ro.fillCache(false)
+    ro.verifyChecksums(verify_checksums)
 
-          // Figure out which journal files are still in use by which queues.
-          index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
-            val entry_record:QueueEntryRecord = value
-            val pos = if(entry_record.message_locator!=null) {
-              decode_long(entry_record.message_locator.toByteArray)
-            } else {
-              index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_)).getOrElse(0L)
-            }
+    retry_using_index {
+      index.snapshot { snapshot =>
+        ro.snapshot(snapshot)
 
-            find_journal(pos) match {
-              case Some((key,usageCounter)) =>
-                usageCounter.increment(entry_record.size)
-              case None =>
-            }
+        // Figure out which journal files are still in use by which queues.
+        index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
 
-            // only continue while the service is still running..
-            store.service_state.is_started
+          val entry_record:QueueEntryRecord = value
+          val pos = if(entry_record.message_locator!=null) {
+            Some(decode_long(entry_record.message_locator))
+          } else {
+            index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_))
           }
 
-          if (store.service_state.is_started) {
-
-            gc_detected_log_usage = Map((collection.JavaConversions.asScalaSet(journal_usage.entrySet()).map
{ x=>
-              x.getKey -> x.getValue._2
-            }).toSeq : _ * )
-
-            // Take empty journals out of the map..
-            val empty_journals = ListBuffer[Long]()
-
-            val i = journal_usage.entrySet().iterator();
-            while( i.hasNext ) {
-              val (info, usageCounter) = i.next().getValue
-              if( usageCounter.count==0 && info.position < append_journal) {
-                empty_journals += info.position
-                i.remove()
-              }
-            }
-
-            index.cursor_prefixed(message_prefix_array) { (key,value) =>
-              val pos = decode_long(value)
-
-              if ( !find_journal(pos).isDefined ) {
-                // Delete it.
-                index.delete(key)
-                delete_counter += 1
-              } else {
-                active_counter += 1
-              }
-              // only continue while the service is still running..
-              store.service_state.is_started
-            }
-
-            if (store.service_state.is_started) {
-              // We don't want to delete any journals that the index has not snapshot'ed
or
-              // the the
-              val delete_limit = find_journal(last_index_snapshot_pos).map(_._1).
-                    getOrElse(last_index_snapshot_pos).min(log.appender_start)
-
-              empty_journals.foreach { id =>
-                if ( id < delete_limit ) {
-                  log.delete(id)
-                }
-              }
+          pos.flatMap(lookup_usage(_)).foreach { usage =>
+            if( usage.first_reference_queue == null ) {
+              usage.first_reference_queue = index.get(encode(queue_prefix, entry_record.queue_key),
ro).map( x=> decode_queue_record(x) ).getOrElse(null)
             }
+            usage.increment(entry_record.size)
           }
-        }
-
 
+          true
+        }
       }
     }
-    last_gc_ts=now
-    last_gc_duration = latency_counter.total(TimeUnit.MILLISECONDS)
-    in_gc = false
-    debug(store.store_kind+" gc ended")
+
+    import collection.JavaConversions._
+    usage_map.values.toSeq.toArray
   }
 
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
Tue Dec  6 13:27:22 2011
@@ -48,7 +48,6 @@ class LevelDBStore(val config:LevelDBSto
   var next_msg_key = new AtomicLong(1)
 
   var write_executor:ExecutorService = _
-  var gc_executor:ExecutorService = _
   var read_executor:ExecutorService = _
 
   var client:LevelDBClient = _
@@ -83,13 +82,6 @@ class LevelDBStore(val config:LevelDBSto
           rc
         }
       })
-      gc_executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
-        def newThread(r: Runnable) = {
-          val rc = new Thread(r, store_kind + " store gc")
-          rc.setDaemon(true)
-          rc
-        }
-      })
       read_executor = Executors.newFixedThreadPool(config.read_threads.getOrElse(10), new
ThreadFactory() {
         def newThread(r: Runnable) = {
           val rc = new Thread(r, store_kind + " store io read")
@@ -128,7 +120,6 @@ class LevelDBStore(val config:LevelDBSto
         read_executor.shutdown
         read_executor.awaitTermination(60, TimeUnit.SECONDS)
         read_executor = null
-        gc_executor.shutdown
         client.stop
         on_completed.run
       }
@@ -140,20 +131,15 @@ class LevelDBStore(val config:LevelDBSto
     ss.is_starting || ss.is_started
   }
 
-  def poll_gc:Unit = {
-    val interval = config.gc_interval.getOrElse(60*30)
-    if( interval>0 ) {
-      dispatch_queue.after(interval, TimeUnit.SECONDS) {
-        if( keep_polling ) {
-          gc {
-            poll_gc
-          }
-        }
+  def poll_gc:Unit = dispatch_queue.after(10, TimeUnit.SECONDS) {
+    if( keep_polling ) {
+      gc {
+        poll_gc
       }
     }
   }
 
-  def gc(onComplete: =>Unit) = gc_executor {
+  def gc(onComplete: =>Unit) = write_executor {
     client.gc
     onComplete
   }
@@ -277,23 +263,18 @@ class LevelDBStore(val config:LevelDBSto
         rc.index_stats = client.index.getProperty("leveldb.stats")
         rc.log_append_pos = client.log.appender_limit
         rc.index_snapshot_pos = client.last_index_snapshot_pos
-        rc.last_gc_duration = client.last_gc_duration
-        rc.last_gc_ts = client.last_gc_ts
-        rc.in_gc = client.in_gc
         rc.log_stats = {
-          var row_layout = "%-20s | %-10s | %10s/%-10s\n"
-          row_layout.format("File", "Messages", "Used Size", "Total Size")+
-          client.log.log_infos.map(x=> x._1 -> client.gc_detected_log_usage.get(x._1)).toSeq.flatMap
{ x=>
+          var row_layout = "%-20s | %-10s | %-10s\n"
+          row_layout.format("File", "References", "Total Size")+
+          client.log.log_infos.map{case (id,info)=> id -> client.log_refs.get(id).map(_.get)}.toSeq.flatMap
{ case (id, refs)=>
             try {
-              val file = LevelDBClient.create_sequence_file(client.directory, x._1, LevelDBClient.LOG_SUFFIX)
+              val file = LevelDBClient.create_sequence_file(client.directory, id, LevelDBClient.LOG_SUFFIX)
               val size = file.length()
-              val usage = x._2 match {
-                case Some(usage)=>
-                  (usage.count.toString, ViewHelper.memory(usage.size))
-                case None=>
-                  ("unknown", "unknown")
-              }
-              Some(row_layout.format(file.getName, usage._1, usage._2, ViewHelper.memory(size)))
+              Some(row_layout.format(
+                file.getName,
+                refs.getOrElse(0L).toString,
+                ViewHelper.memory(size)
+              ))
             } catch {
               case e:Throwable =>
                 None

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Tue Dec  6 13:27:22 2011
@@ -323,10 +323,10 @@ case class RecordLog(directory: File, lo
     }
   }
 
+  def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L, pos+1).lastOption.map(_._2))
 
   private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
-    val infos = log_mutex.synchronized(log_infos)
-    val info = infos.range(0L, pos+1).lastOption.map(_._2)
+    val info = log_info(pos)
     info.map { info =>
       // Checkout a reader from the cache...
       val (set, reader_id, reader) = reader_cache_files.synchronized {

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade
Tue Dec  6 13:27:22 2011
@@ -46,15 +46,13 @@ h2 Store Latency Stats
 - show("UOW flush latency", flush_latency)
 
 h2 Log Status
-p last log GC occured #{uptime(last_gc_ts)}
-p last log GC duration: #{friendly_duration(last_gc_duration)}
 pre
   !~~ log_stats
 p
-  Index recovery starts from log position:
+  | Index recovery starts from log position:
   code #{"%016x".format(index_snapshot_pos)}
 p
-  Append position:
+  | Append position:
   code #{"%016x".format(log_append_pos)}
 
 h2 Index Status

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Tue Dec
 6 13:27:22 2011
@@ -385,8 +385,6 @@ A `leveldb_store` element may be configu
   that a store will delay persisting a messaging unit of work in hopes
   that it will be invalidated shortly thereafter by another unit of work
   which would negate the operation.
-* `gc_interval` : How often to check to find log files which can be discarded 
-   in seconds. The value defaults to 1800 (30 minutes).
 * `read_threads` : The number of concurrent IO reads to allow. The value 
    defaults to 10.
 * `sync` : If set to `false`, then the store does not sync logging operations to 



Mime
View raw message