Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 43087 invoked from network); 7 Jul 2010 04:07:51 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 04:07:51 -0000 Received: (qmail 44755 invoked by uid 500); 7 Jul 2010 04:07:51 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 44689 invoked by uid 500); 7 Jul 2010 04:07:50 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 44682 invoked by uid 99); 7 Jul 2010 04:07:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 04:07:50 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 04:07:42 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2EC3E23889B1; Wed, 7 Jul 2010 04:06:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961128 [1/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ activemq-hawtdb/src/main/proto/ activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/ activemq-hawtdb/s... Date: Wed, 07 Jul 2010 04:06:48 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707040648.2EC3E23889B1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Jul 7 04:06:47 2010 New Revision: 961128 URL: http://svn.apache.org/viewvc?rev=961128&view=rev Log: making more progress on the hawtdb-store Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerTest.java Removed: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Callback.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DuplicateKeyException.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/FatalStoreException.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/KeyNotFoundException.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/VoidCallback.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java?rev=961128&r1=961127&r2=961128&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java Wed Jul 7 04:06:47 2010 @@ -32,8 +32,8 @@ public class HawtDBStoreDTO extends Stor @XmlAttribute(name="directory", required=false) public File directory; - @XmlAttribute(name="checkpoint-interval", required=false) - public long checkpointInterval = 5 * 1000L; + @XmlAttribute(name="index-flush-interval", required=false) + public long indexFlushInterval = 5 * 1000L; @XmlAttribute(name="cleanup-interval", required=false) public long cleanupInterval = 30 * 1000L; Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?rev=961128&r1=961127&r2=961128&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul 7 04:06:47 2010 @@ -154,18 +154,27 @@ message RemoveStream { /////////////////////////////////////////////////////////////// // Index Structures /////////////////////////////////////////////////////////////// -message RootRecord { +message DatabaseRootRecord { required fixed32 state=1; required fixed64 lastMessageKey=2; required fixed64 firstInProgressBatch=3; required fixed64 lastUpdateLocation=4; - required fixed32 locationIndexPage=5; + required fixed32 dataFileRefIndexPage=5; required fixed32 messageKeyIndexPage=6; required fixed32 messageRefsIndexPage=7; - required fixed32 destinationIndexPage=8; + required fixed32 queueIndexPage=8; required fixed32 subscriptionIndexPage=10; required fixed32 mapIndexPage=11; -} \ No newline at end of file +} + +message QueueRootRecord { + required AddQueue info=1; + required int64 size=2; + required int64 count=3; + required fixed32 entryIndexPage=4; + required fixed32 trackingIndexPage=5; +} + Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961128&r1=961127&r2=961128&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:06:47 2010 @@ -26,9 +26,6 @@ import org.apache.activemq.apollo.store. import org.fusesource.hawtbuf.proto.MessageBuffer import org.fusesource.hawtbuf.proto.PBMessage import org.apache.activemq.util.LockFile -import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory} -import java.util.HashSet -import collection.mutable.{HashMap, ListBuffer} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import org.fusesource.hawtdb.internal.journal.{JournalCallback, Journal, Location} import org.fusesource.hawtdispatch.TaskTracker @@ -39,84 +36,52 @@ import org.apache.activemq.broker.store. import org.fusesource.hawtbuf._ import org.fusesource.hawtdispatch.ScalaDispatch._ import org.apache.activemq.apollo.broker.{Log, Logging, BaseService} +import collection.mutable.{LinkedHashMap, HashMap, ListBuffer} +import collection.JavaConversions +import java.util.{TreeSet, HashSet} +import org.fusesource.hawtdb.api._ object HawtDBClient extends Log { - - type PB = PBMessage[_ <: PBMessage[_,_], _ <: MessageBuffer[_,_]] - - implicit def toPBMessage(value:TypeCreatable):PB = value.asInstanceOf[PB] - val BEGIN = -1 val COMMIT = -2 + val ROLLBACK = -3 val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000 val CLOSED_STATE = 1 val OPEN_STATE = 2 - - implicit def decodeMessageRecord(pb: AddMessage.Getter): MessageRecord = { - val rc = new MessageRecord - rc.protocol = pb.getProtocol - rc.size = pb.getSize - rc.value = pb.getValue - rc.stream = pb.getStreamKey - rc.expiration = pb.getExpiration - rc - } - - implicit def encodeMessageRecord(v: MessageRecord): AddMessage.Bean = { - val pb = new AddMessage.Bean - pb.setProtocol(v.protocol) - pb.setSize(v.size) - pb.setValue(v.value) - pb.setStreamKey(v.stream) - pb.setExpiration(v.expiration) - pb - } - - implicit def decodeQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = { - val rc = new QueueEntryRecord - rc.messageKey = pb.getMessageKey - rc.attachment = pb.getAttachment - rc.size = pb.getSize - rc.redeliveries = pb.getRedeliveries.toShort - rc - } - - implicit def encodeQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = { - val pb = new AddQueueEntry.Bean - pb.setMessageKey(v.messageKey) - pb.setAttachment(v.attachment) - pb.setSize(v.size) - pb.setRedeliveries(v.redeliveries) - pb - } } /** * * @author Hiram Chirino */ -class HawtDBClient() extends Logging { +class HawtDBClient(hawtDBStore: HawtDBStore) extends Logging { import HawtDBClient._ + import Helpers._ override def log: Log = HawtDBClient - val dispatchQueue = createQueue("hawtdb store") + def dispatchQueue = hawtDBStore.dispatchQueue private val pageFileFactory = new TxPageFileFactory() private var journal: Journal = null private var lockFile: LockFile = null - private var nextRecoveryPosition: Location = null - private var lastRecoveryPosition: Location = null private val trackingGen = new AtomicLong(0) + private val lockedDatatFiles = new HashSet[java.lang.Integer]() - private val journalFilesBeingReplicated = new HashSet[Integer]() private var recovering = false + private var nextRecoveryPosition: Location = null + private var lastRecoveryPosition: Location = null + private var recoveryCounter = 0 - //protected RootEntity rootEntity = new RootEntity() + var databaseRootRecord = new DatabaseRootRecord.Bean + + + val next_batch_counter = new AtomicInteger(0) + private var batches = new LinkedHashMap[Int, (Location, ListBuffer[Update])]() ///////////////////////////////////////////////////////////////////// // @@ -128,7 +93,7 @@ class HawtDBClient() extends Logging { private def journalMaxFileLength = config.journalLogSize - private def checkpointInterval = config.checkpointInterval + private def checkpointInterval = config.indexFlushInterval private def cleanupInterval = config.cleanupInterval @@ -139,7 +104,7 @@ class HawtDBClient() extends Logging { ///////////////////////////////////////////////////////////////////// // - // Public interface + // Public interface used by the HawtDBStore // ///////////////////////////////////////////////////////////////////// @@ -168,12 +133,17 @@ class HawtDBClient() extends Logging { } } + def createJournal() = { + val journal = new Journal() + journal.setDirectory(directory) + journal.setMaxFileLength(config.journalLogSize) + journal + } + def start() = { lock { - journal = new Journal() - journal.setDirectory(directory) - journal.setMaxFileLength(config.journalLogSize) + journal = createJournal() journal.start pageFileFactory.setFile(new File(directory, "db")) @@ -182,14 +152,30 @@ class HawtDBClient() extends Logging { pageFileFactory.setUseWorkerThread(true) pageFileFactory.open() - withTx {tx => - if (!tx.allocator().isAllocated(0)) { - // rootEntity.allocate(tx) - } - // rootEntity.load(tx) + withTx { tx => + val helper = new TxHelper(tx) + import helper._ + + if (!tx.allocator().isAllocated(0)) { + val rootPage = tx.alloc() + assert(rootPage == 0) + + databaseRootRecord.setQueueIndexPage(alloc(QUEUE_INDEX_FACTORY)) + databaseRootRecord.setMessageKeyIndexPage(alloc(MESSAGE_KEY_INDEX_FACTORY)) + databaseRootRecord.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY)) + databaseRootRecord.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY)) + databaseRootRecord.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY)) + + tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze) + databaseRootRecord = databaseRootRecord.copy + } else { + databaseRootRecord = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0).copy + } } + pageFile.flush() - // recover() + recover + // trackingGen.set(rootEntity.getLastMessageTracking() + 1) // checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { @@ -227,7 +213,6 @@ class HawtDBClient() extends Logging { def stop() = { } - def addQueue(record: QueueRecord) = { val update = new AddQueue.Bean() update.setKey(record.key) @@ -236,177 +221,160 @@ class HawtDBClient() extends Logging { store(update) } + def store(txs: Seq[HawtDBStore#HawtDBBatch]) { + var batch = List[TypeCreatable]() + txs.foreach { + tx => + tx.actions.foreach { + case (msg, action) => + if (action.store != null) { + val update: AddMessage.Bean = action.store + batch ::= update + } + action.enqueues.foreach { + queueEntry => + val update: AddQueueEntry.Bean = queueEntry + batch ::= update + } + action.dequeues.foreach { + queueEntry => + val qid = queueEntry.queueKey + val seq = queueEntry.queueSeq + batch ::= new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq) + } + } + } + store(batch) + } + def purge() = { -// withSession { -// session => -// session.list(schema.queue_name).map { -// x => -// val qid: Long = x.name -// session.remove(schema.entries \ qid) -// } -// session.remove(schema.queue_name) -// session.remove(schema.message_data) -// } + val update = new Purge.Bean() + store(update) } def listQueues: Seq[Long] = { - null -// withSession { -// session => -// session.list(schema.queue_name).map { -// x => -// val id: Long = x.name -// id -// } -// } - } - - def getQueueStatus(id: Long): Option[QueueStatus] = { - null -// withSession { -// session => -// session.get(schema.queue_name \ id) match { -// case Some(x) => -// -// val rc = new QueueStatus -// rc.record = new QueueRecord -// rc.record.key = id -// rc.record.name = new AsciiBuffer(x.value) -// -// // rc.count = session.count( schema.entries \ id ) -// -// // TODO -// // rc.count = -// // rc.first = -// // rc.last = -// -// Some(rc) -// case None => -// None -// } -// } + withTx { tx => + val helper = new TxHelper(tx) + import JavaConversions._ + import helper._ + queueIndex.iterator.map { + entry => + entry.getKey.longValue + }.toSeq + } } + def getQueueStatus(queueKey: Long): Option[QueueStatus] = { + withTx { tx => + val helper = new TxHelper(tx) + import JavaConversions._ + import helper._ + + val queueRecord = queueIndex.get(queueKey) + if (queueRecord != null) { + val rc = new QueueStatus + rc.record = new QueueRecord + rc.record.key = queueKey + rc.record.name = queueRecord.getInfo.getName + rc.record.queueType = queueRecord.getInfo.getQueueType + rc.count = queueRecord.getCount.toInt + rc.size = queueRecord.getSize + + // TODO + // rc.first = + // rc.last = + + Some(rc) + } else { + None + } + } + } - def store(txs: Seq[HawtDBStore#HawtDBBatch]) { -// withSession { -// session => -// var operations = List[Operation]() -// txs.foreach { -// tx => -// tx.actions.foreach { -// case (msg, action) => -// var rc = -// if (action.store != null) { -// operations ::= Insert( schema.message_data \ (msg, action.store) ) -// } -// action.enqueues.foreach { -// queueEntry => -// val qid = queueEntry.queueKey -// val seq = queueEntry.queueSeq -// operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) ) -// } -// action.dequeues.foreach { -// queueEntry => -// val qid = queueEntry.queueKey -// val seq = queueEntry.queueSeq -// operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) ) -// } -// } -// } -// session.batch(operations) -// } - } - - def loadMessage(id: Long): Option[MessageRecord] = { - null -// withSession { -// session => -// session.get(schema.message_data \ id) match { -// case Some(x) => -// val rc: MessageRecord = x.value -// rc.key = id -// Some(rc) -// case None => -// None -// } -// } - } - - def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = { - null -// withSession { -// session => -// session.list(schema.entries \ qid).map { -// x => -// val rc: QueueEntryRecord = x.value -// rc.queueKey = qid -// rc.queueSeq = x.name -// rc -// } -// } + + def getQueueEntries(queueKey: Long): Seq[QueueEntryRecord] = { + withTx { tx => + val helper = new TxHelper(tx) + import JavaConversions._ + import helper._ + + val queueRecord = queueIndex.get(queueKey) + if (queueRecord != null) { + val entryIndex = queueEntryIndex(queueRecord) + entryIndex.iterator.map { + entry => + val rc: QueueEntryRecord = entry.getValue + rc + }.toSeq + } else { + Nil.toSeq + } + } } + def loadMessage(messageKey: Long): Option[MessageRecord] = { + withTx { tx => + val helper = new TxHelper(tx) + import JavaConversions._ + import helper._ + + val location = messageKeyIndex.get(messageKey) + if (location != null) { + load(location, classOf[AddMessage.Getter]) match { + case Some(x) => + val messageRecord: MessageRecord = x + Some(messageRecord) + case None => None + } + } else { + None + } + } + } + + ///////////////////////////////////////////////////////////////////// // - // Implementation + // Batch/Transactional interface to storing/accessing journaled updates. // ///////////////////////////////////////////////////////////////////// - private def withTx[T](func: (Transaction) => T) { - val tx = pageFile.tx - var ok = false + private def load[T <: TypeCreatable](location: Location, expected: Class[T]): Option[T] = { try { - val rc = func(tx) - ok = true - rc - } finally { - if (ok) { - tx.commit - } else { - tx.rollback + read(location) match { + case (updateType, data) => + Some(expected.cast(decode(location, updateType, data))) } + } catch { + case e: Exception => + debug("Could not load journal record at: %s", location) + None } } - val next_batch_counter = new AtomicInteger(0) - - // Gets the next batch id.. after a while we may wrap around - // start producing batch ids from zero - val next_batch_id = { - var rc = next_batch_counter.getAndIncrement - while (rc < 0) { - // We just wrapped around.. reset the counter to 0 - // Use a CAS operation so that only 1 thread resets the counter - next_batch_counter.compareAndSet(rc + 1, 0) - rc = next_batch_counter.getAndIncrement - } - rc - } - - - private def store(updates: List[TypeCreatable]):Unit = { + private def store(updates: List[TypeCreatable]): Unit = { val tracker = new TaskTracker("storing") - store( updates, tracker.task(updates)) + store(updates, tracker.task(updates)) tracker.await } - private def store(update: TypeCreatable):Unit = { + private def store(update: TypeCreatable): Unit = { val tracker = new TaskTracker("storing") - store( update, tracker.task(update)) + store(update, tracker.task(update)) tracker.await } - private def store(updates: List[TypeCreatable], onComplete: Runnable):Unit = { + private def store(updates: List[TypeCreatable], onComplete: Runnable): Unit = { val batch = next_batch_id begin(batch) - updates.foreach {update => - store(batch, update, null) + updates.foreach { + update => + store(batch, update, null) } commit(batch, onComplete) } - private def store(update: TypeCreatable, onComplete: Runnable):Unit = store(-1, update, onComplete) + private def store(update: TypeCreatable, onComplete: Runnable): Unit = store(-1, update, onComplete) /** * All updated are are funneled through this method. The updates are logged to @@ -415,7 +383,7 @@ class HawtDBClient() extends Logging { * * @throws IOException */ - private def store(batch: Int, update: TypeCreatable, onComplete: Runnable):Unit = { + private def store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = { val kind = update.asInstanceOf[TypeCreatable] val frozen = update.freeze val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1) @@ -423,191 +391,615 @@ class HawtDBClient() extends Logging { baos.writeInt(batch) frozen.writeUnframed(baos) - journal(baos.toBuffer()) {location => - store(batch, update, onComplete, location) + append(baos.toBuffer()) { + location => + executeStore(batch, update, onComplete, location) } } - /** */ - private def begin(batch: Int):Unit = { + private def begin(batch: Int): Unit = { val baos = new DataByteArrayOutputStream(5) baos.writeByte(BEGIN) baos.writeInt(batch) - journal(baos.toBuffer) {location => - begin(batch, location) + append(baos.toBuffer) { + location => + executeBegin(batch, location) } } /** */ - private def commit(batch: Int, onComplete: Runnable):Unit = { + private def commit(batch: Int, onComplete: Runnable): Unit = { val baos = new DataByteArrayOutputStream(5) baos.writeByte(COMMIT) baos.writeInt(batch) - journal(baos.toBuffer) {location => - commit(batch, onComplete, location) + append(baos.toBuffer) { + location => + executeCommit(batch, onComplete, location) } } - private def journal(data: Buffer)(cb: (Location) => Unit):Unit = { - val start = System.currentTimeMillis() - try { - journal.write(data, new JournalCallback() { - def success(location: Location) = { - cb(location) - } - }) - } finally { - val end = System.currentTimeMillis() - if (end - start > 1000) { - warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms") - } + private def rollback(batch: Int, onComplete: Runnable): Unit = { + val baos = new DataByteArrayOutputStream(5) + baos.writeByte(ROLLBACK) + baos.writeInt(batch) + append(baos.toBuffer) { + location => + executeRollback(batch, onComplete, location) } } + ///////////////////////////////////////////////////////////////////// + // + // Methods related to recovery + // + ///////////////////////////////////////////////////////////////////// /** - * Move all the messages that were in the journal into long term storage. We - * just replay and do a checkpoint. + * Move all the messages that were in the journal into the indexes. * * @throws IOException * @throws IOException * @throws IllegalStateException */ - def recover = { + def recover: Unit = { + recoveryCounter = 0 + lastRecoveryPosition = null + val start = System.currentTimeMillis() + incrementalRecover + + store(new AddTrace.Bean().setMessage("RECOVERED"), ^ { + // Rollback any batches that did not complete. + batches.keysIterator.foreach { + batch => + rollback(batch, null) + } + }) + + val end = System.currentTimeMillis() + info("Processed %d operations from the journal in %,.3f seconds.", recoveryCounter, ((end - start) / 1000.0f)) + } + + + /** + * incrementally recovers the journal. It can be run again and again + * if the journal is being appended to. + */ + def incrementalRecover(): Unit = { + + // Is this our first incremental recovery pass? + if (lastRecoveryPosition == null) { + if (databaseRootRecord.hasFirstInProgressBatch) { + // we have to start at the first in progress batch usually... + nextRecoveryPosition = databaseRootRecord.getFirstInProgressBatch + } else { + // but perhaps there were no batches in progress.. + if (databaseRootRecord.hasLastUpdateLocation) { + // then we can just continue from the last update applied to the index + nextRecoveryPosition = journal.getNextLocation(databaseRootRecord.getLastUpdateLocation) + } else { + // no updates in the index?.. start from the first record in the journal. + nextRecoveryPosition = journal.getNextLocation(null) + } + } + } else { + nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition) + } + try { - val start = System.currentTimeMillis() recovering = true - var location = getRecoveryPosition() - if (location != null) { - var counter = 0 - var uow: Transaction = null - val uowCounter = 0 - while (location != null) { - import BufferEditor.BIG_ENDIAN._ - - var data = journal.read(location) - val updateType = readByte(data) - val batch = readInt(data) - updateType match { - case BEGIN => begin(batch, location) - case COMMIT => commit(batch, null, location) - case _ => - val update = decode(location, updateType, data) - store(batch, update, null, location) - } - counter += 1 - location = journal.getNextLocation(location) - } - val end = System.currentTimeMillis() - info("Processed %d operations from the journal in %,.3f seconds.", counter, ((end - start) / 1000.0f)) + // Continue recovering until journal runs out of records. + while (nextRecoveryPosition != null) { + lastRecoveryPosition = nextRecoveryPosition + recover(lastRecoveryPosition) + nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition) } - // We may have to undo some index updates. -// withTx {tx => -// recoverIndex(tx) -// } } finally { recovering = false } } - def decode(location:Location, updateType:Int, value:Buffer) = { - val t = Type.valueOf(updateType); - if (t == null) { - throw new IOException("Could not load journal record. Invalid type at location: " + location); - } - t.parseUnframed(value).asInstanceOf[TypeCreatable] + /** + * Recovers the logged record at the specified location. + */ + def recover(location: Location): Unit = { + var data = journal.read(location) + + val editor = data.bigEndianEditor + val updateType = editor.readByte() + val batch = editor.readInt() + + updateType match { + case BEGIN => executeBegin(batch, location) + case COMMIT => executeCommit(batch, null, location) + case _ => + val update = decode(location, updateType, data) + executeStore(batch, update, null, location) + } + + recoveryCounter += 1 + databaseRootRecord.setLastUpdateLocation(location) } -// def incrementalRecover() = { -// try { -// recovering = true -// if (nextRecoveryPosition == null) { -// if (lastRecoveryPosition == null) { -// nextRecoveryPosition = getRecoveryPosition() -// } else { -// nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition) -// } -// } -// while (nextRecoveryPosition != null) { -// lastRecoveryPosition = nextRecoveryPosition -// rootEntity.setLastUpdate(lastRecoveryPosition) -// val message = load(lastRecoveryPosition) -// val location = lastRecoveryPosition -// -// withTx {tx => -// updateIndex(tx, message.toType(), (MessageBuffer) message, location) -// } -// nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition) -// } -// } finally { -// recovering = false -// } -// } - - - def getRecoveryPosition(): Location = { -// if (rootEntity.getLastUpdate() != null) { -// // Start replay at the record after the last one recorded in the -// // index file. -// return journal.getNextLocation(rootEntity.getLastUpdate()); -// } + ///////////////////////////////////////////////////////////////////// + // + // Methods for Journal access + // + ///////////////////////////////////////////////////////////////////// - // This loads the first position. - return journal.getNextLocation(null); + private def append(data: Buffer)(cb: (Location) => Unit): Unit = { + val start = System.currentTimeMillis() + try { + journal.write(data, new JournalCallback() { + def success(location: Location) = { + cb(location) + } + }) + } finally { + val end = System.currentTimeMillis() + if (end - start > 1000) { + warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms") + } + } } + def read(location: Location) = { + var data = journal.read(location) + val editor = data.bigEndianEditor + val updateType = editor.readByte() + (updateType, data) + } - private var batches = new HashMap[Int, ListBuffer[Update]]() - private case class Update(update: TypeCreatable, location: Location) + ///////////////////////////////////////////////////////////////////// + // + // Methods that execute updates stored in the journal by indexing them + // Used both in normal operation and durring recovery. + // + ///////////////////////////////////////////////////////////////////// + + private def executeBegin(batch: Int, location: Location): Unit = { + assert(batches.get(batch).isEmpty) + batches.put(batch, (location, ListBuffer())) + } + + private def executeCommit(batch: Int, onComplete: Runnable, location: Location): Unit = { + // apply all the updates in the batch as a single unit of work. + batches.remove(batch) match { + case Some((_, updates)) => + // When recovering.. we only want to redo updates that committed + // after the last update location. + if (!recovering || isAfterLastUpdateLocation(location)) { + withTx { tx => + // index the updates + updates.foreach { + update => + index(tx, update.update, update.location) + } + updateLocations(tx, location) + } + } + case None => + // when recovering.. we are more lax due recovery starting + // in the middle of a stream of in progress batches + assert(!recovering) + } + if (onComplete != null) { + onComplete.run + } + } + + private def executeRollback(batch: Int, onComplete: Runnable, location: Location): Unit = { + // apply all the updates in the batch as a single unit of work. + batches.remove(batch) match { + case Some((_, _)) => + if (!recovering || isAfterLastUpdateLocation(location)) { + withTx { tx => + updateLocations(tx, location) + } + } + case None => + // when recovering.. we are more lax due recovery starting + // in the middle of a stream of in progress batches + assert(!recovering) + } + if (onComplete != null) { + onComplete.run + } + } - private def store(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = { + private def executeStore(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = { if (batch == -1) { - // update is not part of the batch.. apply it now. - withTx {tx => - store(tx, update, location) + // update is not part of the batch.. + + // When recovering.. we only want to redo updates that happen + // after the last update location. + if (!recovering || isAfterLastUpdateLocation(location)) { + withTx { tx => + // index the update now. + index(tx, update, location) + updateLocations(tx, location) + } } + if (onComplete != null) { onComplete.run } } else { + + // only the commit/rollback in batch can have an onCompelte handler + assert(onComplete == null) + // if the update was part of a batch don't apply till the batch is committed. batches.get(batch) match { - case Some(updates)=> updates += Update(update, location) + case Some((_, updates)) => + updates += Update(update, location) case None => + // when recovering.. we are more lax due recovery starting + // in the middle of a stream of in progress batches + assert(!recovering) } } } - private def begin(batch: Int, location: Location): Unit = { - assert( batches.get(batch).isEmpty ) - batches.put(batch, ListBuffer()) + + private def index(tx: Transaction, update: TypeCreatable, location: Location): Unit = { + + object Process extends TxHelper(tx) { + import JavaConversions._ + + def apply(x: AddMessage.Getter): Unit = { + + val key = x.getMessageKey() + if (key > databaseRootRecord.getLastMessageKey) { + databaseRootRecord.setLastMessageKey(key) + } + + val prevLocation = messageKeyIndex.put(key, location) + if (prevLocation != null) { + // Message existed.. undo the index update we just did. Chances + // are it's a transaction replay. + messageKeyIndex.put(key, prevLocation) + if (location == prevLocation) { + warn("Message replay detected for: %d", key) + } else { + error("Message replay with different location for: %d", key) + } + } else { + val fileId:java.lang.Integer = location.getDataFileId() + addAndGet(dataFileRefIndex, fileId, 1) + } + } + + def removeMessage(key:Long) = { + val location = messageKeyIndex.remove(key) + if (location != null) { + val fileId:java.lang.Integer = location.getDataFileId() + addAndGet(dataFileRefIndex, fileId, -1) + } else { + error("Cannot remove message, it did not exist: %d", key) + } + } + + def apply(x: AddQueue.Getter): Unit = { + if (queueIndex.get(x.getKey) == null) { + val queueRecord = new QueueRootRecord.Bean + queueRecord.setEntryIndexPage(alloc(QUEUE_ENTRY_INDEX_FACTORY)) + queueRecord.setTrackingIndexPage(alloc(QUEUE_TRACKING_INDEX_FACTORY)) + queueRecord.setInfo(x) + queueIndex.put(x.getKey, queueRecord.freeze) + } + } + + def apply(x: RemoveQueue.Getter): Unit = { + val queueRecord = queueIndex.remove(x.getKey) + if (queueRecord != null) { + queueEntryIndex(queueRecord).destroy + queueTrackingIndex(queueRecord).destroy + } + } + + def apply(x: AddQueueEntry.Getter): Unit = { + val queueKey = x.getQueueKey + val queueRecord = queueIndex.get(queueKey) + if (queueRecord != null) { + val trackingIndex = queueTrackingIndex(queueRecord) + val entryIndex = queueEntryIndex(queueRecord) + + // a message can only appear once in a queue (for now).. perhaps we should + // relax this constraint. + val messageKey = x.getMessageKey + val queueSeq = x.getQueueSeq + + val existing = trackingIndex.put(messageKey, queueSeq) + if (existing == null) { + val previous = entryIndex.put(queueSeq, x.freeze) + if (previous == null) { + + val queueRecordUpdate = queueRecord.copy + queueRecordUpdate.setCount(queueRecord.getCount + 1) + queueRecordUpdate.setSize(queueRecord.getSize + x.getSize) + queueIndex.put(queueKey, queueRecordUpdate.freeze) + + addAndGet(messageRefsIndex, new java.lang.Long(messageKey), 1) + } else { + error("Duplicate queue entry seq %d", x.getQueueSeq) + } + } else { + error("Duplicate queue entry message %d", x.getMessageKey) + } + } else { + error("Queue not found: %d", x.getQueueKey) + } + } + + def apply(x: RemoveQueueEntry.Getter): Unit = { + val queueKey = x.getQueueKey + val queueRecord = queueIndex.get(queueKey) + if (queueRecord != null) { + val trackingIndex = queueTrackingIndex(queueRecord) + val entryIndex = queueEntryIndex(queueRecord) + + val queueSeq = x.getQueueSeq + val queueEntry = entryIndex.remove(queueSeq) + if (queueEntry != null) { + val messageKey = queueEntry.getMessageKey + val existing = trackingIndex.remove(messageKey) + if (existing == null) { + error("Tracking entry not found for message %d", queueEntry.getMessageKey) + } + if( addAndGet(messageRefsIndex, new java.lang.Long(messageKey), -1) == 0 ) { + // message is no long referenced.. we can remove it.. + removeMessage(messageKey) + } + } else { + error("Queue entry not found for seq %d", x.getQueueSeq) + } + } else { + error("Queue not found: %d", x.getQueueKey) + } + } + + def apply(x: Purge.Getter): Unit = { + + // Remove all the queues... + queueIndex.iterator.map { + entry => + entry.getKey + }.foreach { + key => + apply(new RemoveQueue.Bean().setKey(key.intValue)) + } + + // Remove stored messages... + messageKeyIndex.clear + messageRefsIndex.clear + dataFileRefIndex.clear + databaseRootRecord.setLastMessageKey(0) + + cleanup(tx); + info("Store purged."); + } + + def apply(x: AddTrace.Getter): Unit = { + // trace messages are informational messages in the journal used to log + // historical info about store state. They don't update the indexes. + } + } + + update match { + case x: AddMessage.Getter => Process(x) + case x: AddQueueEntry.Getter => Process(x) + case x: RemoveQueueEntry.Getter => Process(x) + + case x: AddQueue.Getter => Process(x) + case x: RemoveQueue.Getter => Process(x) + + case x: AddTrace.Getter => Process(x) + case x: Purge.Getter => Process(x) + + case x: AddSubscription.Getter => + case x: RemoveSubscription.Getter => + + case x: AddMap.Getter => + case x: RemoveMap.Getter => + case x: PutMapEntry.Getter => + case x: RemoveMapEntry.Getter => + + case x: OpenStream.Getter => + case x: WriteStream.Getter => + case x: CloseStream.Getter => + case x: RemoveStream.Getter => + } } - private def commit(batch: Int, onComplete: Runnable, location: Location): Unit = { - // apply all the updates in the batch as a single unit of work. - withTx {tx => - batches.get(batch) match { - case Some(updates) => - updates.foreach {update => - store(tx, update.update, update.location) + + ///////////////////////////////////////////////////////////////////// + // + // Periodic Maintance + // + ///////////////////////////////////////////////////////////////////// + + def schedualFlush(): Unit = { + def try_flush() = { + if (hawtDBStore.serviceState.isStarted) { + hawtDBStore.executor_pool { + flush + schedualFlush + } + } + } + dispatchQueue.dispatchAfter(config.indexFlushInterval, TimeUnit.MILLISECONDS, ^ {try_flush}) + } + + def flush() = { + val start = System.currentTimeMillis() + pageFile.flush + val end = System.currentTimeMillis() + if (end - start > 1000) { + warn("Index flush took %,.3f seconds" + ((end - start) / 1000.0f)) + } + } + + def schedualCleanup(): Unit = { + def try_cleanup() = { + if (hawtDBStore.serviceState.isStarted) { + hawtDBStore.executor_pool { + withTx {tx => + cleanup(tx) } - if (onComplete != null) { - onComplete.run + schedualCleanup + } + } + } + dispatchQueue.dispatchAfter(config.cleanupInterval, TimeUnit.MILLISECONDS, ^ {try_cleanup}) + } + + /** + * @param tx + * @throws IOException + */ + def cleanup(tx:Transaction) = { + val helper = new TxHelper(tx) + import JavaConversions._ + import helper._ + + debug("Cleanup started.") + val gcCandidateSet = new TreeSet[Integer](journal.getFileMap().keySet()) + + // Don't cleanup locked data files + if (lockedDatatFiles != null) { + gcCandidateSet.removeAll(lockedDatatFiles) + } + + // Don't GC files that we will need for recovery.. + val upto = if (databaseRootRecord.hasFirstInProgressBatch) { + Some(databaseRootRecord.getFirstInProgressBatch.getDataFileId) + } else { + if (databaseRootRecord.hasLastUpdateLocation) { + Some(databaseRootRecord.getLastUpdateLocation.getDataFileId) + } else { + None + } + } + + upto match { + case Some(dataFile) => + var done = false + while (!done && !gcCandidateSet.isEmpty()) { + val last = gcCandidateSet.last() + if (last.intValue >= dataFile) { + gcCandidateSet.remove(last) + } else { + done = true } - case None => + } + + case None => + } + + if (!gcCandidateSet.isEmpty() ) { + dataFileRefIndex.iterator.foreach { entry => + gcCandidateSet.remove(entry.getKey) + } + if (!gcCandidateSet.isEmpty()) { + debug("Cleanup removing the data files: %s", gcCandidateSet) + journal.removeDataFiles(gcCandidateSet) + } + } + debug("Cleanup done.") + } + + ///////////////////////////////////////////////////////////////////// + // + // Helper Methods / Classes + // + ///////////////////////////////////////////////////////////////////// + + private case class Update(update: TypeCreatable, location: Location) + + private class TxHelper(private val _tx: Transaction) { + lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage) + lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage) + lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageKeyIndexPage) + lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage) + lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, databaseRootRecord.getSubscriptionIndexPage) + + def addAndGet[K](index:SortedIndex[K, Integer], key:K, amount:Int):Int = { + var counter = index.get(key) + if( counter == null ) { + if( amount!=0 ) { + index.put(key, amount) + } + amount + } else { + val update = counter.intValue + amount + if( update == 0 ) { + index.remove(key) + } else { + index.put(key, update) + } + update } } + + def queueEntryIndex(root: QueueRootRecord.Getter) = QUEUE_ENTRY_INDEX_FACTORY.open(_tx, root.getEntryIndexPage) + + def queueTrackingIndex(root: QueueRootRecord.Getter) = QUEUE_TRACKING_INDEX_FACTORY.open(_tx, root.getTrackingIndexPage) + + def alloc(factory: IndexFactory[_, _]) = { + val rc = _tx.alloc + factory.create(_tx, rc) + rc + } } - private def store(tx: Transaction, update: TypeCreatable, location: Location): Unit = { + private def withTx[T](func: (Transaction) => T): T = { + val tx = pageFile.tx + var ok = false + try { + val rc = func(tx) + ok = true + rc + } finally { + if (ok) { + tx.commit + } else { + tx.rollback + } + } + } + // Gets the next batch id.. after a while we may wrap around + // start producing batch ids from zero + val next_batch_id = { + var rc = next_batch_counter.getAndIncrement + while (rc < 0) { + // We just wrapped around.. reset the counter to 0 + // Use a CAS operation so that only 1 thread resets the counter + next_batch_counter.compareAndSet(rc + 1, 0) + rc = next_batch_counter.getAndIncrement + } + rc } + private def isAfterLastUpdateLocation(location: Location) = { + val lastUpdate: Location = databaseRootRecord.getLastUpdateLocation + lastUpdate.compareTo(location) < 0 + } + private def updateLocations(tx: Transaction, location: Location): Unit = { + databaseRootRecord.setLastUpdateLocation(location) + if (batches.isEmpty) { + databaseRootRecord.clearFirstInProgressBatch + } else { + databaseRootRecord.setFirstInProgressBatch(batches.head._2._1) + } + tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze) + databaseRootRecord = databaseRootRecord.copy + } } \ No newline at end of file Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961128&r1=961127&r2=961128&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:06:47 2010 @@ -74,9 +74,9 @@ class HawtDBStore extends Store with Bas var next_queue_key = new AtomicLong(0) var next_msg_key = new AtomicLong(0) - protected var executor_pool:ExecutorService = _ + var executor_pool:ExecutorService = _ var config:HawtDBStoreDTO = defaultConfig - val client = new HawtDBClient + val client = new HawtDBClient(this) def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter) Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961128&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul 7 04:06:47 2010 @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.store.hawtdb + +import model._ +import model.Type.TypeCreatable +import org.apache.activemq.apollo.store.{MessageRecord, QueueRecord, QueueEntryRecord} +import org.fusesource.hawtbuf.codec._ +import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer} +import java.io.{IOException, DataInput, DataOutput} +import org.fusesource.hawtdb.internal.journal.{LocationCodec, Location} +import org.fusesource.hawtdb.api._ +import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessage} + +/** + *

+ *

+ * + * @author Hiram Chirino + */ +object Helpers { + + val QUEUE_RECORD_CODEC = new VariableCodec[QueueEntryRecord]() { + def decode(dataIn: DataInput): QueueEntryRecord = { + val rc = new QueueEntryRecord(); + rc.queueKey = dataIn.readLong(); + rc.messageKey = dataIn.readLong(); + rc.size = dataIn.readInt(); + // if (dataIn.readBoolean()) { + // rc.setTte(dataIn.readLong()); + // } + rc.redeliveries = dataIn.readShort(); + if (dataIn.readBoolean()) { + rc.attachment = BUFFER_CODEC.decode(dataIn); + } + return rc; + } + + def encode(value: QueueEntryRecord, dataOut: DataOutput) = { + dataOut.writeLong(value.queueKey); + dataOut.writeLong(value.messageKey); + dataOut.writeInt(value.size); + // if (value.getTte() >= 0) { + // dataOut.writeBoolean(true); + // dataOut.writeLong(value.getTte()); + // } else { + // dataOut.writeBoolean(false); + // } + dataOut.writeShort(value.redeliveries); + if (value.attachment != null) { + dataOut.writeBoolean(true); + BUFFER_CODEC.encode(value.attachment, dataOut); + } else { + dataOut.writeBoolean(false); + } + } + + def estimatedSize(value: QueueEntryRecord) = throw new UnsupportedOperationException() + } + + val QUEUE_DESCRIPTOR_CODEC = new VariableCodec[QueueRecord]() { + def decode(dataIn: DataInput): QueueRecord = { + val record = new QueueRecord(); + record.queueType = ASCII_BUFFER_CODEC.decode(dataIn); + record.name = ASCII_BUFFER_CODEC.decode(dataIn); + // if (dataIn.readBoolean()) { + // record.parent = ASCII_BUFFER_MARSHALLER.readPayload(dataIn) + // record.setPartitionId(dataIn.readInt()); + // } + return record; + } + + def encode(value: QueueRecord, dataOut: DataOutput) = { + ASCII_BUFFER_CODEC.encode(value.queueType, dataOut); + ASCII_BUFFER_CODEC.encode(value.name, dataOut); + // if (value.parent != null) { + // dataOut.writeBoolean(true); + // ASCII_BUFFER_MARSHALLER.writePayload(value.parent, dataOut); + // dataOut.writeInt(value.getPartitionKey()); + // } else { + // dataOut.writeBoolean(false); + // } + } + + def estimatedSize(value: QueueRecord) = throw new UnsupportedOperationException() + }; + + val ASCII_BUFFER_CODEC = AsciiBufferCodec.INSTANCE; + val BUFFER_CODEC = BufferCodec.INSTANCE; + + + implicit def toMessageRecord(pb: AddMessage.Getter): MessageRecord = { + val rc = new MessageRecord + rc.protocol = pb.getProtocol + rc.size = pb.getSize + rc.value = pb.getValue + rc.stream = pb.getStreamKey + rc.expiration = pb.getExpiration + rc + } + + implicit def fromMessageRecord(v: MessageRecord): AddMessage.Bean = { + val pb = new AddMessage.Bean + pb.setProtocol(v.protocol) + pb.setSize(v.size) + pb.setValue(v.value) + pb.setStreamKey(v.stream) + pb.setExpiration(v.expiration) + pb + } + + implicit def toQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = { + val rc = new QueueEntryRecord + rc.queueKey = pb.getQueueKey + rc.queueSeq = pb.getQueueSeq + rc.messageKey = pb.getMessageKey + rc.attachment = pb.getAttachment + rc.size = pb.getSize + rc.redeliveries = pb.getRedeliveries.toShort + rc + } + + implicit def fromQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = { + val pb = new AddQueueEntry.Bean + pb.setQueueKey(v.queueKey) + pb.setQueueSeq(v.queueSeq) + pb.setMessageKey(v.messageKey) + pb.setAttachment(v.attachment) + pb.setSize(v.size) + pb.setRedeliveries(v.redeliveries) + pb + } + + implicit def toLocation(value: Long): Location = { + val temp = new Buffer(8) + val editor = temp.bigEndianEditor + editor.writeLong(value) + temp.reset + new Location(editor.readInt(), editor.readInt()) + } + + implicit def fromLocation(value: Location):Long = { + val temp = new Buffer(8) + val editor = temp.bigEndianEditor + editor.writeInt(value.getDataFileId) + editor.writeInt(value.getOffset) + temp.reset + editor.readLong + } + + implicit def toAsciiBuffer(value:String):AsciiBuffer = new AsciiBuffer(value) + implicit def toUTF8Buffer(value:String):UTF8Buffer = new UTF8Buffer(value) + + type PB = PBMessage[_ <: PBMessage[_, _], _ <: MessageBuffer[_, _]] + implicit def toPBMessage(value: TypeCreatable): PB = value.asInstanceOf[PB] + + + val DATABASE_ROOT_RECORD_ACCESSOR = new CodecPagedAccessor[DatabaseRootRecord.Buffer](DatabaseRootRecord.FRAMED_CODEC); + + def decode(location: Location, updateType: Int, value: Buffer) = { + val t = Type.valueOf(updateType); + if (t == null) { + throw new IOException("Could not load journal record. Invalid type at location: " + location); + } + t.parseUnframed(value).asInstanceOf[TypeCreatable] + } + + // + // Index factories... + // + + import java.{lang => jl} + + // maps message key -> Journal Location + val MESSAGE_KEY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, Location](); + MESSAGE_KEY_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE); + MESSAGE_KEY_INDEX_FACTORY.setValueCodec(LocationCodec.INSTANCE); + MESSAGE_KEY_INDEX_FACTORY.setDeferredEncoding(true); + + // maps Journal Data File Id -> Ref Counter + val DATA_FILE_REF_INDEX_FACTORY = new BTreeIndexFactory[jl.Integer, jl.Integer](); + DATA_FILE_REF_INDEX_FACTORY.setKeyCodec(VarIntegerCodec.INSTANCE); + DATA_FILE_REF_INDEX_FACTORY.setValueCodec(VarIntegerCodec.INSTANCE); + DATA_FILE_REF_INDEX_FACTORY.setDeferredEncoding(true); + + // maps message key -> Ref Counter + val MESSAGE_REFS_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, jl.Integer](); + MESSAGE_REFS_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE); + MESSAGE_REFS_INDEX_FACTORY.setValueCodec(VarIntegerCodec.INSTANCE); + MESSAGE_REFS_INDEX_FACTORY.setDeferredEncoding(true); + + // maps queue key -> QueueRootRecord + val QUEUE_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, QueueRootRecord.Buffer](); + QUEUE_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE); + QUEUE_INDEX_FACTORY.setValueCodec(QueueRootRecord.FRAMED_CODEC); + QUEUE_INDEX_FACTORY.setDeferredEncoding(true); + + // maps queue seq -> AddQueueEntry + val QUEUE_ENTRY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, AddQueueEntry.Buffer](); + QUEUE_ENTRY_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE); + QUEUE_ENTRY_INDEX_FACTORY.setValueCodec(AddQueueEntry.FRAMED_CODEC); + QUEUE_ENTRY_INDEX_FACTORY.setDeferredEncoding(true); + + // maps message key -> queue seq + val QUEUE_TRACKING_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, jl.Long](); + QUEUE_TRACKING_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE); + QUEUE_TRACKING_INDEX_FACTORY.setValueCodec(VarLongCodec.INSTANCE); + QUEUE_TRACKING_INDEX_FACTORY.setDeferredEncoding(true); + + val SUBSCRIPTIONS_INDEX_FACTORY = new BTreeIndexFactory[AsciiBuffer, AddSubscription.Buffer](); + SUBSCRIPTIONS_INDEX_FACTORY.setKeyCodec(AsciiBufferCodec.INSTANCE); + SUBSCRIPTIONS_INDEX_FACTORY.setValueCodec(AddSubscription.FRAMED_CODEC); + SUBSCRIPTIONS_INDEX_FACTORY.setDeferredEncoding(true); + +} \ No newline at end of file Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala?rev=961128&r1=961127&r2=961128&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala Wed Jul 7 04:06:47 2010 @@ -16,7 +16,6 @@ */ package org.apache.activemq.broker.store.hawtdb -import model.RootRecord import org.fusesource.hawtbuf.{Buffer, AsciiBuffer} import org.fusesource.hawtdb.api._ import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory, PBMessage} @@ -37,37 +36,7 @@ class DestinationEntity { //} object RootEntity { -// val messageKeyIndexFactory = new BTreeIndexFactory[Long, Long](); -// val locationIndexFactory = new BTreeIndexFactory[Integer, Long](); -// val messageRefsIndexFactory = new BTreeIndexFactory[Long, Long](); -// val destinationIndexFactory = new BTreeIndexFactory[Long, DestinationEntity](); -// val subscriptionIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer](); -// val mapIndexFactory = new BTreeIndexFactory[AsciiBuffer, Integer](); -// val mapInstanceIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer](); -// -// messageKeyIndexFactory.setKeyCodec(LongCodec.INSTANCE); -// messageKeyIndexFactory.setValueCodec(LongCodec.INSTANCE); -// messageKeyIndexFactory.setDeferredEncoding(true); -// -// locationIndexFactory.setKeyCodec(IntegerCodec.INSTANCE); -// locationIndexFactory.setValueCodec(LongCodec.INSTANCE); -// locationIndexFactory.setDeferredEncoding(true); -// -// messageRefsIndexFactory.setKeyCodec(LongCodec.INSTANCE); -// messageRefsIndexFactory.setValueCodec(LongCodec.INSTANCE); -// messageRefsIndexFactory.setDeferredEncoding(true); -// -// destinationIndexFactory.setKeyCodec(LongCodec.INSTANCE); -// destinationIndexFactory.setValueCodec(DestinationEntity.MARSHALLER); -// destinationIndexFactory.setDeferredEncoding(true); -// -// subscriptionIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC); -// subscriptionIndexFactory.setValueCodec(Codecs.BUFFER_CODEC); -// subscriptionIndexFactory.setDeferredEncoding(true); -// -// mapIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC); -// mapIndexFactory.setValueCodec(IntegerCodec.INSTANCE); -// mapIndexFactory.setDeferredEncoding(true); + // // val DATA_ENCODER_DECODER = PBEncoderDecoder(RootRecord.FACTORY) Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java?rev=961128&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java Wed Jul 7 04:06:47 2010 @@ -0,0 +1,405 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.store.hawtdb; + +import junit.framework.Assert; +import junit.framework.TestCase; +import org.apache.activemq.apollo.store.MessageRecord; +import org.apache.activemq.apollo.store.QueueRecord; +import org.apache.activemq.apollo.store.QueueStatus; +import org.apache.activemq.apollo.store.QueueEntryRecord; +import org.apache.activemq.metric.MetricAggregator; +import org.apache.activemq.metric.MetricCounter; +import org.apache.activemq.metric.Period; +import org.fusesource.hawtbuf.AsciiBuffer; +import org.fusesource.hawtbuf.Buffer; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class HawtDBManagerBenchmark extends TestCase { + +// private static int PERFORMANCE_SAMPLES = 50; +// private static boolean SYNC_TO_DISK = true; +// private static final boolean USE_SHARED_WRITER = true; +// +// private HawtDBManager store; +// private QueueRecord queueId; +// private AtomicLong queueKey = new AtomicLong(0); +// +// protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items"); +// protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items"); +// +// protected ArrayList consumers = new ArrayList(); +// protected ArrayList producers = new ArrayList(); +// +// protected HawtDBManager createStore() { +// HawtDBManager rc = new HawtDBManager(); +// rc.setStoreDirectory(new File("target/test-data/kahadb-store-performance")); +// rc.setDeleteAllMessages(true); +// return rc; +// } +// +// private SharedWriter writer = null; +// +// private Semaphore enqueuePermits; +// private Semaphore dequeuePermits; +// +// @Override +// protected void setUp() throws Exception { +// store = createStore(); +// //store.setDeleteAllMessages(false); +// store.start(); +// +// if (USE_SHARED_WRITER) { +// writer = new SharedWriter(); +// writer.start(); +// } +// +// enqueuePermits = new Semaphore(20000000); +// dequeuePermits = new Semaphore(0); +// +// queueId = new QueueRecord(); +// queueId.name = new AsciiBuffer("test"); +// store.execute(new VoidCallback() { +// @Override +// public void run(HawtDBSession session) throws Exception { +// session.queueAdd(queueId); +// } +// }, null); +// +// store.execute(new VoidCallback() { +// @Override +// public void run(HawtDBSession session) throws Exception { +// Iterator qqrs = session.queueList(queueId, 1); +// Assert.assertTrue(qqrs.hasNext()); +// QueueStatus qqr = qqrs.next(); +// if(qqr.size > 0) +// { +// queueKey.set(qqr.last + 1); +// System.out.println("Recovered queue: " + qqr.record.name + " with " + qqr.count + " messages"); +// } +// } +// }, null); +// } +// +// @Override +// protected void tearDown() throws Exception { +// for (Consumer c : consumers) { +// c.stop(); +// } +// consumers.clear(); +// for (Producer p : producers) { +// p.stop(); +// } +// producers.clear(); +// +// if (writer != null) { +// writer.stop(); +// } +// +// if (store != null) { +// store.stop(); +// } +// } +// +// class SharedWriter implements Runnable { +// LinkedBlockingQueue queue = new LinkedBlockingQueue(1000); +// private Thread thread; +// private AtomicBoolean stopped = new AtomicBoolean(); +// +// public void start() { +// thread = new Thread(this, "Writer"); +// thread.start(); +// } +// +// public void stop() throws InterruptedException { +// stopped.set(true); +// +// //Add an op to trigger shutdown: +// SharedQueueOp op = new SharedQueueOp() { +// public void run() { +// } +// }; +// op.op = new VoidCallback() { +// +// @Override +// public void run(HawtDBSession session) throws Exception { +// // TODO Auto-generated method stub +// } +// }; +// +// queue.put(op); +// thread.join(); +// } +// +// public void run() { +// HawtDBSession session = store.getSession(); +// try { +// LinkedList processed = new LinkedList(); +// while (!stopped.get()) { +// SharedQueueOp op = queue.take(); +// session.acquireLock(); +// int ops = 0; +// while (op != null && ops < 1000) { +// op.op.execute(session); +// processed.add(op); +// op = queue.poll(); +// ops++; +// } +// +// session.commit(); +// session.releaseLock(); +// +// if (SYNC_TO_DISK) { +// store.flush(); +// } +// +// for (Runnable r : processed) { +// r.run(); +// } +// processed.clear(); +// } +// +// } catch (InterruptedException e) { +// if (!stopped.get()) { +// e.printStackTrace(); +// } +// return; +// } catch (Exception e) { +// e.printStackTrace(); +// return; +// } +// } +// +// public void addOp(SharedQueueOp op) throws InterruptedException { +// queue.put(op); +// } +// } +// +// abstract class SharedQueueOp implements Runnable { +// VoidCallback op; +// } +// +// class Producer implements Runnable { +// private Thread thread; +// private AtomicBoolean stopped = new AtomicBoolean(); +// private String name; +// protected final MetricCounter rate = new MetricCounter(); +// private long sleep; +// +// public Producer(String name) { +// this.name = name; +// } +// +// public void start() { +// rate.name("Producer " + name + " Rate"); +// totalProducerRate.add(rate); +// thread = new Thread(this, "Producer" + name); +// thread.start(); +// } +// +// public void stop() throws InterruptedException { +// stopped.set(true); +// while (enqueuePermits.hasQueuedThreads()) { +// enqueuePermits.release(); +// } +// thread.join(); +// } +// +// public void run() { +// try { +// Buffer buffer = new Buffer(new byte[1024]); +// for (long i = 0; !stopped.get(); i++) { +// +// enqueuePermits.acquire(); +// +// final MessageRecord messageRecord = new MessageRecord(); +// messageRecord.key = store.allocateStoreTracking(); +// messageRecord.protocol = new AsciiBuffer("encoding"); +// messageRecord.value = buffer; +// messageRecord.size = buffer.getLength(); +// +// SharedQueueOp op = new SharedQueueOp() { +// public void run() { +// rate.increment(); +// } +// }; +// +// op.op = new VoidCallback() { +// @Override +// public void run(HawtDBSession session) throws Exception { +// session.messageAdd(messageRecord); +// QueueEntryRecord queueEntryRecord = new QueueEntryRecord(); +// queueEntryRecord.messageKey = messageRecord.key; +// queueEntryRecord.queueKey = queueKey.incrementAndGet(); +// queueEntryRecord.size = messageRecord.size; +// session.queueAddMessage(queueId, queueEntryRecord); +// dequeuePermits.release(); +// } +// }; +// +// if (!USE_SHARED_WRITER) { +// store.execute(op.op, op); +// +// if (SYNC_TO_DISK) { +// store.flush(); +// } +// +// } else { +// writer.addOp(op); +// } +// +// if (sleep > 0) { +// Thread.sleep(sleep); +// } +// } +// } catch (InterruptedException e) { +// if (!stopped.get()) { +// e.printStackTrace(); +// } +// return; +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// +// class Consumer implements Runnable { +// private Thread thread; +// private AtomicBoolean stopped = new AtomicBoolean(); +// protected final MetricCounter rate = new MetricCounter(); +// private String name; +// private final Semaphore queryWait = new Semaphore(0); +// +// public Consumer(String name) { +// this.name = name; +// } +// +// public void start() { +// rate.name("Consumer " + name + " Rate"); +// totalConsumerRate.add(rate); +// thread = new Thread(this, "Consumer " + name); +// thread.start(); +// } +// +// public void stop() throws InterruptedException { +// stopped.set(true); +// queryWait.release(); +// thread.join(); +// } +// +// public void run() { +// try { +// while (!stopped.get()) { +// final ArrayList records = new ArrayList(1000); +// SharedQueueOp op = new SharedQueueOp() { +// public void run() { +// rate.increment(records.size()); +// enqueuePermits.release(records.size()); +// queryWait.release(); +// } +// }; +// +// op.op = new VoidCallback() { +// @Override +// public void run(HawtDBSession session) throws Exception { +// Iterator queueRecords = session.queueListMessagesQueue(queueId, 0L, -1L, 1000); +// for (Iterator iterator = queueRecords; iterator.hasNext();) { +// QueueEntryRecord r = iterator.next(); +// records.add(session.messageGetRecord(r.messageKey)); +// session.queueRemoveMessage(queueId, r.queueKey); +// } +// } +// }; +// +// if (!USE_SHARED_WRITER) { +// store.execute(op.op, op); +// if (SYNC_TO_DISK) { +// store.flush(); +// } +// } else { +// writer.addOp(op); +// } +// +// dequeuePermits.acquire(); +// records.clear(); +// } +// } catch (InterruptedException e) { +// if (!stopped.get()) { +// e.printStackTrace(); +// } +// return; +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// +// public void test1_1_0() throws Exception { +// startProducers(1); +// reportRates(); +// } +// +// +// public void test1_1_1() throws Exception { +// startProducers(1); +// startConsumers(1); +// reportRates(); +// } +// +// public void test10_1_1() throws Exception { +// startProducers(10); +// startConsumers(1); +// reportRates(); +// } +// +// private void startProducers(int count) { +// for (int i = 0; i < count; i++) { +// Producer p = new Producer("" + (i + 1)); +// producers.add(p); +// p.start(); +// } +// } +// +// private void startConsumers(int count) { +// for (int i = 0; i < count; i++) { +// Consumer c = new Consumer("" + (i + 1)); +// consumers.add(c); +// c.start(); +// } +// } +// +// private void reportRates() throws InterruptedException { +// System.out.println("Checking rates for test: " + getName()); +// for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { +// Period p = new Period(); +// Thread.sleep(1000 * 5); +// System.out.println(totalProducerRate.getRateSummary(p)); +// System.out.println(totalConsumerRate.getRateSummary(p)); +// totalProducerRate.reset(); +// totalConsumerRate.reset(); +// } +// } + +} \ No newline at end of file