activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1244981 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala LevelDBStore.scala RecordLog.scala
Date Thu, 16 Feb 2012 14:10:32 GMT
Author: chirino
Date: Thu Feb 16 14:10:32 2012
New Revision: 1244981

URL: http://svn.apache.org/viewvc?rev=1244981&view=rev
Log:
Fixes APLO-161 : LevelDB based storage slows down once you have several hundred log files holding
message data.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1244981&r1=1244980&r2=1244981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Thu Feb 16 14:10:32 2012
@@ -1007,7 +1007,7 @@ class LevelDBClient(store: LevelDBStore)
     // TODO:
     // Perhaps we should snapshot_index if the current snapshot is old.
     //
-
+    import collection.JavaConversions._
     last_index_snapshot_pos
     val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1244981&r1=1244980&r2=1244981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
Thu Feb 16 14:10:32 2012
@@ -270,6 +270,7 @@ class LevelDBStore(val config:LevelDBSto
         rc.log_append_pos = client.log.appender_limit
         rc.index_snapshot_pos = client.last_index_snapshot_pos
         rc.log_stats = {
+          import collection.JavaConversions._
           var row_layout = "%-20s | %-10s | %-10s\n"
           row_layout.format("Log File", "Msg Refs", "File Size")+
           client.log.log_infos.map{case (id,info)=> id -> client.log_refs.get(id).map(_.get)}.toSeq.flatMap
{ case (id, refs)=>

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1244981&r1=1244980&r2=1244981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Thu Feb 16 14:10:32 2012
@@ -21,13 +21,13 @@ import java.{util=>ju}
 
 import java.util.zip.CRC32
 import java.util.Map.Entry
-import collection.immutable.TreeMap
 import java.util.concurrent.atomic.AtomicLong
 import java.io._
 import org.apache.activemq.apollo.util.FileSupport._
 import org.apache.activemq.apollo.util.{Log, LRUCache}
 import org.fusesource.hawtdispatch.BaseRetained
 import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
+import java.util.TreeMap
 
 object RecordLog extends Log {
 
@@ -76,16 +76,16 @@ case class RecordLog(directory: File, lo
   var sync = false
 
 
-  var log_infos = TreeMap[Long, LogInfo]()
+  val log_infos = new TreeMap[Long, LogInfo]()
   object log_mutex
 
   def delete(id:Long) = {
     log_mutex.synchronized {
       // We can't delete the current appender.
       if( current_appender.position != id ) {
-        log_infos.get(id).foreach { info =>
+        Option(log_infos.get(id)).foreach { info =>
           onDelete(info.file)
-          log_infos = log_infos.filterNot(_._1 == id)
+          log_infos.remove(id)
         }
       }
     }
@@ -377,28 +377,29 @@ case class RecordLog(directory: File, lo
   def create_appender(position: Long): Any = {
     log_mutex.synchronized {
       if(current_appender!=null) {
-        log_infos += position -> new LogInfo(current_appender.file, current_appender.position,
current_appender.append_offset)
+        log_infos.put (position, new LogInfo(current_appender.file, current_appender.position,
current_appender.append_offset))
       }
       current_appender = create_log_appender(position)
-      log_infos += position -> current_appender.info
+      log_infos.put(position, current_appender.info)
     }
   }
 
   def open = {
     log_mutex.synchronized {
-      log_infos = LevelDBClient.find_sequence_files(directory, logSuffix).map { case (position,file)
=>
-        position -> LogInfo(file, position, file.length())
+      log_infos.clear()
+      LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file)
=>
+        log_infos.put(position, LogInfo(file, position, file.length()))
       }
 
       val appendPos = if( log_infos.isEmpty ) {
         0L
       } else {
-        val (_, file) = log_infos.last
+        val file = log_infos.lastEntry().getValue
         val r = LogReader(file.file, file.position)
         try {
           val actualLength = r.verifyAndGetEndPosition
           val updated = file.copy(length = actualLength - file.position)
-          log_infos = log_infos + (updated.position->updated)
+          log_infos.put(updated.position, updated)
           if( updated.file.length != file.length ) {
             // we need to truncate.
             using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
@@ -453,7 +454,7 @@ case class RecordLog(directory: File, lo
     }
   }
 
-  def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L, pos+1).lastOption.map(_._2))
+  def log_info(pos:Long) = log_mutex.synchronized { Option(log_infos.floorEntry(pos)).map(_.getValue)
}
 
   private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
 



Mime
View raw message