Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F3AB7962D for ; Fri, 17 Feb 2012 23:17:54 +0000 (UTC) Received: (qmail 55593 invoked by uid 500); 17 Feb 2012 23:17:54 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 55559 invoked by uid 500); 17 Feb 2012 23:17:54 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 55551 invoked by uid 99); 17 Feb 2012 23:17:54 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Feb 2012 23:17:54 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Feb 2012 23:17:51 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id AAB6E23889C5 for ; Fri, 17 Feb 2012 23:17:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1245801 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/ apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ Date: Fri, 17 Feb 2012 23:17:30 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120217231730.AAB6E23889C5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Feb 17 23:17:30 2012 New Revision: 1245801 URL: http://svn.apache.org/viewvc?rev=1245801&view=rev Log: Reduce the amount of data encoded into every leveldb store's index queue entry. Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1245801&r1=1245800&r2=1245801&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Fri Feb 17 23:17:30 2012 @@ -111,12 +111,12 @@ abstract class StoreFunSuiteSupport exte def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = { var batch = store.create_uow - var msg_keys = ListBuffer[(Long, AtomicReference[Object])]() + var msg_keys = ListBuffer[(Long, AtomicReference[Object], Long)]() var next_seq = first_seq messages.foreach { message=> val msgKey = add_message(batch, message) - msg_keys += msgKey + msg_keys +=( (msgKey._1, msgKey._2, next_seq) ) batch.enqueue(entry(queue_key, next_seq, msgKey)) next_seq += 1 } @@ -215,8 +215,8 @@ abstract class StoreFunSuiteSupport exte val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil) val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) ) - expect(msg_keys.toSeq.map(_._1)) { - rc.map( _.message_key ) + expect(msg_keys.toSeq.map(_._3)) { + rc.map( _.entry_seq ) } } Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1245801&r1=1245800&r2=1245801&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Feb 17 23:17:30 2012 @@ -387,13 +387,16 @@ class LevelDBClient(store: LevelDBStore) log.read(pos).map { case (kind, data, next_pos) => kind match { - case LOG_ADD_MESSAGE => case LOG_ADD_QUEUE_ENTRY => replay_operations+=1 val record = QueueEntryPB.FACTORY.parseFramed(data) - val pos = decode_vlong(record.getMessageLocator) - index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), data) - pos.foreach(log_ref_increment(_)) + + val index_record = record.copy() + index_record.clearQueueKey() + index_record.clearQueueSeq() + index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toFramedBuffer) + + log_ref_increment(decode_vlong(record.getMessageLocator)) case LOG_REMOVE_QUEUE_ENTRY => replay_operations+=1 @@ -436,7 +439,7 @@ class LevelDBClient(store: LevelDBStore) index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray) } case _ => - // Skip unknown records + // Skip records which don't require index updates. } pos = next_pos } @@ -825,18 +828,32 @@ class LevelDBClient(store: LevelDBStore) action.enqueues.foreach { entry => assert(locator!=null) val (pos, len) = locator - entry.message_locator.set(locator) - if ( locator_buffer==null ) { locator_buffer = encode_locator(pos, len) } - val record = PBSupport.to_pb(entry) - record.setMessageLocator(locator_buffer) + entry.message_locator.set(locator) - val encoded = record.freeze().toFramedBuffer - appender.append(LOG_ADD_QUEUE_ENTRY, encoded) - batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded) + val log_record = new QueueEntryPB.Bean + // TODO: perhaps we should normalize the sender to make the index entries more compact. + log_record.setSender(entry.sender) + log_record.setMessageLocator(locator_buffer) + log_record.setQueueKey(entry.queue_key) + log_record.setQueueSeq(entry.entry_seq) + log_record.setSize(entry.size) + if (entry.expiration!=0) + log_record.setExpiration(entry.expiration) + if (entry.redeliveries!=0) + log_record.setRedeliveries(entry.redeliveries) + + appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toFramedBuffer) + + // Slim down the index record, the smaller it is the cheaper the compactions + // will be and the more we can cache in mem. + val index_record = log_record.copy() + index_record.clearQueueKey() + index_record.clearQueueSeq() + batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toFramedBuffer) // Increment it. log_ref_increment(pos, log_info) @@ -998,8 +1015,11 @@ class LevelDBClient(store: LevelDBStore) val start = encode_key(queue_entry_prefix, queue_key, firstSeq) val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1) index.cursor_range( start, end, ro ) { (key, value) => + val (_, _, queue_seq) = decode_long_long_key(key) val record = QueueEntryPB.FACTORY.parseFramed(value) val entry = PBSupport.from_pb(record) + entry.queue_key = queue_key + entry.entry_seq = queue_seq entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator)) rc += entry true @@ -1172,9 +1192,12 @@ class LevelDBClient(store: LevelDBStore) } // Now export the queue entries - index.cursor_prefixed(queue_entry_prefix_array, nocache) { (_, value) => + index.cursor_prefixed(queue_entry_prefix_array, nocache) { (key, value) => + val (_, queue_key, queue_seq) = decode_long_long_key(key) val record = QueueEntryPB.FACTORY.parseFramed(value).copy() val (pos, len) = decode_locator(record.getMessageLocator) + record.setQueueKey(queue_key) + record.setQueueSeq(queue_seq) record.setMessageKey(pos) manager.store_queue_entry(record) true