Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1CB3DB1AD for ; Fri, 20 Jan 2012 16:53:53 +0000 (UTC) Received: (qmail 19165 invoked by uid 500); 20 Jan 2012 16:53:53 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 19098 invoked by uid 500); 20 Jan 2012 16:53:52 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 19091 invoked by uid 99); 20 Jan 2012 16:53:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jan 2012 16:53:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jan 2012 16:53:51 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 26E3B23889ED for ; Fri, 20 Jan 2012 16:53:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1233998 - /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Date: Fri, 20 Jan 2012 16:53:31 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120120165331.26E3B23889ED@eris.apache.org> Author: chirino Date: Fri Jan 20 16:53:30 2012 New Revision: 1233998 URL: http://svn.apache.org/viewvc?rev=1233998&view=rev Log: leveldb would not GC log files if you deleted a queue via admin screen (ref counting error on recovery). Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1233998&r1=1233997&r2=1233998&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Jan 20 16:53:30 2012 @@ -200,6 +200,7 @@ class LevelDBClient(store: LevelDBStore) var index:RichDB = _ var index_options:Options = _ + var last_index_snapshot_ts = System.currentTimeMillis() var last_index_snapshot_pos:Long = _ val snapshot_rw_lock = new ReentrantReadWriteLock(true) @@ -300,7 +301,7 @@ class LevelDBClient(store: LevelDBStore) // Only keep the last snapshot.. snapshots.filterNot(_._1 == last_index_snapshot_pos).foreach( _._2.recursive_delete ) - temp_index_file.recursive_delete + temp_index_file.recursive_delete // usually does not exist. retry { @@ -380,11 +381,17 @@ class LevelDBClient(store: LevelDBStore) ro.verifyChecksums(verify_checksums) val queue_key = decode_vlong(data) index.delete(encode_key(queue_prefix, queue_key)) - index.cursor_keys_prefixed(encode_key(queue_entry_prefix, queue_key), ro) { - key => - index.delete(key) - true + index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) { (key, value)=> + index.delete(key) + + // Figure out what log file that message entry was in so we can, + // decrement the log file reference. + val entry_record:QueueEntryRecord = value + val pos = decode_locator(entry_record.getMessageLocator)._1 + log_ref_decrement(pos) + true } + case LOG_MAP_ENTRY => replay_operations+=1 val entry = MapEntryPB.FACTORY.parseUnframed(data) @@ -399,6 +406,9 @@ class LevelDBClient(store: LevelDBStore) pos = next_pos } } + if(replay_operations > 0) { + snapshot_index + } } info("Took %.2f second(s) to recover %d operations in the log file.", (log_replay_duration/TimeUnit.SECONDS.toNanos(1).toFloat), replay_operations) @@ -500,6 +510,7 @@ class LevelDBClient(store: LevelDBStore) tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos)) snapshot_index_file(last_index_snapshot_pos).recursive_delete last_index_snapshot_pos = new_snapshot_index_pos + last_index_snapshot_ts = System.currentTimeMillis() } catch { case e: Exception => @@ -888,6 +899,11 @@ class LevelDBClient(store: LevelDBStore) } def gc:Unit = { + + // TODO: + // Perhaps we should snapshot_index if the current snapshot is old. + // + last_index_snapshot_pos val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet