activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1480711 - in /activemq/trunk/activemq-leveldb-store/src/main: java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java scala/org/apache/activemq/leveldb/LevelDBClient.scala scala/org/apache/activemq/leveldb/LevelDBStore.scala
Date Thu, 09 May 2013 16:11:52 GMT
Author: chirino
Date: Thu May  9 16:11:52 2013
New Revision: 1480711

URL: http://svn.apache.org/r1480711
Log:
Fixes issue identified in APLO-245 where index does not seem to get cleaned up / compacted.
 Fix ported over from Apollo.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java?rev=1480711&r1=1480710&r2=1480711&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
Thu May  9 16:11:52 2013
@@ -104,4 +104,8 @@ public interface LevelDBStoreViewMBean {
 
     @MBeanInfo("Gets the index statistics.")
     String getIndexStats();
+
+    @MBeanInfo("Compacts disk usage")
+    void compact();
+
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1480711&r1=1480710&r2=1480711&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
Thu May  9 16:11:52 2013
@@ -38,7 +38,7 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Collections}
 import org.apache.activemq.leveldb.util.TimeMetric
 import org.apache.activemq.leveldb.RecordLog.LogInfo
-import scala.Some
+import org.fusesource.leveldbjni.internal.JniDB
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -252,7 +252,7 @@ object LevelDBClient extends Log {
 
     def cursorKeysPrefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte]
=> Boolean): Unit = {
       val iterator = db.iterator(ro)
-      iterator.seek(prefix);
+      might_trigger_compaction(iterator.seek(prefix));
       try {
         def check(key:Buffer) = {
           key.startsWith(prefix) && func(key)
@@ -267,7 +267,7 @@ object LevelDBClient extends Log {
 
     def cursorPrefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte])
=> Boolean): Unit = {
       val iterator = db.iterator(ro)
-      iterator.seek(prefix);
+      might_trigger_compaction(iterator.seek(prefix));
       try {
         def check(key:Buffer) = {
           key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
@@ -286,7 +286,7 @@ object LevelDBClient extends Log {
 
     def cursorRangeKeys(startIncluded:Array[Byte], endExcluded:Array[Byte], ro:ReadOptions=new
ReadOptions)(func: Array[Byte] => Boolean): Unit = {
       val iterator = db.iterator(ro)
-      iterator.seek(startIncluded);
+      might_trigger_compaction(iterator.seek(startIncluded));
       try {
         def check(key:Array[Byte]) = {
           if ( compare(key,endExcluded) < 0) {
@@ -305,7 +305,7 @@ object LevelDBClient extends Log {
 
     def cursorRange(startIncluded:Array[Byte], endExcluded:Array[Byte], ro:ReadOptions=new
ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
       val iterator = db.iterator(ro)
-      iterator.seek(startIncluded);
+      might_trigger_compaction(iterator.seek(startIncluded));
       try {
         def check(key:Array[Byte]) = {
           (compare(key,endExcluded) < 0) && func(key, iterator.peekNext.getValue)
@@ -337,7 +337,7 @@ object LevelDBClient extends Log {
         val iterator = db.iterator(ro)
         try {
 
-          iterator.seek(last);
+          might_trigger_compaction(iterator.seek(last));
           if ( iterator.hasPrev ) {
             iterator.prev()
           } else {
@@ -359,6 +359,35 @@ object LevelDBClient extends Log {
         }
       }
     }
+
+    def compact = {
+      compact_needed = false
+      db match {
+        case db:JniDB =>
+          db.compactRange(null, null)
+//        case db:DbImpl =>
+//          val start = new Slice(Array[Byte]('a'.toByte))
+//          val end = new Slice(Array[Byte]('z'.toByte))
+//          db.compactRange(2, start, end)
+        case _ =>
+      }
+    }
+
+    private def might_trigger_compaction[T](func: => T): T = {
+      val start = System.nanoTime()
+      try {
+        func
+      } finally {
+        val duration = System.nanoTime() - start
+        // If it takes longer than 100 ms..
+        if( duration > 1000000*100 ) {
+          compact_needed = true
+        }
+      }
+    }
+
+    @volatile
+    var compact_needed = false
   }
 
 
@@ -1365,8 +1394,56 @@ class LevelDBClient(store: LevelDBStore)
     collectionMeta.get(collectionKey).flatMap(x=> Option(x.last_key)).map(new Buffer(_))
   }
 
+  // APLO-245: lets try to detect when leveldb needs a compaction..
+  private def detect_if_compact_needed:Unit = {
+
+    // auto compaction might be disabled...
+    if ( store.autoCompactionRatio <= 0 ) {
+      return
+    }
+
+    // How much space is the dirty index using??
+    var index_usage = 0L
+    for( file <- dirtyIndexFile.recursiveList ) {
+      if(!file.isDirectory) {
+        index_usage += file.length()
+      }
+    }
+    // Lets use the log_refs to get a rough estimate on how many entries are store in leveldb.
+    var index_queue_entries=0L
+    for ( (_, count) <- logRefs ) {
+      index_queue_entries += count.get()
+    }
+
+    if ( index_queue_entries > 0 ) {
+      val ratio = (index_usage*1.0f/index_queue_entries)
+      // println("usage: index_usage:%d, index_queue_entries:%d, ratio: %f".format(index_usage,
index_queue_entries, ratio))
+
+      // After running some load we empirically found that a healthy ratio is between 12
and 25 bytes per entry.
+      // lets compact if we go way over the healthy ratio.
+      if( ratio > store.autoCompactionRatio ) {
+        index.compact_needed = true
+      }
+    } else if( index_usage > 1024*1024*5 )  {
+      // at most the index should have 1 full level file.
+      index.compact_needed = true
+    }
+
+  }
+
   def gc(topicPositions:Seq[(Long, Long)]):Unit = {
 
+    detect_if_compact_needed
+
+    // Lets compact the leveldb index if it looks like we need to.
+    if( index.compact_needed ) {
+      debug("Compacting the leveldb index at: %s", dirtyIndexFile)
+      val start = System.nanoTime()
+      index.compact
+      val duration = System.nanoTime() - start;
+      info("Compacted the leveldb index at: %s in %.2f ms", dirtyIndexFile, (duration / 1000000.0))
+    }
+
     // Delete message refs for topics who's consumers have advanced..
     if( !topicPositions.isEmpty ) {
       retryUsingIndex {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1480711&r1=1480710&r2=1480711&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Thu May  9 16:11:52 2013
@@ -33,8 +33,8 @@ import org.apache.activemq.broker.jmx.{B
 import org.apache.activemq.util._
 import org.apache.activemq.leveldb.util.{RetrySupport, Log}
 import org.apache.activemq.store.PList.PListIterator
-import java.lang
 import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
+import org.fusesource.hawtdispatch;
 
 object LevelDBStore extends Log {
   val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -63,7 +63,7 @@ object LevelDBStore extends Log {
     return IOExceptionSupport.create(e)
   }
 
-  def waitOn(future: Future[AnyRef]): Unit = {
+  def waitOn(future: java.util.concurrent.Future[AnyRef]): Unit = {
     try {
       future.get
     }
@@ -116,6 +116,18 @@ class LevelDBStoreView(val store:LevelDB
   def resetMaxLogRotateLatency = db.client.log.max_log_rotate_latency.reset
 
   def getIndexStats = db.client.index.getProperty("leveldb.stats")
+
+  def compact() {
+    import hawtdispatch._
+    var done = new CountDownLatch(1)
+    val positions = getTopicGCPositions
+    client.writeExecutor {
+      client.index.compact_needed = true
+      client.gc(positions)
+      done.countDown()
+    }
+    done.await()
+  }
 }
 
 import LevelDBStore._
@@ -161,6 +173,8 @@ class LevelDBStore extends LockableServi
   var asyncBufferSize = 1024*1024*4
   @BeanProperty
   var monitorStats = false
+  @BeanProperty
+  var autoCompactionRatio = 100
 
   var purgeOnStatup: Boolean = false
 
@@ -822,14 +836,14 @@ class LevelDBStore extends LockableServi
       var pos = lastSeq.decrementAndGet()
       add(pos, id, bs)
       listSize.incrementAndGet()
-      new lang.Long(pos)
+      new java.lang.Long(pos)
     }
 
     def addLast(id: String, bs: ByteSequence): AnyRef = {
       var pos = lastSeq.incrementAndGet()
       add(pos, id, bs)
       listSize.incrementAndGet()
-      new lang.Long(pos)
+      new java.lang.Long(pos)
     }
 
     def add(pos:Long, id: String, bs: ByteSequence) = {
@@ -843,7 +857,7 @@ class LevelDBStore extends LockableServi
     }
 
     def remove(position: AnyRef): Boolean = {
-      val pos = position.asInstanceOf[lang.Long].longValue()
+      val pos = position.asInstanceOf[java.lang.Long].longValue()
       val encoded_key = encodeLongLong(key, pos)
       db.plistGet(encoded_key) match {
         case Some(value) =>



Mime
View raw message