Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DCE099A43 for ; Fri, 17 Feb 2012 23:56:07 +0000 (UTC) Received: (qmail 32653 invoked by uid 500); 17 Feb 2012 23:56:07 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 32630 invoked by uid 500); 17 Feb 2012 23:56:07 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 32623 invoked by uid 99); 17 Feb 2012 23:56:07 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Feb 2012 23:56:07 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Feb 2012 23:56:06 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 940952388900 for ; Fri, 17 Feb 2012 23:55:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1245811 - /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Date: Fri, 17 Feb 2012 23:55:46 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120217235546.940952388900@eris.apache.org> Author: chirino Date: Fri Feb 17 23:55:46 2012 New Revision: 1245811 URL: http://svn.apache.org/viewvc?rev=1245811&view=rev Log: Switch to Unframed protobuf messages to further reduce index usage size. Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1245811&r1=1245810&r2=1245811&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Feb 17 23:55:46 2012 @@ -47,7 +47,7 @@ import org.apache.activemq.apollo.broker object LevelDBClient extends Log { final val STORE_SCHEMA_PREFIX = "leveldb_store:" - final val STORE_SCHEMA_VERSION = 2 + final val STORE_SCHEMA_VERSION = 3 final val queue_prefix = 'q'.toByte final val queue_entry_prefix = 'e'.toByte @@ -389,27 +389,27 @@ class LevelDBClient(store: LevelDBStore) kind match { case LOG_ADD_QUEUE_ENTRY => replay_operations+=1 - val record = QueueEntryPB.FACTORY.parseFramed(data) + val record = QueueEntryPB.FACTORY.parseUnframed(data) val index_record = record.copy() index_record.clearQueueKey() index_record.clearQueueSeq() - index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toFramedBuffer) + index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toUnframedBuffer) log_ref_increment(decode_vlong(record.getMessageLocator)) case LOG_REMOVE_QUEUE_ENTRY => replay_operations+=1 index.get(data, new ReadOptions).foreach { value=> - val record = QueueEntryPB.FACTORY.parseFramed(value) + val record = QueueEntryPB.FACTORY.parseUnframed(value) val pos = decode_vlong(record.getMessageLocator) pos.foreach(log_ref_decrement(_)) index.delete(data) } - + case LOG_ADD_QUEUE => replay_operations+=1 - val record = QueuePB.FACTORY.parseFramed(data) + val record = QueuePB.FACTORY.parseUnframed(data) index.put(encode_key(queue_prefix, record.getKey), data) case LOG_REMOVE_QUEUE => @@ -424,7 +424,7 @@ class LevelDBClient(store: LevelDBStore) // Figure out what log file that message entry was in so we can, // decrement the log file reference. - val record = QueueEntryPB.FACTORY.parseFramed(value) + val record = QueueEntryPB.FACTORY.parseUnframed(value) val pos = decode_vlong(record.getMessageLocator) log_ref_decrement(pos) true @@ -432,7 +432,7 @@ class LevelDBClient(store: LevelDBStore) case LOG_MAP_ENTRY => replay_operations+=1 - val entry = MapEntryPB.FACTORY.parseFramed(data) + val entry = MapEntryPB.FACTORY.parseUnframed(data) if (entry.getValue == null) { index.delete(encode_key(map_prefix, entry.getKey)) } else { @@ -467,11 +467,11 @@ class LevelDBClient(store: LevelDBStore) var referenced_queues = Set[Long]() // Lets find out what the queue entries are.. - var fixed_records = 0 + var fixed_records = 0 index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=> try { val (_, queue_key, seq_key) = decode_long_long_key(key) - val record = QueueEntryPB.FACTORY.parseFramed(value) + val record = QueueEntryPB.FACTORY.parseUnframed(value) val (pos, len) = decode_locator(record.getMessageLocator) if (record.getQueueKey != queue_key) { throw new IOException("key missmatch") @@ -493,12 +493,12 @@ class LevelDBClient(store: LevelDBStore) } true } - + // Lets cross check the queues. index.cursor_prefixed(queue_prefix_array) { (key, value)=> try { val (_, queue_key) = decode_long_key(key) - val record = QueuePB.FACTORY.parseFramed(value) + val record = QueuePB.FACTORY.parseUnframed(value) if (record.getKey != queue_key) { throw new IOException("key missmatch") } @@ -519,7 +519,7 @@ class LevelDBClient(store: LevelDBStore) trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key) fixed_records += 1 index.delete(key) - val record = QueueEntryPB.FACTORY.parseFramed(value) + val record = QueueEntryPB.FACTORY.parseUnframed(value) val pos = decode_vlong(record.getMessageLocator) log.log_info(pos).foreach { log_info => actual_log_refs.get(log_info.position).foreach { counter => @@ -537,14 +537,14 @@ class LevelDBClient(store: LevelDBStore) log_refs.clear() log_refs ++= actual_log_refs } - + if( fixed_records > 0 ) { warn("Fixed %d invalid index enties in the leveldb store", fixed_records) } } var lock_file:LockFile = _ - + def lock_store = { import OptionSupport._ if (config.fail_if_locked.getOrElse(false)) { @@ -555,11 +555,11 @@ class LevelDBClient(store: LevelDBStore) } } } - + def unlock_store = { lock_file.unlock() } - + private def store_log_refs = { index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray) } @@ -573,7 +573,7 @@ class LevelDBClient(store: LevelDBStore) } } } - + def stop() = { // this blocks until all io completes.. // Suspend also deletes the index. @@ -768,7 +768,7 @@ class LevelDBClient(store: LevelDBStore) // Figure out what log file that message entry was in so we can, // decrement the log file reference. - val record = QueueEntryPB.FACTORY.parseFramed(value) + val record = QueueEntryPB.FACTORY.parseUnframed(value) val pos = decode_vlong(record.getMessageLocator) log_ref_decrement(pos) true @@ -795,7 +795,7 @@ class LevelDBClient(store: LevelDBStore) entry.setValue(value) batch.put(encode_key(map_prefix, key), value.toByteArray) } - appender.append(LOG_MAP_ENTRY, entry.freeze().toFramedByteArray) + appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray) } uow.actions.foreach { case (msg, action) => @@ -846,18 +846,18 @@ class LevelDBClient(store: LevelDBStore) if (entry.redeliveries!=0) log_record.setRedeliveries(entry.redeliveries) - appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toFramedBuffer) + appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toUnframedBuffer) // Slim down the index record, the smaller it is the cheaper the compactions // will be and the more we can cache in mem. val index_record = log_record.copy() index_record.clearQueueKey() index_record.clearQueueSeq() - batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toFramedBuffer) - + batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toUnframedBuffer) + // Increment it. log_ref_increment(pos, log_info) - + } } if( uow.flush_sync ) { @@ -974,7 +974,7 @@ class LevelDBClient(store: LevelDBStore) group.first_entry_seq = current_key } - val entry = QueueEntryPB.FACTORY.parseFramed(value) + val entry = QueueEntryPB.FACTORY.parseUnframed(value) val pos = decode_vlong(entry.getMessageLocator) group.last_entry_seq = current_key @@ -1016,7 +1016,7 @@ class LevelDBClient(store: LevelDBStore) val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1) index.cursor_range( start, end, ro ) { (key, value) => val (_, _, queue_seq) = decode_long_long_key(key) - val record = QueueEntryPB.FACTORY.parseFramed(value) + val record = QueueEntryPB.FACTORY.parseUnframed(value) val entry = PBSupport.from_pb(record) entry.queue_key = queue_key entry.entry_seq = queue_seq @@ -1036,7 +1036,7 @@ class LevelDBClient(store: LevelDBStore) index.get(encode_key(map_prefix, key)).map(new Buffer(_)) } } - + def get_prefixed_map_entries(prefix:Buffer):Seq[(Buffer, Buffer)] = { val rc = ListBuffer[(Buffer, Buffer)]() retry_using_index { @@ -1046,7 +1046,7 @@ class LevelDBClient(store: LevelDBStore) } } rc - } + } def get_last_queue_key:Long = { retry_using_index { @@ -1074,12 +1074,12 @@ class LevelDBClient(store: LevelDBStore) } } } - + case class UsageCounter(info:LogInfo) { var count = 0L var size = 0L var first_reference_queue:QueueRecord = _ - + def increment(value:Int) = { count += 1 size += value @@ -1150,19 +1150,19 @@ class LevelDBClient(store: LevelDBStore) val manager = ExportStreamManager(os, 1) retry_using_index { - + // Delete all the tmp keys.. index.cursor_keys_prefixed(Array(tmp_prefix)) { key => index.delete(key) true } - + index.snapshot { snapshot=> val nocache = new ReadOptions nocache.snapshot(snapshot) nocache.verifyChecksums(verify_checksums) nocache.fillCache(false) - + val cache = new ReadOptions nocache.snapshot(snapshot) nocache.verifyChecksums(false) @@ -1171,7 +1171,7 @@ class LevelDBClient(store: LevelDBStore) // Build a temp table of all references messages by the queues // Remember 2 queues could reference the same message. index.cursor_prefixed(queue_entry_prefix_array, cache) { (_, value) => - val record = QueueEntryPB.FACTORY.parseFramed(value) + val record = QueueEntryPB.FACTORY.parseUnframed(value) val (pos, len) = decode_locator(record.getMessageLocator) index.put(encode_key(tmp_prefix, pos), encode_vlong(len)) true @@ -1184,7 +1184,7 @@ class LevelDBClient(store: LevelDBStore) val len = decode_vlong(value).toInt log.read(pos, len).foreach { value => // Set the message key to be the position in the log. - val record = MessagePB.FACTORY.parseFramed(value).copy + val record = MessagePB.FACTORY.parseUnframed(value).copy record.setMessageKey(pos) manager.store_message(record) } @@ -1194,7 +1194,7 @@ class LevelDBClient(store: LevelDBStore) // Now export the queue entries index.cursor_prefixed(queue_entry_prefix_array, nocache) { (key, value) => val (_, queue_key, queue_seq) = decode_long_long_key(key) - val record = QueueEntryPB.FACTORY.parseFramed(value).copy() + val record = QueueEntryPB.FACTORY.parseUnframed(value).copy() val (pos, len) = decode_locator(record.getMessageLocator) record.setQueueKey(queue_key) record.setQueueSeq(queue_seq) @@ -1204,7 +1204,7 @@ class LevelDBClient(store: LevelDBStore) } index.cursor_prefixed(queue_prefix_array) { (_, value) => - val record = QueuePB.FACTORY.parseFramed(value) + val record = QueuePB.FACTORY.parseUnframed(value) manager.store_queue(record) true } @@ -1253,7 +1253,7 @@ class LevelDBClient(store: LevelDBStore) while(manager.getNext match { case record:MessagePB.Buffer => - val message_data = record.toFramedBuffer + val message_data = record.toUnframedBuffer val (pos, _) = appender.append(LOG_ADD_MESSAGE, message_data) index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length)) true @@ -1265,7 +1265,7 @@ class LevelDBClient(store: LevelDBStore) case Some(locator)=> val (pos, len) = decode_locator(locator) copy.setMessageLocator(locator) - index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toFramedBuffer) + index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer) log.log_info(pos).foreach { log_info => log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet() } @@ -1275,7 +1275,7 @@ class LevelDBClient(store: LevelDBStore) true case record:QueuePB.Buffer => - index.put(encode_key(queue_prefix, record.getKey), record.toFramedBuffer) + index.put(encode_key(queue_prefix, record.getKey), record.toUnframedBuffer) true case record:MapEntryPB.Buffer =>