activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r1162574 [1/2] - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-broker/src/test/scala/org/apache/activemq/a...
Date Sun, 28 Aug 2011 19:13:05 GMT
Author: chirino
Date: Sun Aug 28 19:13:04 2011
New Revision: 1162574

URL: http://svn.apache.org/viewvc?rev=1162574&view=rev
Log:
Upgrade to hawtdb-1.6 and simplify the hawtdb store implementation.

Added:
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/pom.xml
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade
    activemq/activemq-apollo/trunk/pom.xml
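
Most of the change is in apollo-hawtdb: the journal/multi-index layout is replaced by an
append-only RecordLog plus a single BTree index whose keys carry a one-byte record-type
prefix ('m' for messages, 'q' for queues, 'e' for queue entries, 'p' for map entries),
and log/index files are named by their starting position in hex. The sketch below is a
simplified illustration of that key scheme; encode() here is a hypothetical stand-in for
the helper in the newly added Helper.scala, which is not shown in this part of the diff.

import org.fusesource.hawtbuf.{Buffer, DataByteArrayOutputStream}

object KeySchemeSketch {
  val message_prefix     = 'm'.toByte
  val queue_prefix       = 'q'.toByte
  val queue_entry_prefix = 'e'.toByte

  // Hypothetical stand-in for the encode(prefix, keys...) helper used by HawtDBClient.
  def encode(prefix: Byte, keys: Long*): Buffer = {
    val os = new DataByteArrayOutputStream()
    os.writeByte(prefix)
    keys.foreach(os.writeLong(_))
    os.toBuffer
  }

  // Log and index files are named by position: 16 hex digits plus a suffix,
  // matching create_sequence_file below ("%016x%s").
  def sequence_file_name(position: Long, suffix: String) =
    "%016x%s".format(position, suffix)
}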

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Sun Aug 28 19:13:04 2011
@@ -608,12 +608,7 @@ class LocalRouter(val virtual_host:Virtu
 
                 val binding = BindingFactory.create(destination)
                 if( queue.tune_persistent && queue.store_id == -1 ) {
-
-                  val record = new QueueRecord
-                  record.key = queue.store_id
-                  record.binding_data = binding.binding_data
-                  record.binding_kind = binding.binding_kind
-
+                  val record = QueueRecord(queue.store_id, binding.binding_kind, binding.binding_data)
                   // Update the bindings
                   virtual_host.store.add_queue(record) { rc => Unit }
                 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sun Aug 28 19:13:04 2011
@@ -144,10 +144,7 @@ class Queue(val router: LocalRouter, val
     tune_quota = Option(config.quota).map(MemoryPropertyEditor.parse(_)).getOrElse(-1)
 
     if( tune_persistent ) {
-      val record = new QueueRecord
-      record.key = store_id
-      record.binding_data = binding.binding_data
-      record.binding_kind = binding.binding_kind
+      val record = QueueRecord(store_id, binding.binding_kind, binding.binding_data)
       virtual_host.store.add_queue(record) { rc => Unit }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Sun Aug 28 19:13:04 2011
@@ -1,6 +1,7 @@
 package org.apache.activemq.apollo.broker.store
 
 import java.io.{OutputStream, InputStream}
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -50,9 +51,11 @@ object PBSupport {
   def encode_message_record(out: OutputStream, v: MessageRecord) = to_pb(v).writeUnframed(out)
   def decode_message_record(in: InputStream):MessageRecord = MessagePB.FACTORY.parseUnframed(in)
 
-  implicit def encode_message_record(v: MessageRecord) = to_pb(v).toUnframedByteArray
+  implicit def encode_message_record(v: MessageRecord):Array[Byte] = to_pb(v).toUnframedByteArray
   implicit def decode_message_record(data: Array[Byte]):MessageRecord = MessagePB.FACTORY.parseUnframed(data)
 
+  implicit def encode_message_record_buffer(v: MessageRecord) = to_pb(v).toUnframedBuffer
+  implicit def decode_message_record_buffer(data: Buffer):MessageRecord = MessagePB.FACTORY.parseUnframed(data)
 
 
   implicit def to_pb(v: QueueRecord):QueuePB.Buffer = {
@@ -64,11 +67,7 @@ object PBSupport {
   }
 
   implicit def from_pb(pb: QueuePB.Getter):QueueRecord = {
-    val rc = new QueueRecord
-    rc.key = pb.getKey
-    rc.binding_data = pb.getBindingData
-    rc.binding_kind = pb.getBindingKind
-    rc
+    QueueRecord(pb.getKey, pb.getBindingKind, pb.getBindingData)
   }
 
   def encode_queue_record(out: OutputStream, v: QueueRecord) = to_pb(v).writeUnframed(out)
@@ -77,6 +76,8 @@ object PBSupport {
   implicit def encode_queue_record(v: QueueRecord) = to_pb(v).toUnframedByteArray
   implicit def decode_queue_record(data: Array[Byte]):QueueRecord = QueuePB.FACTORY.parseUnframed(data)
 
+  implicit def encode_queue_record_buffer(v: QueueRecord) = to_pb(v).toUnframedBuffer
+  implicit def decode_queue_record_buffer(data: Buffer):QueueRecord = QueuePB.FACTORY.parseUnframed(data)
 
   implicit def to_pb(v: QueueEntryRecord):QueueEntryPB.Buffer = {
     val pb = new QueueEntryPB.Bean
@@ -113,4 +114,7 @@ object PBSupport {
   implicit def encode_queue_entry_record(v: QueueEntryRecord) = to_pb(v).toUnframedByteArray
   implicit def decode_queue_entry_record(data: Array[Byte]):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data)
 
+  implicit def encode_queue_entry_record_buffer(v: QueueEntryRecord) = to_pb(v).toUnframedBuffer
+  implicit def decode_queue_entry_record_buffer(data: Buffer):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data)
+
 }
\ No newline at end of file
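
The new *_buffer implicits let a record be used wherever a hawtbuf Buffer is expected,
which is exactly how the rewritten HawtDBClient appends records to the log (e.g.
val buffer:Buffer = record). A minimal usage sketch, assuming PBSupport as modified
above is on the classpath; the object name is just for illustration.

import org.fusesource.hawtbuf.Buffer
import org.fusesource.hawtbuf.Buffer._
import org.apache.activemq.apollo.broker.store.QueueRecord
import org.apache.activemq.apollo.broker.store.PBSupport._

object PBSupportUsageSketch {
  def main(args: Array[String]): Unit = {
    // Encode a QueueRecord straight to a Buffer via encode_queue_record_buffer,
    // then decode it back with the matching implicit.
    val record = QueueRecord(1L, ascii("test"), ascii("example"))
    val encoded: Buffer = record
    val decoded: QueueRecord = decode_queue_record_buffer(encoded)
    assert(decoded.key == record.key)
  }
}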

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala Sun Aug 28 19:13:04 2011
@@ -17,14 +17,9 @@
 package org.apache.activemq.apollo.broker.store
 
 import org.fusesource.hawtbuf.AsciiBuffer;
-import org.fusesource.hawtbuf.Buffer;
-
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class QueueRecord {
-  var key = -1L
-  var binding_kind: AsciiBuffer = _
-  var binding_data: Buffer = _
-}
+case class QueueRecord(key:Long, binding_kind: AsciiBuffer, binding_data:Buffer)
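
Turning QueueRecord into a case class is what lets the call sites in LocalRouter, Queue
and the test support classes collapse to a single constructor call, and it gives
structural equality and copy() for free. A small illustration (the object name is just
for the sketch):

import org.fusesource.hawtbuf.Buffer._
import org.apache.activemq.apollo.broker.store.QueueRecord

object QueueRecordSketch {
  def main(args: Array[String]): Unit = {
    // One expression instead of the old new-then-assign-fields pattern.
    val record = QueueRecord(5L, ascii("test"), ascii("my-queue"))

    // copy() and equality come with the case class.
    val rebound = record.copy(binding_data = ascii("renamed-queue"))
    assert(record.key == rebound.key && record != rebound)
  }
}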

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala Sun Aug 28 19:13:04 2011
@@ -86,10 +86,7 @@ abstract class StoreBenchmarkSupport ext
   val queue_key_counter = new LongCounter
 
   def add_queue(name:String):Long = {
-    var queueA = new QueueRecord
-    queueA.key = queue_key_counter.incrementAndGet
-    queueA.binding_kind = ascii("test")
-    queueA.binding_data = ascii(name)
+    var queueA = QueueRecord(queue_key_counter.incrementAndGet, ascii("test"), ascii(name))
     val rc:Boolean = CB( cb=> store.add_queue(queueA)(cb) )
     expect(true)(rc)
     queueA.key

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Sun Aug 28 19:13:04 2011
@@ -84,10 +84,7 @@ abstract class StoreFunSuiteSupport exte
   val queue_key_counter = new LongCounter
 
   def add_queue(name:String):Long = {
-    var queue_a = new QueueRecord
-    queue_a.key = queue_key_counter.incrementAndGet
-    queue_a.binding_kind = ascii("test")
-    queue_a.binding_data = ascii(name)
+    var queue_a = QueueRecord(queue_key_counter.incrementAndGet, ascii("test"), ascii(name))
     val rc:Boolean = CB( cb=> store.add_queue(queue_a)(cb) )
     expect(true)(rc)
     queue_a.key
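
Both test support classes drive the asynchronous store API through a CB(...) adapter
that blocks until the completion callback fires, so add_queue can be asserted on
synchronously. The project's actual CB helper is not part of this diff; a hypothetical
adapter of that shape could look like:

import java.util.concurrent.{CountDownLatch, TimeUnit}

object CallbackAdapterSketch {
  // Run a callback-style operation and block for its single result.
  def CB[T](func: (T => Unit) => Unit): T = {
    val latch = new CountDownLatch(1)
    var result: Option[T] = None
    func { value =>
      result = Some(value)
      latch.countDown()
    }
    latch.await(30, TimeUnit.SECONDS)
    result.getOrElse(sys.error("callback did not complete"))
  }
}

With an adapter like this, CB( cb=> store.add_queue(queue_a)(cb) ) turns the store's
completion callback into the Boolean the tests expect.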

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/pom.xml?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/pom.xml Sun Aug 28 19:13:04 2011
@@ -53,7 +53,7 @@
     
     <dependency>
       <groupId>org.fusesource.hawtbuf</groupId>
-      <artifactId>hawtbuf-proto</artifactId>
+      <artifactId>hawtbuf</artifactId>
       <version>${hawtbuf-version}</version>
     </dependency>
 
@@ -127,22 +127,6 @@
     <plugins>
 
       <plugin>
-        <groupId>org.fusesource.hawtbuf</groupId>
-        <artifactId>hawtbuf-protoc</artifactId>
-        <version>${hawtbuf-version}</version>
-        <configuration>
-          <type>alt</type>
-        </configuration>
-         <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
         <groupId>org.fusesource.scalate</groupId>
         <artifactId>maven-scalate-plugin</artifactId>
         <version>${scalate-version}</version>

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Sun Aug 28 19:13:04 2011
@@ -16,72 +16,134 @@
  */
 package org.apache.activemq.apollo.broker.store.hawtdb
 
-import dto.HawtDBStoreDTO
 import java.{lang=>jl}
 import java.{util=>ju}
 
-import java.io.File
-import java.io.IOException
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
-import org.fusesource.hawtdb.internal.journal.{JournalListener, Journal, Location}
-import org.apache.activemq.apollo.broker.store.hawtdb.model.Type._
-import org.apache.activemq.apollo.broker.store.hawtdb.model._
-import org.fusesource.hawtbuf._
-import org.fusesource.hawtdispatch._
-import collection.mutable.{LinkedHashMap, ListBuffer}
-import collection.JavaConversions
-import ju.{TreeSet, HashSet}
+import org.fusesource.hawtbuf.proto.PBMessageFactory
+import org.apache.activemq.apollo.broker.store.PBSupport._
 
-import java.util.concurrent.TimeUnit
-import org.fusesource.hawtdb.api._
 import org.apache.activemq.apollo.broker.store._
+import java.io._
+import java.util.concurrent.TimeUnit
 import org.apache.activemq.apollo.util._
-import OptionSupport._
-
+import collection.mutable.ListBuffer
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.apache.activemq.apollo.util.{TreeMap=>ApolloTreeMap}
+import collection.immutable.TreeMap
+import org.fusesource.hawtbuf.{Buffer, AbstractVarIntSupport}
+import java.util.concurrent.atomic.AtomicReference
+import scala.Predef._
+import org.fusesource.hawtdb.api._
+import org.fusesource.hawtbuf.Buffer._
+import org.fusesource.hawtdb.internal.page.LFUPageCache
+import org.fusesource.hawtdispatch._
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object HawtDBClient extends Log {
-  val BEGIN = -1
-  val COMMIT = -2
-  val ROLLBACK = -3
 
-  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000
+  final val message_prefix = 'm'.toByte
+  final val queue_prefix = 'q'.toByte
+  final val queue_entry_prefix = 'e'.toByte
+  final val map_prefix = 'p'.toByte
+
+  final val message_prefix_array = new Buffer(Array(message_prefix))
+  final val queue_prefix_array = new Buffer(Array(queue_prefix))
+  final val map_prefix_array = new Buffer(Array(map_prefix))
+  final val queue_entry_prefix_array = new Buffer(Array(queue_entry_prefix))
+  final val dirty_index_key = ascii(":dirty")
+  final val FALSE = ascii("false")
+  final val TRUE = ascii("true")
+
+  final val LOG_ADD_QUEUE           = 1.toByte
+  final val LOG_REMOVE_QUEUE        = 2.toByte
+  final val LOG_ADD_MESSAGE         = 3.toByte
+  final val LOG_REMOVE_MESSAGE      = 4.toByte
+  final val LOG_ADD_QUEUE_ENTRY     = 5.toByte
+  final val LOG_REMOVE_QUEUE_ENTRY  = 6.toByte
+  final val LOG_MAP_ENTRY           = 7.toByte
+
+  final val LOG_SUFFIX  = ".log"
+  final val INDEX_SUFFIX  = ".index"
+
+  import FileSupport._
+  def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix))
+
+  def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = {
+    TreeMap((directory.list_files.flatMap { f=>
+      if( f.getName.endsWith(suffix) ) {
+        try {
+          val base = f.getName.stripSuffix(suffix)
+          val position = java.lang.Long.parseLong(base, 16);
+          Some(position -> f)
+        } catch {
+          case e:NumberFormatException => None
+        }
+      } else {
+        None
+      }
+    }): _* )
+  }
 
-  val CLOSED_STATE = 1
-  val OPEN_STATE = 2
-}
+  case class UsageCounter() {
+    var count = 0L
+    var size = 0L
+    def increment(value:Int) = {
+      count += 1
+      size += value
+    }
+  }
 
-/**
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class HawtDBClient(hawtDBStore: HawtDBStore) {
-  import HawtDBClient._
-  import Helpers._
+  trait Link {
+    def apply(source:File, target:File, tmp:File);
+  }
 
-  def dispatchQueue = hawtDBStore.dispatch_queue
 
+  object ExecLnLink extends Link {
+    def apply(source: File, target: File, tmp: File) = {
+      val p = sys.process.Process(Array("ln", source.getCanonicalPath, target.getCanonicalPath))
+      if( (p.!) !=0 ) {
+        // Fall back to copying..
+        CopyLink(source, target, tmp)
+      }
+    }
+  }
 
-  private val indexFileFactory = new TxPageFileFactory()
-  private var journal: Journal = null
+  object CopyLink extends Link {
+    def apply(source: File, target: File, tmp: File) = {
+      try {
+        source.copy_to(tmp)
+        tmp.renameTo(target)
+      } finally {
+        tmp.delete()
+      }
+    }
+  }
 
-  private var lockFile: LockFile = null
-  private val trackingGen = new AtomicLong(0)
-  private val lockedDatatFiles = new HashSet[jl.Integer]()
+  val ON_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
+  var link:Link = if(ON_WINDOWS) {
+    // We know we can't use ln on windows..
+    CopyLink
+  } else {
+    // We can probably use the ln command.
+    ExecLnLink
+  }
 
-  private var recovering = false
-  private var nextRecoveryPosition: Location = null
-  private var lastRecoveryPosition: Location = null
-  private var recoveryCounter = 0
 
-  @volatile
-  var rootBuffer = (new DatabaseRootRecord.Bean()).freeze
+}
 
-  @volatile
-  var storedRootBuffer = (new DatabaseRootRecord.Bean()).freeze
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HawtDBClient(store: HawtDBStore) {
 
+  import Helper._
+  import HawtDBClient._
+  import FileSupport._
 
-  val next_batch_counter = new AtomicInteger(0)
-  private var batches = new LinkedHashMap[Int, (Location, ListBuffer[Update])]()
+  def dispatchQueue = store.dispatch_queue
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -89,18 +151,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   //
   /////////////////////////////////////////////////////////////////////
 
-
+  def config = store.config
   def directory = config.directory
-  def journal_log_size = config.journal_log_size.getOrElse(1024*1024*64)
-  def journal_batch_size = config.journal_batch_size.getOrElse(1024*256)
-  def index_flush_interval = config.index_flush_interval.getOrElse(5L * 1000L)
-  def cleanup_interval = config.cleanup_interval.getOrElse(30 * 1000L)
-  def fail_if_locked = config.fail_if_locked.getOrElse(false)
-  def index_page_size = config.index_page_size.getOrElse(512.toShort)
-  def index_cache_size = config.index_cache_size.getOrElse(5000)
-
-  private def index_file = indexFileFactory.getTxPageFile()
-
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -108,1004 +160,751 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   //
   /////////////////////////////////////////////////////////////////////
 
-  var config: HawtDBStoreDTO = null
-
-  def lock(func: => Unit) {
-    val lockFileName = new File(directory, "lock")
-    lockFile = new LockFile(lockFileName, true)
-    if (fail_if_locked) {
-      lockFile.lock()
-      func
-    } else {
-      val locked = try {
-        lockFile.lock()
-        true
-      } catch {
-        case e: IOException =>
-          false
-      }
-      if (locked) {
-        func
-      } else {
-        info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.")
-        dispatchQueue.executeAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {
-          hawtDBStore.executor_pool {
-            lock(func _)
-          }
-        })
-      }
-    }
-  }
-
+  var verify_checksums = false;
 
-  def start(onComplete:Runnable) = {
-    lock {
+  var log:RecordLog = _
 
-      journal = new Journal()
-      journal.setDirectory(directory)
-      journal.setMaxFileLength(journal_log_size)
-      journal.setMaxWriteBatchSize(journal_batch_size);
-      journal.setChecksum(true);
-      journal.setListener( new JournalListener{
-        def synced(writes: Array[JournalListener.Write]) = {
-          var onCompletes = List[Runnable]()
-          withTx { tx=>
-            val helper = new TxHelper(tx)
-            writes.foreach { write=>
-              val func = write.getAttachment.asInstanceOf[(TxHelper, Location)=>List[Runnable]]
-              onCompletes = onCompletes ::: func(helper, write.getLocation)
-            }
-            helper.storeRootBean
-          }
-          onCompletes.foreach( _.run )
-        }
-      })
+  var last_index_snapshot_pos:Long = _
+  val snapshot_rw_lock = new ReentrantReadWriteLock(true)
 
-      if( config.archive_directory!=null ) {
-        journal.setDirectoryArchive(config.archive_directory)
-        journal.setArchiveDataLogs(true)
-      }
-      journal.start
-
-      indexFileFactory.setFile(new File(directory, "db"))
-      indexFileFactory.setDrainOnClose(false)
-      indexFileFactory.setSync(true)
-      indexFileFactory.setUseWorkerThread(true)
-      indexFileFactory.setPageSize(index_page_size)
-      indexFileFactory.setCacheSize(index_cache_size);
-
-      indexFileFactory.open
-
-      val initialized = withTx { tx =>
-          if (!tx.allocator().isAllocated(0)) {
-            val helper = new TxHelper(tx)
-            import helper._
-
-            val rootPage = tx.alloc()
-            assert(rootPage == 0)
-
-            rootBean.setQueueIndexPage(alloc(QUEUE_INDEX_FACTORY))
-            rootBean.setMessageKeyIndexPage(alloc(MESSAGE_KEY_INDEX_FACTORY))
-            rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
-            rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
-            rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
-            storedRootBuffer = rootBean.freeze
-            helper.storeRootBean
+  var last_gc_ts = 0L
+  var last_gc_duration = 0L
+  var in_gc = false
+  var gc_detected_log_usage = Map[Long, UsageCounter]()
 
-            true
-          } else {
-            rootBuffer = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0)
-            storedRootBuffer = rootBuffer;
-            false
-          }
-      }
+  def dirty_index_file = directory / ("dirty"+INDEX_SUFFIX)
+  def temp_index_file = directory / ("temp"+INDEX_SUFFIX)
+  def snapshot_index_file(id:Long) = create_sequence_file(directory,id, INDEX_SUFFIX)
 
-      if( initialized ) {
-        index_file.flush()
-      }
+  private val index_file_factory = new TxPageFileFactory()
+  var lock_file:LockFile = _
 
-      recover(onComplete)
-    }
+  def create_log: RecordLog = {
+    new RecordLog(directory, LOG_SUFFIX)
   }
 
-  def stop() = {
-    journal.close
-    indexFileFactory.close
-    lockFile.unlock
+  def log_size = {
+    import OptionSupport._
+    config.log_size.getOrElse(1024 * 1024 * 100)
   }
 
-  def addQueue(record: QueueRecord, callback:Runnable) = {
-    val update = new AddQueue.Bean()
-    update.setKey(record.key)
-    update.setBindingKind(record.binding_kind)
-    update.setBindingData(record.binding_data)
-    _store(update, callback)
-  }
+  def retry_using_index[T](func: (RichBTreeIndex)=>T):T = retry(using_index(func))
 
-  def removeQueue(queueKey: Long, callback:Runnable) = {
-    val update = new RemoveQueue.Bean()
-    update.setKey(queueKey)
-    _store(update, callback)
-  }
-
-  def store(uows: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
-    var batch = ListBuffer[TypeCreatable]()
-    uows.foreach { uow =>
-
-        for((key,value) <- uow.map_actions) {
-          val entry = new MapEntry.Bean
-          entry.setKey(key)
-          entry.setValue(value)
-          batch += entry
-        }
-
-        uow.actions.foreach {
-          case (msg, action) =>
-            if (action.message_record != null) {
-              val update: AddMessage.Bean = action.message_record
-              batch += update
-            }
-            action.enqueues.foreach {
-              queueEntry =>
-                val update: AddQueueEntry.Bean = queueEntry
-                batch += update
-            }
-            action.dequeues.foreach {
-              queueEntry =>
-                val queueKey = queueEntry.queue_key
-                val queueSeq = queueEntry.entry_seq
-                batch += new RemoveQueueEntry.Bean().setQueueKey(queueKey).setQueueSeq(queueSeq)
-            }
+  def using_index[T](func: (RichBTreeIndex)=>T):T = {
+    val lock = snapshot_rw_lock.readLock();
+    lock.lock()
+    try {
+      val tx = index_file_factory.getTxPageFile.tx
+      var ok = false
+      try {
+        val rc = func(new RichBTreeIndex(INDEX_FACTORY.openOrCreate(tx)))
+        ok = true
+        rc
+      } finally {
+        if (ok) {
+          tx.commit
+        } else {
+          tx.rollback
         }
-    }
-    _store(batch, callback)
-  }
-
-
-  def purge(callback: Runnable) = {
-    _store(new Purge.Bean(), callback)
-  }
-
-  def listQueues: Seq[Long] = {
-    val rc = ListBuffer[Long]()
-    withTx { tx =>
-      val helper = new TxHelper(tx)
-      import JavaConversions._
-      import helper._
-
-      queueIndex.iterator.foreach { entry =>
-        rc += entry.getKey.longValue
+        tx.close
       }
+    } finally {
+      lock.unlock()
     }
-    rc
   }
 
-  def get(key: Buffer):Option[Buffer] = {
-    withTx { tx =>
-        val helper = new TxHelper(tx)
-        import helper._
-        Option(mapIndex.get(key))
-    }
-  }
+  def retry[T](func: => T): T = {
+    var error:Throwable = null
+    var rc:Option[T] = None
 
-  def getQueue(queueKey: Long): Option[QueueRecord] = {
-    withTx { tx =>
-        val helper = new TxHelper(tx)
-        import helper._
-
-        val queueRecord = queueIndex.get(queueKey)
-        if (queueRecord != null) {
-          val record = new QueueRecord
-          record.key = queueKey
-          record.binding_kind = queueRecord.getInfo.getBindingKind
-          record.binding_data = queueRecord.getInfo.getBindingData
-          Some(record)
-        } else {
-          None
-        }
-    }
-  }
+    // We will loop until the tx succeeds.  Perhaps it's
+    // failing due to a temporary condition like low disk space.
+    while(!rc.isDefined) {
 
-  def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryRange] = {
-    withTx { tx =>
-        val helper = new TxHelper(tx)
-        import JavaConversions._
-        import helper._
-        val queueRecord = queueIndex.get(queueKey)
-        if (queueRecord != null) {
-          val entryIndex = queueEntryIndex(queueRecord)
-
-          var rc = ListBuffer[QueueEntryRange]()
-          var group:QueueEntryRange = null
-
-          entryIndex.iterator.foreach { entry =>
-            if( group == null ) {
-              group = new QueueEntryRange
-              group.first_entry_seq = entry.getKey.longValue
-            }
-            group.last_entry_seq = entry.getKey.longValue
-            group.count += 1
-            group.size += entry.getValue.getSize
-
-// TODO:
-//            if(group.expiration == 0){
-//              group.expiration = entry.expiration
-//            } else {
-//              if( entry.expiration != 0 ) {
-//                group.expiration = entry.expiration.min(group.expiration)
-//              }
-//            }
-
-            if( group.count == limit) {
-              rc += group
-              group = null
-            }
+      try {
+        rc = Some(func)
+      } catch {
+        case e:Throwable =>
+          if( error==null ) {
+            warn(e, "DB operation failed. (entering recovery mode)")
           }
+          error = e
+      }
 
-          if( group!=null ) {
-            rc += group
-          }
-          rc
-        } else {
-          null
+      if (!rc.isDefined) {
+        // We may need to give up if the store is being stopped.
+        if ( !store.service_state.is_started ) {
+          throw error
         }
+        Thread.sleep(1000)
+      }
     }
-  }
 
-  def getQueueEntries(queueKey: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
-    var rc = ListBuffer[QueueEntryRecord]()
-    withTx { tx =>
-      val helper = new TxHelper(tx)
-      import JavaConversions._
-      import helper._
-      import Predicates._
-
-      val queueRecord = queueIndex.get(queueKey)
-      if (queueRecord != null) {
-        val entryIndex = queueEntryIndex(queueRecord)
-
-        val where = and(gte(new jl.Long(firstSeq)), lte(new jl.Long(lastSeq)))
-        entryIndex.iterator( where ).foreach {
-          entry =>
-            val record: QueueEntryRecord = entry.getValue
-            rc += record
-        }
-      } else {
-        rc = null
-      }
+    if( error!=null ) {
+      info("DB recovered from failure.")
     }
-    rc
+    rc.get
   }
 
-  val metric_load_from_index_counter = new TimeCounter
-  var metric_load_from_index = metric_load_from_index_counter(false)
+  def start() = {
+    import OptionSupport._
 
-  val metric_load_from_journal_counter = new TimeCounter
-  var metric_load_from_journal = metric_load_from_journal_counter(false)
-
-  def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
-    val locations = withTx { tx =>
-      val helper = new TxHelper(tx)
-      import helper._
-      requests.flatMap { case (messageKey, callback)=>
-        val location = metric_load_from_index_counter.time {
-          messageKeyIndex.get(messageKey)
-        }
-        if( location==null ) {
-          debug("Message not indexed.  Journal location could not be determined for message: %s", messageKey)
-          callback(None)
-          None
-        } else {
-          Some((location, callback))
-        }
-      }
-    }
-
-    locations.foreach { case (location, callback)=>
-      val addMessage = metric_load_from_journal_counter.time {
-        load(location, classOf[AddMessage.Getter])
+    lock_file = new LockFile(directory / "lock", true)
+    if (config.fail_if_locked.getOrElse(false)) {
+      lock_file.lock()
+    } else {
+      retry {
+        lock_file.lock()
       }
-      callback( addMessage.map( x => toMessageRecord(x) ) )
     }
 
-  }
-
-  def loadMessage(messageKey: Long): Option[MessageRecord] = {
-    metric_load_from_index_counter.start { end =>
-      withTx { tx =>
-        val helper = new TxHelper(tx)
-        import helper._
-
-        val location = messageKeyIndex.get(messageKey)
-        end()
 
-        if (location != null) {
-          metric_load_from_journal_counter.time {
-            load(location, classOf[AddMessage.Getter]) match {
-              case Some(x) =>
-                val messageRecord: MessageRecord = x
-                Some(messageRecord)
-              case None => None
-            }
+    verify_checksums = config.verify_checksums.getOrElse(false);
+    index_file_factory.setFile(dirty_index_file)
+    index_file_factory.setDrainOnClose(true)
+    index_file_factory.setSync(false)
+    index_file_factory.setUseWorkerThread(false)
+
+    index_file_factory.setPageSize(config.index_page_size.getOrElse(1024*4).toShort)
+    val cache_size = ((config.index_cache_size.getOrElse(1024*1024*256L)) / index_file_factory.getPageSize()).toInt
+    index_file_factory.setPageCache(new LFUPageCache(cache_size, 0.90.toFloat));
+
+    log = create_log
+    log.write_buffer_size = config.log_write_buffer_size.getOrElse(1024*1024*4)
+    log.log_size = log_size
+    log.on_log_rotate = ()=> {
+      // let's queue a request to checkpoint when
+      // the logs rotate.. queue it on the GC thread since GCs lock
+      // the index for a long time.
+      store.gc_executor {
+        snapshot_index
+      }
+    }
+
+    retry {
+      log.open
+    }
+
+    // Find out what was the last snapshot.
+    val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
+    var last_snapshot_index = snapshots.lastOption
+    last_index_snapshot_pos = last_snapshot_index.map(_._1).getOrElse(0)
+
+    // Only keep the last snapshot..
+    snapshots.filterNot(_._1 == last_index_snapshot_pos).foreach( _._2.recursive_delete )
+    temp_index_file.delete
+
+    retry {
+
+      // Delete the dirty indexes
+      dirty_index_file.delete()
+
+      // Resume log replay from a snapshot of the index..
+      for( last <- last_snapshot_index ) {
+        try {
+          link(last._2, dirty_index_file, temp_index_file)
+        } catch {
+          case e:Exception =>
+            warn(e, "Could not recover snapshot of the index: "+e)
+            last_snapshot_index = None
+        }
+      }
+
+      index_file_factory.open
+      using_index { index=>
+        index.put(dirty_index_key, TRUE)
+        // Update the index /w what was stored on the logs..
+        var pos = last_index_snapshot_pos;
+
+        // Replay the log from the last update position..
+        while (pos < log.appender_limit) {
+          log.read(pos).map {
+            case (kind, data, len) =>
+              kind match {
+                case LOG_ADD_MESSAGE =>
+                  val record: MessageRecord = data
+                  index.put(encode(message_prefix, record.key), encode_long(pos))
+                case LOG_ADD_QUEUE_ENTRY =>
+                  val record: QueueEntryRecord = data
+                  index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), new Buffer(data))
+                case LOG_REMOVE_QUEUE_ENTRY =>
+                  index.delete(new Buffer(data))
+                case LOG_ADD_QUEUE =>
+                  val record: QueueRecord = data
+                  index.put(encode(queue_prefix, record.key), new Buffer(data))
+                case LOG_REMOVE_QUEUE =>
+                  val queue_key = decode_long(new Buffer(data))
+                  index.delete(encode(queue_prefix, queue_key))
+                  index.cursor_keys_prefixed(encode(queue_entry_prefix, queue_key)) { key =>
+                    index.delete(key)
+                    true
+                  }
+                case LOG_MAP_ENTRY =>
+                  val entry = MapEntryPB.FACTORY.parseUnframed(data)
+                  if (entry.getValue == null) {
+                    index.delete(encode(map_prefix, entry.getKey))
+                  } else {
+                    index.put(encode(map_prefix, entry.getKey), entry.getValue)
+                  }
+                case _ =>
+                // Skip unknown records like the RecordLog headers.
+              }
+              pos += len
           }
-        } else {
-          debug("Message not indexed.  Journal location could not be determined for message: %s", messageKey)
-          None
         }
       }
     }
   }
 
+  def stop() = {
+    // this blocks until all io completes..
+    // Suspend also deletes the index.
+    suspend()
 
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Batch/Transactional interface to storing/accessing journaled updates.
-  //
-  /////////////////////////////////////////////////////////////////////
-
-  private def load[T <: TypeCreatable](location: Location, expected: Class[T]): Option[T] = {
-    try {
-      load(location) match {
-          case (updateType, batch, data) =>
-            val decoded = expected.cast(decode(location, updateType, data))
-            val rc = Some(decoded)
-            rc
-      }
-    } catch {
-      case e: Throwable =>
-        debug(e, "Could not load journal record at: %s", location)
-        None
+    if (log != null) {
+      log.close
     }
-  }
+    copy_dirty_index_to_snapshot
+    log = null
 
-  private def _store(updates: Seq[TypeCreatable], onComplete: Runnable): Unit = {
-    val batch = next_batch_id
-    begin(batch)
-    updates.foreach {
-      update =>
-        _store(batch, update, null)
-    }
-    commit(batch, onComplete)
+    lock_file.unlock()
+    lock_file=null
   }
 
-  private def _store(update: TypeCreatable, onComplete: Runnable): Unit = _store(-1, update, onComplete)
-
-  val metric_journal_append_counter = new TimeCounter
-  var metric_journal_append = metric_journal_append_counter(false)
-
-  val metric_index_update_counter = new TimeCounter
-  var metric_index_update = metric_index_update_counter(false)
-
   /**
-   * All updated are are funneled through this method. The updates are logged to
-   * the journal and then the indexes are update.  onFlush will be called back once
-   * this all completes and the index has the update.
-   *
-   * @throws IOException
+   * TODO: expose this via management APIs, handy if you want to
+   * do a file system level snapshot and want the data to be consistent.
    */
-  private def _store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
-    val kind = update.asInstanceOf[TypeCreatable]
-    val frozen = update.freeze
-    val baos = new DataByteArrayOutputStream(frozen.serializedSizeFramed + 5)
-    baos.writeByte(kind.toType().getNumber())
-    baos.writeInt(batch)
-    frozen.writeFramed(baos)
-
-    val buffer = baos.toBuffer()
-    append(buffer) { (helper, location) =>
-      metric_index_update_counter.time {
-        executeStore(helper, location, batch, update, onComplete)
-      }
-    }
-  }
+  def suspend() = {
 
-  /**
-   */
-  private def begin(batch: Int): Unit = {
-    val baos = new DataByteArrayOutputStream(5)
-    baos.writeByte(BEGIN)
-    baos.writeInt(batch)
-    append(baos.toBuffer) { (helper,location) =>
-      executeBegin(helper, location, batch)
+    // Make sure we are the only ones accessing the index. since
+    // we will be closing it to create a consistent snapshot.
+    snapshot_rw_lock.writeLock().lock()
+
+    // Close the index so that it's files are not changed async on us.
+    using_index { index=>
+      index.put(dirty_index_key, FALSE)
     }
+    index_file_factory.close()
   }
 
   /**
+   * TODO: expose this via management APIs, handy if you want to
+   * do a file system level snapshot and want the data to be consistent.
    */
-  private def commit(batch: Int, onComplete: Runnable): Unit = {
-    val baos = new DataByteArrayOutputStream(5)
-    baos.writeByte(COMMIT)
-    baos.writeInt(batch)
-    append(baos.toBuffer) { (helper,location) =>
-      executeCommit(helper, location, batch, onComplete)
+  def resume() = {
+    // re-open it..
+    retry {
+      index_file_factory.open()
+      using_index { index=>
+        index.put(dirty_index_key, TRUE)
+      }
     }
+    snapshot_rw_lock.writeLock().unlock()
   }
 
-  private def rollback(batch: Int, onComplete: Runnable): Unit = {
-    val baos = new DataByteArrayOutputStream(5)
-    baos.writeByte(ROLLBACK)
-    baos.writeInt(batch)
-    append(baos.toBuffer) { (helper,location) =>
-      executeRollback(helper, location, batch, onComplete)
+  def copy_dirty_index_to_snapshot {
+    if( log.appender_limit == last_index_snapshot_pos  ) {
+      // no need to snapshot again...
+      return
     }
-  }
 
-  def load(location: Location) = {
-    var data = read(location)
-    val editor = data.bigEndianEditor
-    val updateType = editor.readByte()
-    val batch = editor.readInt
-    (updateType, batch, data)
-  }
+    // Where we start copying files into.  Delete this on
+    // restart.
+    try {
 
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Methods related to recovery
-  //
-  /////////////////////////////////////////////////////////////////////
+      val new_snapshot_index_pos = log.appender_limit
+      link(dirty_index_file, snapshot_index_file(new_snapshot_index_pos), temp_index_file)
+      snapshot_index_file(last_index_snapshot_pos).delete()
+      last_index_snapshot_pos = new_snapshot_index_pos
 
-  /**
-   * Recovers the journal and rollsback any in progress batches that
-   * were in progress and never committed.
-   *
-   * @throws IOException
-   * @throws IOException
-   * @throws IllegalStateException
-   */
-  def recover(onComplete:Runnable): Unit = {
-    recoveryCounter = 0
-    lastRecoveryPosition = null
-    val start = System.currentTimeMillis()
-    incrementalRecover
-
-
-    _store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
-      // Rollback any batches that did not complete.
-      batches.keysIterator.foreach {
-        batch =>
-          rollback(batch, null)
-      }
-
-      val end = System.currentTimeMillis()
-      info("Processed %d operations from the journal in %,.3f seconds.", recoveryCounter, ((end - start) / 1000.0f))
-      onComplete.run
-    })
+    } catch {
+      case e: Exception => warn(e, "Could not snapshot the index: " + e)
+    }
   }
 
-
-  /**
-   * incrementally recovers the journal.  It can be run again and again
-   * if the journal is being appended to.
-   */
-  def incrementalRecover(): Unit = {
-
-    // Is this our first incremental recovery pass?
-    if (lastRecoveryPosition == null) {
-      if (rootBuffer.hasFirstBatchLocation) {
-        // we have to start at the first in progress batch usually...
-        nextRecoveryPosition = rootBuffer.getFirstBatchLocation
-      } else {
-        // but perhaps there were no batches in progress..
-        if (rootBuffer.hasLastUpdateLocation) {
-          // then we can just continue from the last update applied to the index
-          lastRecoveryPosition = rootBuffer.getLastUpdateLocation
-          nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
-        } else {
-          // no updates in the index?.. start from the first record in the journal.
-          nextRecoveryPosition = journal.getNextLocation(null)
-        }
-      }
-    } else {
-      nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
+  def snapshot_index:Unit = {
+    if( log.appender_limit == last_index_snapshot_pos  ) {
+      // no need to snapshot again...
+      return
     }
-
+    suspend()
     try {
-      recovering = true
-
-      // Continue recovering until journal runs out of records.
-      while (nextRecoveryPosition != null) {
-        lastRecoveryPosition = nextRecoveryPosition
-        recover(lastRecoveryPosition)
-        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
-      }
-
+      copy_dirty_index_to_snapshot
     } finally {
-      recovering = false
+      resume()
     }
   }
 
-  /**
-   * Recovers the logged record at the specified location.
-   */
-  def recover(location: Location): Unit = {
-    var data = journal.read(location)
-
-    val editor = data.bigEndianEditor
-    val updateType = editor.readByte()
-    val batch = editor.readInt()
-
-    withTx { tx=>
-      val helper = new TxHelper(tx)
-      updateType match {
-        case BEGIN => executeBegin(helper, location, batch)
-        case COMMIT => executeCommit(helper, location, batch, null)
-        case _ =>
-          val update = decode(location, updateType, data)
-          executeStore(helper, location, batch, update, null)
+  def purge() = {
+    suspend()
+    try{
+      log.close
+      directory.list_files.foreach(_.recursive_delete)
+    } finally {
+      retry {
+        log.open
       }
-      helper.storeRootBean
+      resume()
     }
-
-    recoveryCounter += 1
   }
 
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Methods for Journal access
-  //
-  /////////////////////////////////////////////////////////////////////
-
-  private def append(data: Buffer)(cb: (TxHelper, Location) => List[Runnable]): Unit = {
-    metric_journal_append_counter.start { end =>
-      def cbintercept(tx:TxHelper,location:Location) = {
-        end()
-        cb(tx, location)
+  def addQueue(record: QueueRecord, callback:Runnable) = {
+    retry_using_index { index =>
+      log.appender { appender =>
+        val buffer:Buffer = record
+        appender.append(LOG_ADD_QUEUE, buffer)
+        index.put(encode(queue_prefix, record.key), buffer)
       }
-      journal.write(data, cbintercept _ )
     }
+    callback.run
   }
 
-  def read(location: Location) = journal.read(location)
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Methods that execute updates stored in the journal by indexing them
-  // Used both in normal operation and durring recovery.
-  //
-  /////////////////////////////////////////////////////////////////////
-
-  private def executeBegin(helper:TxHelper, location: Location, batch: Int):List[Runnable] = {
-    assert(batches.get(batch).isEmpty)
-    batches.put(batch, (location, ListBuffer()))
-    Nil
-  }
-
-  private def executeCommit(helper:TxHelper, location: Location, batch: Int, onComplete: Runnable):List[Runnable] = {
-    // apply all the updates in the batch as a single unit of work.
-    batches.remove(batch) match {
-      case Some((_, updates)) =>
-        // When recovering.. we only want to redo updates that committed
-        // after the last update location.
-        if (!recovering || isAfterLastUpdateLocation(location)) {
-            // index the updates
-            updates.foreach {
-              update =>
-                index(helper, update.update, update.location)
-            }
-            helper.updateLocations(location)
+  def removeQueue(queue_key: Long, callback:Runnable) = {
+    retry_using_index { index =>
+      log.appender { appender =>
+        appender.append(LOG_REMOVE_QUEUE, encode_long(queue_key))
+        index.delete(encode(queue_prefix, queue_key))
+        index.cursor_keys_prefixed(encode(queue_entry_prefix, queue_key)) { key=>
+          index.delete(key)
+          true
         }
-      case None =>
-        // when recovering..  we are more lax due recovery starting
-        // in the middle of a stream of in progress batches
-        assert(recovering)
-    }
-    if(onComplete!=null) {
-      return List(onComplete)
-    } else {
-      Nil
-    }
-  }
-
-  private def executeRollback(helper:TxHelper, location: Location, batch: Int, onComplete: Runnable): List[Runnable] = {
-    // apply all the updates in the batch as a single unit of work.
-    batches.remove(batch) match {
-      case Some((_, _)) =>
-        if (!recovering || isAfterLastUpdateLocation(location)) {
-          helper.updateLocations(location)
-        }
-      case None =>
-        // when recovering..  we are more lax due recovery starting
-        // in the middle of a stream of in progress batches
-        assert(recovering)
-    }
-    if(onComplete!=null) {
-      return List(onComplete)
-    } else {
-      Nil
-    }
-  }
-
-  private def executeStore(helper:TxHelper, location: Location, batch: Int, update: TypeCreatable, onComplete: Runnable): List[Runnable] = {
-    if (batch == -1) {
-      // update is not part of the batch..
-
-      // When recovering.. we only want to redo updates that happen
-      // after the last update location.
-      if (!recovering || isAfterLastUpdateLocation(location)) {
-          index(helper, update, location)
-          helper.updateLocations(location)
-      }
-
-      if ( onComplete != null) {
-        return List(onComplete)
-      }
-    } else {
-
-      // only the commit/rollback in batch can have an onCompelte handler
-      assert(onComplete == null)
-
-      // if the update was part of a batch don't apply till the batch is committed.
-      batches.get(batch) match {
-        case Some((_, updates)) =>
-          updates += Update(update, location)
-        case None =>
-          // when recovering..  we are more lax due recovery starting
-          // in the middle of a stream of in progress batches
-          assert(recovering)
       }
     }
-    return Nil
+    callback.run
   }
 
+  def store(uows: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
+    retry_using_index { index =>
+      log.appender { appender =>
 
-  private def index(helper:TxHelper, update: TypeCreatable, location: Location): Unit = {
-    import JavaConversions._
-    import helper._
+        var sync_needed = false
+        uows.foreach { uow =>
 
-    def removeMessage(key:Long) = {
-      val location = messageKeyIndex.remove(key)
-      if (location != null) {
-        val fileId:jl.Integer = location.getDataFileId()
-        addAndGet(dataFileRefIndex, fileId, -1)
-      } else {
-        if( !recovering ) {
-          error("Cannot remove message, it did not exist: %d", key)
-        }
-      }
-    }
+          for((key,value) <- uow.map_actions) {
+            val entry = new MapEntryPB.Bean()
+            entry.setKey(key)
+            if( value==null ) {
+              index.delete(encode(map_prefix, key))
+            } else {
+              entry.setValue(value)
+              index.put(encode(map_prefix, key), value)
+            }
+            appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedBuffer)
+          }
+
+          uow.actions.foreach { case (msg, action) =>
+            val message_record = action.message_record
+            var pos = 0L
+            var pos_buffer:Buffer = null
+
+            if (message_record != null) {
+              pos = appender.append(LOG_ADD_MESSAGE, message_record)
+              val pos_encoded = encode_long(pos)
+              pos_buffer = new Buffer(pos_encoded)
+              if( message_record.locator !=null ) {
+                message_record.locator.set(pos_encoded.toByteArray);
+              }
+              index.put(encode(message_prefix, action.message_record.key), pos_encoded)
+            }
 
-    def removeQueue(queueKey:Long) = {
-      val queueRecord = queueIndex.remove(queueKey)
-      if (queueRecord != null) {
-        val trackingIndex = queueTrackingIndex(queueRecord)
-        val entryIndex = queueEntryIndex(queueRecord)
+            action.dequeues.foreach { entry =>
+              if( pos_buffer==null && entry.message_locator!=null ) {
+                pos_buffer = entry.message_locator
+              }
+              val key = encode(queue_entry_prefix, entry.queue_key, entry.entry_seq)
+              appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
+              index.delete(key)
+            }
 
-        trackingIndex.iterator.foreach { entry=>
-          val messageKey = entry.getKey
-          if( addAndGet(messageRefsIndex, messageKey, -1) == 0 ) {
-            // message is no longer referenced.. we can remove it..
-            removeMessage(messageKey.longValue)
+            action.enqueues.foreach { entry =>
+              entry.message_locator = pos_buffer
+              val encoded:Buffer = entry
+              appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
+              index.put(encode(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
+            }
+          }
+          if( !uow.complete_listeners.isEmpty ) {
+            sync_needed = true
           }
         }
-
-        entryIndex.destroy
-        trackingIndex.destroy
+        if( sync_needed ) {
+          appender.flush
+          appender.sync
+        }
       }
-
     }
+    callback.run
+  }
 
-    update match {
-      case x: AddMessage.Getter =>
-
-        val messageKey = x.getMessageKey()
-        if (messageKey > rootBean.getLastMessageKey) {
-          rootBean.setLastMessageKey(messageKey)
-        }
-
-        val prevLocation = messageKeyIndex.put(messageKey, location)
-        if (prevLocation != null) {
-          // Message existed.. undo the index update we just did. Chances
-          // are it's a transaction replay.
-          messageKeyIndex.put(messageKey, prevLocation)
-          if (location == prevLocation) {
-            warn("Message replay detected for: %d", messageKey)
-          } else {
-            error("Message replay with different location for: %d", messageKey)
-          }
-        } else {
-          val fileId:jl.Integer = location.getDataFileId()
-          addAndGet(dataFileRefIndex, fileId, 1)
-        }
+  val metric_load_from_index_counter = new TimeCounter
+  var metric_load_from_index = metric_load_from_index_counter(false)
 
-      case x: AddQueueEntry.Getter =>
+  def loadMessages(requests: ListBuffer[(Long, AtomicReference[Array[Byte]], (Option[MessageRecord])=>Unit)]):Unit = {
 
-        val queueKey = x.getQueueKey
-        val queueRecord = queueIndex.get(queueKey)
-        if (queueRecord != null) {
-          val trackingIndex = queueTrackingIndex(queueRecord)
-          val entryIndex = queueEntryIndex(queueRecord)
-
-          // a message can only appear once in a queue (for now).. perhaps we should
-          // relax this constraint.
-          val messageKey = x.getMessageKey
-          val queueSeq = x.getQueueSeq
-
-          val existing = trackingIndex.put(messageKey, queueSeq)
-          if (existing == null) {
-            val previous = entryIndex.put(queueSeq, x.freeze)
-            if (previous == null) {
-              addAndGet(messageRefsIndex, new jl.Long(messageKey), 1)
-            } else {
-              // TODO perhaps treat this like an update?
-              error("Duplicate queue entry seq %d", x.getQueueSeq)
+    val missing = retry_using_index { index =>
+      requests.flatMap { x =>
+        val (message_key, locator, callback) = x
+        val record = metric_load_from_index_counter.time {
+          var pos = 0L
+          var pos_array:Array[Byte] = null
+          if( locator!=null ) {
+            pos_array = locator.get()
+            if( pos_array!=null ) {
+              pos = decode_long(new Buffer(pos_array))
+            }
+          }
+          if( pos == 0L ) {
+            index.get(encode(message_prefix, message_key)) match {
+              case Some(value) =>
+                pos_array = value.toByteArray
+                pos = decode_long(value)
+              case None =>
+                pos = 0L
             }
-          } else {
-            error("Duplicate queue entry message %d was %d", x.getMessageKey, existing)
           }
-        } else {
-          error("Queue not found: %d", x.getQueueKey)
-        }
-
-      case x: RemoveQueueEntry.Getter =>
-        val queueKey = x.getQueueKey
-        val queueRecord = queueIndex.get(queueKey)
-        if (queueRecord != null) {
-          val trackingIndex = queueTrackingIndex(queueRecord)
-          val entryIndex = queueEntryIndex(queueRecord)
-
-          val queueSeq = x.getQueueSeq
-          val queueEntry = entryIndex.remove(queueSeq)
-          if (queueEntry != null) {
-            val messageKey = queueEntry.getMessageKey
-            val existing = trackingIndex.remove(messageKey)
-            if (existing != null) {
-              if( addAndGet(messageRefsIndex, new jl.Long(messageKey), -1) == 0 ) {
-                // message is no longer referenced.. we can remove it..
-                removeMessage(messageKey)
-              }
-            } else {
-              if( !recovering ) {
-                error("Tracking entry not found for message %d", queueEntry.getMessageKey)
-              }
-            }
+          if (pos == 0L ) {
+            None
           } else {
-            if( !recovering ) {
-              error("Queue entry not found for seq %d", x.getQueueSeq)
+            log.read(pos).map { case (prefix, data, _)=>
+              val rc:MessageRecord = data
+              rc.locator = new AtomicReference[Array[Byte]](pos_array)
+              rc
             }
           }
+        }
+        if( record.isDefined ) {
+          callback(record)
+          None
         } else {
-          if( !recovering ) {
-            error("Queue not found: %d", x.getQueueKey)
-          }
+          Some(x)
         }
+      }
+    }
 
-      case x: AddQueue.Getter =>
-        val queueKey = x.getKey
-        if (queueIndex.get(queueKey) == null) {
+    if (missing.isEmpty)
+      return
 
-          if (queueKey > rootBean.getLastQueueKey) {
-            rootBean.setLastQueueKey(queueKey)
+    // There's a small chance that a message was missing; perhaps we started a read tx before the
+    // write tx completed.  Let's try again.
+    retry_using_index { index =>
+      missing.foreach { x =>
+        val (message_key, locator, callback) = x
+        val record = metric_load_from_index_counter.time {
+          index.get(encode(message_prefix, message_key)).flatMap{ pos_buffer=>
+            val pos = decode_long(pos_buffer)
+            log.read(pos).map { case (prefix, data, _)=>
+              val rc:MessageRecord = data
+              rc.locator = new AtomicReference[Array[Byte]](pos_buffer.toByteArray)
+              rc
+            }
           }
-
-          val queueRecord = new QueueRootRecord.Bean
-          queueRecord.setEntryIndexPage(alloc(QUEUE_ENTRY_INDEX_FACTORY))
-          queueRecord.setTrackingIndexPage(alloc(QUEUE_TRACKING_INDEX_FACTORY))
-          queueRecord.setInfo(x)
-          queueIndex.put(queueKey, queueRecord.freeze)
         }
+        callback(record)
+      }
+    }
+  }
 
-      case x: RemoveQueue.Getter =>
-        removeQueue(x.getKey)
+  def listQueues: Seq[Long] = {
+    val rc = ListBuffer[Long]()
+    retry_using_index { index =>
+      index.cursor_keys_prefixed(queue_prefix_array) { key =>
+        rc += decode_long_key(key)._2
+        true // to continue cursoring.
+      }
+    }
+    rc
+  }
 
-      case x: AddTrace.Getter =>
-        // trace messages are informational messages in the journal used to log
-        // historical info about store state.  They don't update the indexes.
+  def getQueue(queue_key: Long): Option[QueueRecord] = {
+    retry_using_index { index =>
+      index.get(encode(queue_prefix, queue_key)).map( x=> decode_queue_record_buffer(x)  )
+    }
+  }
 
-      case x: Purge.Getter =>
-        // Remove all the queues...
-        val queueKeys = ListBuffer[Long]()
-        queueIndex.iterator.foreach { entry =>
-          queueKeys += entry.getKey.longValue
-        }
+  def listQueueEntryGroups(queue_key: Long, limit: Int) : Seq[QueueEntryRange] = {
+    var rc = ListBuffer[QueueEntryRange]()
+    retry_using_index { index =>
+      var group:QueueEntryRange = null
+      index.cursor_prefixed( encode(queue_entry_prefix, queue_key)) { (key, value) =>
 
-        queueKeys.foreach { key =>
-          removeQueue(key)
+        val (_,_,current_key) = decode_long_long_key(key)
+        if( group == null ) {
+          group = new QueueEntryRange
+          group.first_entry_seq = current_key
         }
 
-        // Remove stored messages...
-        messageKeyIndex.clear
-        messageRefsIndex.clear
-        dataFileRefIndex.clear
-        rootBean.setLastMessageKey(0)
+        val entry:QueueEntryRecord = value
 
-        cleanup(_tx);
-        info("Store purged.");
+        group.last_entry_seq = current_key
+        group.count += 1
+        group.size += entry.size
 
-      case x: MapEntry.Getter =>
-        val value = x.getValue
-        if( value==null ) {
-          mapIndex.remove(x.getKey)
+        if(group.expiration == 0){
+          group.expiration = entry.expiration
         } else {
-          mapIndex.put(x.getKey, value)
+          if( entry.expiration != 0 ) {
+            group.expiration = entry.expiration.min(group.expiration)
+          }
         }
 
+        if( group.count == limit) {
+          rc += group
+          group = null
+        }
+
+        true // to continue cursoring.
+      }
+      if( group!=null ) {
+        rc += group
+      }
     }
+    rc
   }
 
+  def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+    var rc = ListBuffer[QueueEntryRecord]()
+    retry_using_index { index =>
+      val start = encode(queue_entry_prefix, queue_key, firstSeq)
+      val end = encode(queue_entry_prefix, queue_key, lastSeq+1)
+      index.cursor_range( start, end ) { (key, value) =>
+        rc += value
+        true
+      }
+    }
+    rc
+  }
 
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Periodic Maintance
-  //
-  /////////////////////////////////////////////////////////////////////
-
+  def getLastMessageKey:Long = {
+    retry_using_index { index =>
+      index.last_key(message_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
+    }
+  }
 
-  def flush() = {
-    val start = System.currentTimeMillis()
-    index_file.flush
-    val end = System.currentTimeMillis()
-    if (end - start > 1000) {
-      warn("Index flush latency: %,.3f seconds", ((end - start) / 1000.0f))
+  def get(key: Buffer):Option[Buffer] = {
+    retry_using_index { index =>
+      index.get(encode(map_prefix, key)).map(new Buffer(_))
     }
   }
 
-  def cleanup():Unit = withTx {tx =>
-    cleanup(tx)
+  def getLastQueueKey:Long = {
+    retry_using_index { index =>
+      index.last_key(queue_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
+    }
   }
 
-  /**
-   * @param tx
-   * @throws IOException
-   */
-  def cleanup(tx:Transaction):Unit = {
-    val helper = new TxHelper(tx)
-    import JavaConversions._
-    import helper._
+  def gc:Unit = {
+    var active_counter = 0
+    var delete_counter = 0
+    val latency_counter = new TimeCounter
 
-    debug("Cleanup started.")
-    val gcCandidateSet = new TreeSet[jl.Integer](journal.getFileMap().keySet())
+    //
+    // This journal_usage will let us get a picture of which queues are using how much of each
+    // log file.  It will help folks figure out why a log file is not getting deleted.
+    //
+    val journal_usage = new ApolloTreeMap[Long,(RecordLog#LogInfo , UsageCounter)]()
+    var append_journal = 0L
 
-    // Don't cleanup locked data files
-    if (lockedDatatFiles != null) {
-      gcCandidateSet.removeAll(lockedDatatFiles)
+    log.log_mutex.synchronized {
+      append_journal = log.log_infos.last._1
+      log.log_infos.foreach(entry=> journal_usage.put(entry._1, (entry._2, UsageCounter())) )
     }
 
-    // Don't GC files that we will need for recovery..
-
-    // Notice we are using the storedRootBuffer and not the rootBuffer field.
-    // rootBuffer has the latest updates, which they may not survive restart.
-    val upto = if (storedRootBuffer.hasFirstBatchLocation) {
-      Some(storedRootBuffer.getFirstBatchLocation.getDataFileId)
-    } else {
-      if (storedRootBuffer.hasLastUpdateLocation) {
-        Some(storedRootBuffer.getLastUpdateLocation.getDataFileId)
+    def find_journal(pos: Long) = {
+      var entry = journal_usage.floorEntry(pos)
+      if (entry != null) {
+        val (info, usageCounter) = entry.getValue()
+        if (pos < info.limit) {
+          Some(entry.getKey -> usageCounter)
+        } else {
+          None
+        }
       } else {
         None
       }
     }
 
-    upto match {
-      case Some(dataFile) =>
-        var done = false
-        while (!done && !gcCandidateSet.isEmpty()) {
-          val last = gcCandidateSet.last()
-          if (last.intValue >= dataFile) {
-            gcCandidateSet.remove(last)
+    in_gc = true
+    val now = System.currentTimeMillis()
+    debug(store.store_kind+" gc starting")
+    latency_counter.time {
+
+      retry_using_index { index =>
+        // Figure out which journal files are still in use by which queues.
+        index.cursor_prefixed(queue_entry_prefix_array) { (_,value) =>
+          val entry_record:QueueEntryRecord = value
+          val pos = if(entry_record.message_locator!=null) {
+            decode_long(entry_record.message_locator)
           } else {
-            done = true
+            index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_)).getOrElse(0L)
           }
+
+          find_journal(pos) match {
+            case Some((key,usageCounter)) =>
+              usageCounter.increment(entry_record.size)
+            case None =>
+          }
+
+          // only continue while the service is still running..
+          store.service_state.is_started
         }
 
-      case None =>
-    }
+        if (store.service_state.is_started) {
 
-    if (!gcCandidateSet.isEmpty() ) {
-      dataFileRefIndex.iterator.foreach { entry =>
-        gcCandidateSet.remove(entry.getKey)
-      }
-      if (!gcCandidateSet.isEmpty()) {
-        debug("Cleanup removing the data files: %s", gcCandidateSet)
-        journal.removeDataFiles(gcCandidateSet)
-      }
-    }
-    debug("Cleanup done.")
-  }
+          gc_detected_log_usage = Map((collection.JavaConversions.asScalaSet(journal_usage.entrySet()).map { x=>
+            x.getKey -> x.getValue._2
+          }).toSeq : _ * )
 
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Helper Methods / Classes
-  //
-  /////////////////////////////////////////////////////////////////////
+          // Take empty journals out of the map..
+          val empty_journals = ListBuffer[Long]()
 
-  private case class Update(update: TypeCreatable, location: Location)
+          val i = journal_usage.entrySet().iterator();
+          while( i.hasNext ) {
+            val (info, usageCounter) = i.next().getValue
+            if( usageCounter.count==0 && info.position < append_journal) {
+              empty_journals += info.position
+              i.remove()
+            }
+          }
 
-  private class TxHelper(val _tx: Transaction) {
-    lazy val mapIndex = MAP_INDEX_FACTORY.open(_tx, rootBuffer.getMapIndexPage)
-    lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, rootBuffer.getQueueIndexPage)
-    lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, rootBuffer.getDataFileRefIndexPage)
-    lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, rootBuffer.getMessageKeyIndexPage)
-    lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, rootBuffer.getMessageRefsIndexPage)
-    lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, rootBuffer.getSubscriptionIndexPage)
-
-    def addAndGet[K](index:SortedIndex[K, jl.Integer], key:K, amount:Int):Int = {
-      var counter = index.get(key)
-      if( counter == null ) {
-        if( amount!=0 ) {
-          index.put(key, amount)
-        }
-        amount
-      } else {
-        val update = counter.intValue + amount
-        if( update == 0 ) {
-          index.remove(key)
-        } else {
-          index.put(key, update)
+          index.cursor_prefixed(message_prefix_array) { (key,value) =>
+            val pos = decode_long(value)
+
+            if ( !find_journal(pos).isDefined ) {
+              // Delete it.
+              index.delete(key)
+              delete_counter += 1
+            } else {
+              active_counter += 1
+            }
+            // only continue while the service is still running..
+            store.service_state.is_started
+          }
+
+          if (store.service_state.is_started) {
+            // We don't want to delete any journals that the index has not snapshotted yet,
+            // or any that the log appender is still writing to.
+            val delete_limit = find_journal(last_index_snapshot_pos).map(_._1).
+                  getOrElse(last_index_snapshot_pos).min(log.appender_start)
+
+            empty_journals.foreach { id =>
+              if ( id < delete_limit ) {
+                log.delete(id)
+              }
+            }
+          }
         }
-        update
       }
     }
+    last_gc_ts=now
+    last_gc_duration = latency_counter.total(TimeUnit.MILLISECONDS)
+    in_gc = false
+    debug(store.store_kind+" gc ended")
+  }
 
-    def queueEntryIndex(root: QueueRootRecord.Getter) = QUEUE_ENTRY_INDEX_FACTORY.open(_tx, root.getEntryIndexPage)
 
-    def queueTrackingIndex(root: QueueRootRecord.Getter) = QUEUE_TRACKING_INDEX_FACTORY.open(_tx, root.getTrackingIndexPage)
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
+    try {
+      retry_using_index { index =>
 
-    def alloc(factory: IndexFactory[_, _]) = factory.create(_tx).getIndexLocation
+        def write_framed(stream:OutputStream, value:Buffer) = {
+          val helper = new AbstractVarIntSupport {
+            def readByte: Byte = throw new UnsupportedOperationException
+            def writeByte(value: Int) = stream.write(value)
+          }
+          helper.writeVarInt(value.length)
+          value.writeTo(stream)
+          true
+        }
+
+        streams.using_map_stream { stream=>
+          index.cursor_prefixed(map_prefix_array) { (key, value) =>
+            val key_buffer = new Buffer(key)
+            key_buffer.moveHead(1)
+            val record = new MapEntryPB.Bean
+            record.setKey(key_buffer)
+            record.setValue(new Buffer(value))
+            record.freeze().writeFramed(stream)
+            true
+          }
+        }
 
-    val rootBean = rootBuffer.copy
+        streams.using_queue_stream { stream =>
+          index.cursor_prefixed(queue_prefix_array) { (_, value) =>
+            write_framed(stream, value)
+          }
+        }
 
-    def lastUpdateLocation(location:Location) = {
-      rootBean.setLastUpdateLocation(location)
-    }
+        streams.using_message_stream { stream=>
+          index.cursor_prefixed(message_prefix_array) { (_, value) =>
+            write_framed(stream, value)
+          }
+        }
 
-    def updateLocations(lastUpdate: Location): Unit = {
-      rootBean.setLastUpdateLocation(lastUpdate)
-      if (batches.isEmpty) {
-        rootBean.clearFirstBatchLocation
-      } else {
-        rootBean.setFirstBatchLocation(batches.head._2._1)
+        streams.using_queue_entry_stream { stream=>
+          index.cursor_prefixed(queue_entry_prefix_array) { (_, value) =>
+            write_framed(stream, value)
+          }
+        }
       }
+      Success(Zilch)
+    } catch {
+      case x:Exception=>
+        Failure(x.getMessage)
     }
+  }
 
-    def storeRootBean() = {
-      val frozen = rootBean.freeze
-      rootBuffer = frozen
-      _tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
+    try {
+      purge
 
-      // Since the index flushes updates async, hook a callback to know when
-      // the update has hit disk.  storedRootBuffer is used by the
-      // cleanup task to know when which data logs are safe to cleanup.
-      _tx.onFlush(^{
-        storedRootBuffer = frozen
-      })
+      retry_using_index { index =>
+        def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
+          var done = false
+          do {
+            try {
+              func(fact.parseFramed(stream).asInstanceOf[Buffer])
+            } catch {
+              case x:EOFException =>
+                done = true
+            }
+          } while( !done )
+        }
 
-    }
+        log.appender { appender =>
+          streams.using_map_stream { stream=>
+            foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
+              index.put(encode(map_prefix, pb.getKey), pb.getValue)
+            }
+          }
 
-  }
+          streams.using_queue_stream { stream=>
+            foreach[QueuePB.Buffer](stream, QueuePB.FACTORY) { record=>
+              index.put(encode(queue_prefix, record.key), record.toUnframedBuffer)
+            }
+          }
+
+          streams.using_message_stream { stream=>
+            foreach[MessagePB.Buffer](stream, MessagePB.FACTORY) { record=>
+              val pos = appender.append(LOG_ADD_MESSAGE, record.toUnframedBuffer)
+              index.put(encode(message_prefix, record.key), encode_long(pos))
+            }
+          }
+
+          streams.using_queue_entry_stream { stream=>
+            foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=>
+              index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), record.toUnframedBuffer)
+            }
+          }
+        }
 
-  private def withTx[T](func: (Transaction) => T): T = {
-    val tx = index_file.tx
-    var ok = false
-    try {
-      val rc = func(tx)
-      ok = true
-      rc
-    } finally {
-      if (ok) {
-        tx.commit
-      } else {
-        tx.rollback
       }
-      tx.close
-    }
-  }
+      snapshot_index
+      Success(Zilch)
 
-  // Gets the next batch id.. after a while we may wrap around
-  // start producing batch ids from zero
-  val next_batch_id = {
-    var rc = next_batch_counter.getAndIncrement
-    while (rc < 0) {
-      // We just wrapped around.. reset the counter to 0
-      // Use a CAS operation so that only 1 thread resets the counter
-      next_batch_counter.compareAndSet(rc + 1, 0)
-      rc = next_batch_counter.getAndIncrement
+    } catch {
+      case x:Exception=>
+        Failure(x.getMessage)
     }
-    rc
   }
-
-  private def isAfterLastUpdateLocation(location: Location) = {
-    val lastUpdate: Location = rootBuffer.getLastUpdateLocation
-    lastUpdate.compareTo(location) < 0
-  }
-
 }
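
The new loadMessages above resolves each message in two steps: it first checks the per-message locator for a cached, already-decoded log position, and only falls back to an index lookup (plus a second retry pass) when the locator is empty. Below is a minimal, self-contained sketch of that caching pattern; the in-memory index and log maps and the helper names are illustrative stand-ins, not the store's API.

import java.util.concurrent.atomic.AtomicReference

object LocatorCacheSketch {
  // Plain maps stand in for the persistent index and the journal.
  val index = Map(1L -> 4096L)                    // message key -> log position
  val log   = Map(4096L -> "message payload")     // log position -> stored record

  // Same shape as loadMessages: prefer the cached position in the locator,
  // fall back to the index, and remember whatever position was found.
  def load(messageKey: Long, locator: AtomicReference[Option[Long]]): Option[String] = {
    val pos = Option(locator.get).flatten.orElse(index.get(messageKey))
    pos.flatMap { p =>
      locator.set(Some(p))                        // cache for the next load
      log.get(p)
    }
  }

  def main(args: Array[String]): Unit = {
    val locator = new AtomicReference[Option[Long]](None)
    println(load(1L, locator))                    // resolved via the index
    println(load(1L, locator))                    // resolved via the cached locator
  }
}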

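The gc pass in the client attributes each referenced log position to its containing journal file with a floorEntry lookup over a sorted map keyed by journal start position, and only deletes journals that end up with zero references. A rough, self-contained sketch of that lookup is below; the LogInfo case class and the sample positions are assumptions for illustration, not the store's types.

import java.util.TreeMap

object FindJournalSketch {
  // Assumed shape: a journal covers positions [position, limit).
  case class LogInfo(position: Long, limit: Long)

  // Journal start position -> info; a sorted map so floorEntry works.
  val journals = new TreeMap[Long, LogInfo]()
  journals.put(0L,    LogInfo(0L,    1024L))
  journals.put(1024L, LogInfo(1024L, 2048L))

  // Same idea as find_journal in the gc pass: take the greatest start <= pos
  // and accept it only if pos is still below that journal's limit.
  def findJournal(pos: Long): Option[LogInfo] = {
    val entry = journals.floorEntry(pos)
    if (entry != null && pos < entry.getValue.limit) Some(entry.getValue) else None
  }

  def main(args: Array[String]): Unit = {
    println(findJournal(10L))     // Some(LogInfo(0,1024))
    println(findJournal(1500L))   // Some(LogInfo(1024,2048))
    println(findJournal(5000L))   // None: past the last journal's limit
  }
}
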
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Sun Aug 28 19:13:04 2011
@@ -16,20 +16,23 @@
  */
 package org.apache.activemq.apollo.broker.store.hawtdb
 
-import dto.{HawtDBStoreStatusDTO, HawtDBStoreDTO}
+import dto.{HawtDBStoreDTO, HawtDBStoreStatusDTO}
 import collection.Seq
 import org.fusesource.hawtdispatch._
 import java.util.concurrent._
-import atomic.{AtomicReference, AtomicInteger, AtomicLong}
-import org.apache.activemq.apollo.dto._
+import atomic.{AtomicReference, AtomicLong}
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.ListEventAggregator
+import org.apache.activemq.apollo.dto.StoreStatusDTO
 import org.apache.activemq.apollo.util.OptionSupport._
-import java.io.{InputStream, OutputStream}
 import scala.util.continuations._
+import java.io._
+import org.apache.activemq.apollo.web.resources.ViewHelper
 import org.fusesource.hawtbuf.Buffer
-
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 }
@@ -37,26 +40,31 @@ object HawtDBStore extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBStore(var config:HawtDBStoreDTO) extends DelayingStoreSupport {
+class HawtDBStore(val config:HawtDBStoreDTO) extends DelayingStoreSupport {
 
   var next_queue_key = new AtomicLong(1)
   var next_msg_key = new AtomicLong(1)
 
-  var executor_pool:ExecutorService = _
-  val schedule_version = new AtomicInteger()
-  val client = new HawtDBClient(this)
+  var write_executor:ExecutorService = _
+  var gc_executor:ExecutorService = _
+  var read_executor:ExecutorService = _
+
+  var client:HawtDBClient = _
+  def create_client = new HawtDBClient(this)
 
-  val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatch_queue)
-  load_source.setEventHandler(^{drain_loads});
 
-  override def toString = "hawtdb store at "+config.directory
+  def store_kind = "hawtdb"
+
+  override def toString = store_kind+" store at "+config.directory
 
   def flush_delay = config.flush_delay.getOrElse(100)
   
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
+  override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = null
+
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
-    executor_pool {
+    write_executor {
       client.store(uows, ^{
         dispatch_queue {
           callback
@@ -66,66 +74,90 @@ class HawtDBStore(var config:HawtDBStore
   }
 
   protected def _start(on_completed: Runnable) = {
-    executor_pool = Executors.newFixedThreadPool(1, new ThreadFactory(){
-      def newThread(r: Runnable) = {
-        val rc = new Thread(r, "hawtdb store client")
-        rc.setDaemon(true)
-        rc
-      }
-    })
-    client.config = config
-    poll_stats
-    executor_pool {
-      client.start(^{
-        next_msg_key.set( client.rootBuffer.getLastMessageKey.longValue +1 )
-        next_queue_key.set( client.rootBuffer.getLastQueueKey.longValue +1 )
-        val v = schedule_version.incrementAndGet
-        scheduleCleanup(v)
-        scheduleFlush(v)
-        load_source.resume
-        on_completed.run
+    try {
+      client = create_client
+      write_executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+        def newThread(r: Runnable) = {
+          val rc = new Thread(r, store_kind + " store io write")
+          rc.setDaemon(true)
+          rc
+        }
       })
-    }
-  }
-
-  def scheduleFlush(version:Int): Unit = {
-    def try_flush() = {
-      if (version == schedule_version.get) {
-        executor_pool {
-          client.flush
-          scheduleFlush(version)
+      gc_executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+        def newThread(r: Runnable) = {
+          val rc = new Thread(r, store_kind + " store gc")
+          rc.setDaemon(true)
+          rc
         }
-      }
-    }
-    dispatch_queue.executeAfter(client.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
-  }
-
-  def scheduleCleanup(version:Int): Unit = {
-    def try_cleanup() = {
-      if (version == schedule_version.get) {
-        executor_pool {
-          client.cleanup()
-          scheduleCleanup(version)
+      })
+      read_executor = Executors.newFixedThreadPool(config.read_threads.getOrElse(10), new ThreadFactory() {
+        def newThread(r: Runnable) = {
+          val rc = new Thread(r, store_kind + " store io read")
+          rc.setDaemon(true)
+          rc
+        }
+      })
+      poll_stats
+      write_executor {
+        try {
+          client.start()
+          next_msg_key.set(client.getLastMessageKey + 1)
+          next_queue_key.set(client.getLastQueueKey + 1)
+          poll_gc
+          on_completed.run
+        } catch {
+          case e:Throwable =>
+            e.printStackTrace()
+            HawtDBStore.error(e, "Store client startup failure: "+e)
         }
       }
     }
-    dispatch_queue.executeAfter(client.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+    catch {
+      case e:Throwable =>
+        e.printStackTrace()
+        HawtDBStore.error(e, "Store startup failure: "+e)
+    }
   }
 
   protected def _stop(on_completed: Runnable) = {
-    schedule_version.incrementAndGet
     new Thread() {
       override def run = {
-        load_source.suspend
-        executor_pool.shutdown
-        executor_pool.awaitTermination(86400, TimeUnit.SECONDS)
-        executor_pool = null
+        write_executor.shutdown
+        write_executor.awaitTermination(60, TimeUnit.SECONDS)
+        write_executor = null
+        read_executor.shutdown
+        read_executor.awaitTermination(60, TimeUnit.SECONDS)
+        read_executor = null
+        gc_executor.shutdown
         client.stop
         on_completed.run
       }
     }.start
   }
 
+  private def keep_polling = {
+    val ss = service_state
+    ss.is_starting || ss.is_started
+  }
+
+  def poll_gc:Unit = {
+    val interval = config.gc_interval.getOrElse(60*30)
+    if( interval>0 ) {
+      dispatch_queue.after(interval, TimeUnit.SECONDS) {
+        if( keep_polling ) {
+          gc {
+            poll_gc
+          }
+        }
+      }
+    }
+  }
+
+  def gc(onComplete: =>Unit) = gc_executor {
+    client.gc
+    onComplete
+  }
+
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation of the Store interface
@@ -136,17 +168,17 @@ class HawtDBStore(var config:HawtDBStore
    * Deletes all stored data from the store.
    */
   def purge(callback: =>Unit) = {
-    executor_pool {
-      client.purge(^{
-        next_queue_key.set(1)
-        next_msg_key.set(1)
-        callback
-      })
+    write_executor {
+      client.purge()
+      next_queue_key.set(1)
+      next_msg_key.set(1)
+      callback
     }
   }
 
+
   def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = {
-    executor_pool {
+    read_executor {
       callback(client.get(key))
     }
   }
@@ -155,38 +187,43 @@ class HawtDBStore(var config:HawtDBStore
   * Gets the last queue key identifier stored.
    */
   def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
-    executor_pool {
-      callback(Some(client.rootBuffer.getLastQueueKey.longValue))
+    write_executor {
+      callback(Some(client.getLastQueueKey))
     }
   }
 
   def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
-    executor_pool {
+    write_executor {
      client.addQueue(record, ^{ callback(true) })
     }
   }
 
   def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
-    executor_pool {
+    write_executor {
       client.removeQueue(queueKey,^{ callback(true) })
     }
   }
 
   def get_queue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
-    executor_pool {
+    write_executor {
       callback( client.getQueue(queueKey) )
     }
   }
 
   def list_queues(callback: (Seq[Long]) => Unit) = {
-    executor_pool {
+    write_executor {
       callback( client.listQueues )
     }
   }
 
+  val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Array[Byte]], (Option[MessageRecord])=>Unit)](), dispatch_queue)
+  load_source.setEventHandler(^{drain_loads});
+  load_source.resume
+
+
   def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
-      load_source.merge((messageKey, { (result)=>
+      load_source.merge((messageKey, locator, { (result)=>
         end()
         callback(result)
       }))
@@ -196,19 +233,19 @@ class HawtDBStore(var config:HawtDBStore
   def drain_loads = {
     var data = load_source.getData
     message_load_batch_size_counter += data.size
-    executor_pool ^{
+    read_executor ^{
       client.loadMessages(data)
     }
   }
 
   def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
-    executor_pool ^{
+    write_executor ^{
       callback( client.listQueueEntryGroups(queueKey, limit) )
     }
   }
 
   def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
-    executor_pool ^{
+    write_executor ^{
       callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
     }
   }
@@ -219,8 +256,8 @@ class HawtDBStore(var config:HawtDBStore
 
         flush_latency = flush_latency_counter(true)
         message_load_latency = message_load_latency_counter(true)
-        client.metric_journal_append = client.metric_journal_append_counter(true)
-        client.metric_index_update = client.metric_index_update_counter(true)
+//        client.metric_journal_append = client.metric_journal_append_counter(true)
+//        client.metric_index_update = client.metric_index_update_counter(true)
         commit_latency = commit_latency_counter(true)
         message_load_batch_size =  message_load_batch_size_counter(true)
 
@@ -233,38 +270,54 @@ class HawtDBStore(var config:HawtDBStore
 
   def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
     val rc = new HawtDBStoreStatusDTO
-
-    rc.state = service_state.toString
-    rc.state_since = service_state.since
-
-    rc.flush_latency = flush_latency
-    rc.message_load_latency = message_load_latency
+    fill_store_status(rc)
     rc.message_load_batch_size = message_load_batch_size
-
-    rc.journal_append_latency = client.metric_journal_append
-    rc.index_update_latency = client.metric_index_update
-
-    rc.canceled_message_counter = metric_canceled_message_counter
-    rc.canceled_enqueue_counter = metric_canceled_enqueue_counter
-    rc.flushed_message_counter = metric_flushed_message_counter
-    rc.flushed_enqueue_counter = metric_flushed_enqueue_counter
-
-    callback(rc)
+    write_executor {
+      client.using_index { index =>
+        rc.log_append_pos = client.log.appender_limit
+        rc.index_snapshot_pos = client.last_index_snapshot_pos
+        rc.last_gc_duration = client.last_gc_duration
+        rc.last_gc_ts = client.last_gc_ts
+        rc.in_gc = client.in_gc
+        rc.log_stats = {
+          var row_layout = "%-20s | %-10s | %10s/%-10s\n"
+          row_layout.format("File", "Messages", "Used Size", "Total Size")+
+          client.log.log_infos.map(x=> x._1 -> client.gc_detected_log_usage.get(x._1)).toSeq.flatMap { x=>
+            try {
+              val file = HawtDBClient.create_sequence_file(client.directory, x._1, HawtDBClient.LOG_SUFFIX)
+              val size = file.length()
+              val usage = x._2 match {
+                case Some(usage)=>
+                  (usage.count.toString, ViewHelper.memory(usage.size))
+                case None=>
+                  ("unknown", "unknown")
+              }
+              Some(row_layout.format(file.getName, usage._1, usage._2, ViewHelper.memory(size)))
+            } catch {
+              case e:Throwable =>
+                None
+            }
+          }.mkString("")
+        }
+      }
+      callback(rc)
+    }
   }
 
   /**
    * Exports the contents of the store to the provided streams.  Each stream should contain
    * a list of framed protobuf objects with the corresponding object types.
    */
-  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = executor_pool ! {
-    Failure("not supported")// client.export_pb(queue_stream, message_stream, queue_entry_stream)
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = write_executor ! {
+    client.export_pb(streams)
   }
 
   /**
    * Imports a previously exported set of streams.  This deletes any previous data
    * in the store.
    */
-  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = executor_pool ! {
-    Failure("not supported")//client.import_pb(queue_stream, message_stream, queue_entry_stream)
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = write_executor ! {
+    client.import_pb(streams)
   }
+
 }
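
The store changes above drop the schedule_version counter in favor of a poll_gc loop: the next pass is scheduled only while the service is starting or started, and only after the previous gc pass has finished on the gc executor. Here is a small self-contained sketch of that self-rescheduling pattern, using a plain ScheduledExecutorService and an AtomicBoolean where the store uses HawtDispatch and its service state; the names are illustrative, not the store's.

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object PollGcSketch {
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  val running   = new AtomicBoolean(true)          // stands in for the service_state checks

  def gc(onComplete: => Unit): Unit = {            // stands in for client.gc on the gc executor
    println("gc pass")
    onComplete
  }

  def pollGc(intervalSeconds: Long): Unit = {
    if (intervalSeconds > 0) {
      scheduler.schedule(new Runnable {
        def run(): Unit = if (running.get) {
          gc { pollGc(intervalSeconds) }           // reschedule only once this pass completes
        }
      }, intervalSeconds, TimeUnit.SECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    pollGc(1)
    Thread.sleep(3500)                             // a few passes run in the background
    running.set(false)                             // stop: no further passes are scheduled
    scheduler.shutdown()
  }
}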


