Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6662C9B3A for ; Thu, 16 Feb 2012 14:11:01 +0000 (UTC) Received: (qmail 65234 invoked by uid 500); 16 Feb 2012 14:11:01 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 65197 invoked by uid 500); 16 Feb 2012 14:11:01 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 65190 invoked by uid 99); 16 Feb 2012 14:11:01 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 14:11:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 14:10:53 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id AEBED2388A66 for ; Thu, 16 Feb 2012 14:10:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1244981 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala LevelDBStore.scala RecordLog.scala Date: Thu, 16 Feb 2012 14:10:32 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120216141032.AEBED2388A66@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Feb 16 14:10:32 2012 New Revision: 1244981 URL: http://svn.apache.org/viewvc?rev=1244981&view=rev Log: Fixes APLO-161 : LevelDB base storage slows down once you have several hundred log files hold message data. Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1244981&r1=1244980&r2=1244981&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Thu Feb 16 14:10:32 2012 @@ -1007,7 +1007,7 @@ class LevelDBClient(store: LevelDBStore) // TODO: // Perhaps we should snapshot_index if the current snapshot is old. // - + import collection.JavaConversions._ last_index_snapshot_pos val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1244981&r1=1244980&r2=1244981&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Thu Feb 16 14:10:32 2012 @@ -270,6 +270,7 @@ class LevelDBStore(val config:LevelDBSto rc.log_append_pos = client.log.appender_limit rc.index_snapshot_pos = client.last_index_snapshot_pos rc.log_stats = { + import collection.JavaConversions._ var row_layout = "%-20s | %-10s | %-10s\n" row_layout.format("Log File", "Msg Refs", "File Size")+ client.log.log_infos.map{case (id,info)=> id -> client.log_refs.get(id).map(_.get)}.toSeq.flatMap { case (id, refs)=> Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1244981&r1=1244980&r2=1244981&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Thu Feb 16 14:10:32 2012 @@ -21,13 +21,13 @@ import java.{util=>ju} import java.util.zip.CRC32 import java.util.Map.Entry -import collection.immutable.TreeMap import java.util.concurrent.atomic.AtomicLong import java.io._ import org.apache.activemq.apollo.util.FileSupport._ import org.apache.activemq.apollo.util.{Log, LRUCache} import org.fusesource.hawtdispatch.BaseRetained import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer} +import java.util.TreeMap object RecordLog extends Log { @@ -76,16 +76,16 @@ case class RecordLog(directory: File, lo var sync = false - var log_infos = TreeMap[Long, LogInfo]() + val log_infos = new TreeMap[Long, LogInfo]() object log_mutex def delete(id:Long) = { log_mutex.synchronized { // We can't delete the current appender. if( current_appender.position != id ) { - log_infos.get(id).foreach { info => + Option(log_infos.get(id)).foreach { info => onDelete(info.file) - log_infos = log_infos.filterNot(_._1 == id) + log_infos.remove(id) } } } @@ -377,28 +377,29 @@ case class RecordLog(directory: File, lo def create_appender(position: Long): Any = { log_mutex.synchronized { if(current_appender!=null) { - log_infos += position -> new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset) + log_infos.put (position, new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset)) } current_appender = create_log_appender(position) - log_infos += position -> current_appender.info + log_infos.put(position, current_appender.info) } } def open = { log_mutex.synchronized { - log_infos = LevelDBClient.find_sequence_files(directory, logSuffix).map { case (position,file) => - position -> LogInfo(file, position, file.length()) + log_infos.clear() + LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) => + log_infos.put(position, LogInfo(file, position, file.length())) } val appendPos = if( log_infos.isEmpty ) { 0L } else { - val (_, file) = log_infos.last + val file = log_infos.lastEntry().getValue val r = LogReader(file.file, file.position) try { val actualLength = r.verifyAndGetEndPosition val updated = file.copy(length = actualLength - file.position) - log_infos = log_infos + (updated.position->updated) + log_infos.put(updated.position, updated) if( updated.file.length != file.length ) { // we need to truncate. using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length)) @@ -453,7 +454,7 @@ case class RecordLog(directory: File, lo } } - def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L, pos+1).lastOption.map(_._2)) + def log_info(pos:Long) = log_mutex.synchronized { Option(log_infos.floorEntry(pos)).map(_.getValue) } private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {