Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBF01B39D for ; Fri, 20 Jan 2012 18:53:02 +0000 (UTC) Received: (qmail 49227 invoked by uid 500); 20 Jan 2012 18:53:02 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 49179 invoked by uid 500); 20 Jan 2012 18:53:02 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 49172 invoked by uid 99); 20 Jan 2012 18:53:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jan 2012 18:53:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jan 2012 18:52:59 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A4C9A2388B3A for ; Fri, 20 Jan 2012 18:52:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1234066 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala RecordLog.scala Date: Fri, 20 Jan 2012 18:52:37 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120120185237.A4C9A2388B3A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Jan 20 18:52:37 2012 New Revision: 1234066 URL: http://svn.apache.org/viewvc?rev=1234066&view=rev Log: If the paranoid_checks option is true, double check the integrity of the index and correct any inconsistencies. Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234066&r1=1234065&r2=1234066&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Jan 20 18:52:37 2012 @@ -20,7 +20,6 @@ import java.{lang=>jl} import java.{util=>ju} import org.fusesource.hawtbuf.proto.PBMessageFactory -import org.apache.activemq.apollo.broker.store.PBSupport._ import org.apache.activemq.apollo.broker.store._ import java.io._ @@ -39,8 +38,9 @@ import collection.mutable.{HashMap, List import org.apache.activemq.apollo.dto.JsonCodec import java.util.Map import org.iq80.leveldb._ -import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._ import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo +import org.apache.activemq.apollo.broker.store.PBSupport._ +import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._ /** * @author Hiram Chirino @@ -251,10 +251,11 @@ class LevelDBClient(store: LevelDBStore) index_options = new Options(); index_options.createIfMissing(true); + val paranoid_checks = config.paranoid_checks.getOrElse(false) config.index_max_open_files.foreach( index_options.maxOpenFiles(_) ) config.index_block_restart_interval.foreach( index_options.blockRestartInterval(_) ) - index_options.paranoidChecks(config.paranoid_checks.getOrElse(false)) + index_options.paranoidChecks(paranoid_checks) Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.writeBufferSize(_) ) Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.blockSize(_) ) Option(config.index_compression).foreach(x => index_options.compressionType( x match { @@ -271,7 +272,7 @@ class LevelDBClient(store: LevelDBStore) log = create_log log.sync = sync log.logSize = log_size - log.paranoidChecks = index_options.paranoidChecks() + log.verify_checksums = verify_checksums log.on_log_rotate = ()=> { // lets queue a request to checkpoint when // the logs rotate.. queue it on the GC thread since GC's lock @@ -327,6 +328,11 @@ class LevelDBClient(store: LevelDBStore) try { load_log_refs index.put(dirty_index_key, TRUE) + + if( paranoid_checks ) { + check_index_integrity(index) + } + // Update the index /w what was stored on the logs.. var pos = last_index_snapshot_pos; @@ -421,6 +427,85 @@ class LevelDBClient(store: LevelDBStore) } } + def check_index_integrity(index:RichDB) = { + val actual_log_refs = HashMap[Long, LongCounter]() + var referenced_queues = Set[Long]() + + // Lets find out what the queue entries are.. + var fixed_records = 0 + index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=> + try { + val (_, queue_key, seq_key) = decode_long_long_key(key) + val record: QueueEntryRecord = value + if (record.getQueueKey != queue_key) { + throw new IOException("key missmatch") + } + if (record.getQueueSeq != seq_key) { + throw new IOException("key missmatch") + } + val pos = decode_locator(record.getMessageLocator)._1 + log.log_info(pos).foreach { + log_info => + actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet() + } + referenced_queues += queue_key + } catch { + case _ => + fixed_records += 1 + // Invalid record. + index.delete(key) + } + true + } + + // Lets cross check the queues. + index.cursor_prefixed(queue_prefix_array) { (key, value)=> + try { + val (_, queue_key) = decode_long_key(key) + val record: QueueRecord = value + if (record.getKey != queue_key) { + throw new IOException("key missmatch") + } + referenced_queues -= queue_key + } catch { + case _ => + fixed_records += 1 + // Invalid record. + index.delete(key) + } + true + } + + referenced_queues.foreach { queue_key=> + // We have queue entries for a queue that does not exist.. + index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key)) { (key, value)=> + + fixed_records += 1 + index.delete(key) + val entry_record:QueueEntryRecord = value + val pos = decode_locator(entry_record.getMessageLocator)._1 + log.log_info(pos).foreach { log_info => + actual_log_refs.get(log_info.position).foreach { counter => + if (counter.decrementAndGet() == 0) { + actual_log_refs.remove(log_info.position) + } + } + } + true + } + } + + if( actual_log_refs != log_refs ) { + log_refs.clear() + log_refs ++= actual_log_refs + } + + if( fixed_records > 0 ) { + warn("Fixed %d invalid index enties in the leveldb store", fixed_records) + } + } + + private def store_log_refs = { index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray) } Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1234066&r1=1234065&r2=1234066&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Fri Jan 20 18:52:37 2012 @@ -72,7 +72,7 @@ case class RecordLog(directory: File, lo var logSize = 1024 * 1024 * 100 var current_appender:LogAppender = _ - var paranoidChecks = false + var verify_checksums = false var sync = false @@ -222,7 +222,7 @@ case class RecordLog(directory: File, lo val offset = (record_position-position).toInt check_read_flush(offset+LOG_HEADER_SIZE+length) - if(paranoidChecks) { + if(verify_checksums) { val record = new Buffer(LOG_HEADER_SIZE+length) @@ -277,7 +277,7 @@ case class RecordLog(directory: File, lo throw new IOException("short record") } - if(paranoidChecks) { + if(verify_checksums) { if( expectedChecksum != checksum(data) ) { throw new IOException("checksum does not match") }