activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961122 [1/3] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/resources/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/ activem...
Date Wed, 07 Jul 2010 04:03:36 GMT
Author: chirino
Date: Wed Jul  7 04:03:34 2010
New Revision: 961122

URL: http://svn.apache.org/viewvc?rev=961122&view=rev
Log:
working on the hawtdb module

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Callback.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/DestinationEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DuplicateKeyException.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/FatalStoreException.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/KeyNotFoundException.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Marshallers.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/MessageKeys.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/VoidCallback.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueStatus.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/SubscriptionRecord.java
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStorePerformance.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryStore.scala
      - copied, changed from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/DestinationEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMessageDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Marshallers.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/MessageKeys.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStorePerformance.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/non-persistent-activemq.xml
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/persistent-activemq.xml
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testUris.xml
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/activemq.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/broker/ft/master.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/broker/ft/sharedFileMaster.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/broker/ft/sharedFileSlave.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/broker/ft/slave.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/broker/ft/slave2.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/broker/store/kahabroker.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/broker/store/loadtester.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/config/config.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/config/example.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/config/spring-test.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/security/ldap-spring.xml
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/stomp-auth-broker.xml
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/tcp/activemq-ssl.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/usecases/receiver-activecluster.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/usecases/receiver-zeroconf.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/usecases/receiver.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/usecases/sender-activecluster.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/usecases/sender-zeroconf.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/usecases/sender.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/xbean/activemq2.xml
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/spring-embedded-pooled.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/spring-embedded-xbean-noversion.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/spring-embedded-xbean.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/spring-embedded.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/spring-jndi.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/spring-queue.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/spring.xml   (contents, props changed)
    activemq/sandbox/activemq-apollo-actor/sandbox/hawtdb/pom.xml
    activemq/sandbox/activemq-apollo-actor/sandbox/kahadb/README

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/non-persistent-activemq.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/non-persistent-activemq.xml?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/non-persistent-activemq.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/non-persistent-activemq.xml Wed Jul  7 04:03:34 2010
@@ -20,4 +20,4 @@
     <host-name>default</host-name>
     <memory-store/>    
   </virtual-host>
-</broker>
\ No newline at end of file
+</broker>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/persistent-activemq.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/persistent-activemq.xml?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/persistent-activemq.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/persistent-activemq.xml Wed Jul  7 04:03:34 2010
@@ -20,4 +20,4 @@
     <host-name>default</host-name>
     <kahadb-store purge-on-startup="true" directory="${basedir}/target/test-data/kahadb"/>
   </virtual-host>
-</broker>
\ No newline at end of file
+</broker>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:03:34 2010
@@ -23,8 +23,9 @@ import org.fusesource.hawtdispatch.{Scal
 import org.apache.activemq.util.TreeMap.TreeEntry
 import java.util.{Collections, ArrayList, LinkedList}
 import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
-import org.apache.activemq.broker.store.{StoredMessage, StoreTransaction}
+import org.apache.activemq.broker.store.{StoreTransaction}
 import protocol.ProtocolFactory
+import org.apache.activemq.apollo.store.MessageRecord
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -78,7 +79,7 @@ class Queue(val host: VirtualHost, val d
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
-  val store_load_source = createSource(new ListEventAggregator[(QueueEntry, StoredMessage)](), dispatchQueue)
+  val store_load_source = createSource(new ListEventAggregator[(QueueEntry, MessageRecord)](), dispatchQueue)
   store_load_source.setEventHandler(^ {drain_store_loads});
   store_load_source.resume
 
@@ -378,7 +379,7 @@ class Queue(val host: VirtualHost, val d
             val tx = host.database.createStoreTransaction
 
             val message = loaded.delivery.message
-            val sm = new StoredMessage
+            val sm = new MessageRecord
             sm.protocol = message.protocol
             sm.value = ProtocolFactory.get(message.protocol).encode(message)
             sm.size = loaded.size

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala Wed Jul  7 04:03:34 2010
@@ -735,10 +735,10 @@ abstract class TransactionX {
 //         */
 //        public final MessageRecord createMessageRecord() {
 //            MessageRecord ret = new MessageRecord();
-//            ret.setEncoding(TxAck.ENCODING);
-//            ret.setKey(storeTracking);
+//            ret.setProtocol(TxAck.ENCODING);
+//            ret.setId(storeTracking);
 //            ret.setSize(MEM_SIZE);
-//            ret.setBuffer(new Buffer(toBytes().getData()));
+//            ret.setValue(new Buffer(toBytes().getData()));
 //            return null;
 //        }
 //
@@ -766,8 +766,8 @@ abstract class TransactionX {
 //        }
 //
 //        public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
-//            TxAck ret = new TxAck(null, -1, record.getKey(), tx);
-//            ret.fromBytes(record.getBuffer().getData());
+//            TxAck ret = new TxAck(null, -1, record.getId(), tx);
+//            ret.fromBytes(record.getValue().getData());
 //            return ret;
 //        }
 //
@@ -791,7 +791,7 @@ abstract class TransactionX {
 //     * @return
 //     */
 //    public static TxOp createTxOp(MessageRecord record, Transaction tx) {
-//        if (record.getEncoding().equals(TxAck.ENCODING)) {
+//        if (record.getProtocol().equals(TxAck.ENCODING)) {
 //            return TxAck.createFromMessageRecord(record, tx);
 //        } else {
 //            MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor());

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:03:34 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.apollo.broker;
 
 import _root_.java.util.{ArrayList, HashMap}
-import _root_.org.apache.activemq.broker.store.memory.MemoryStore
 import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
@@ -29,9 +28,10 @@ import org.apache.activemq.apollo.dto.Vi
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import ReporterLevel._
-import org.apache.activemq.apollo.store.memory.MemoryBrokerDatabase
-import org.apache.activemq.broker.store.{StoredQueue, BrokerDatabase, Store}
+import org.apache.activemq.apollo.store.memory.MemoryStore
+import org.apache.activemq.broker.store.{Store}
 import org.fusesource.hawtbuf.proto.WireFormat
+import org.apache.activemq.apollo.store.QueueRecord
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -88,7 +88,7 @@ class VirtualHost(val broker: Broker) ex
     this.names = names.toList
   }
 
-  var database:BrokerDatabase = new MemoryBrokerDatabase()
+  var database:Store = new MemoryStore()
   var transactionManager:TransactionManagerX = new TransactionManagerX
 
   var protocols = Map[AsciiBuffer, WireFormat]()
@@ -117,11 +117,11 @@ class VirtualHost(val broker: Broker) ex
 
     database.listQueues { ids =>
       for( id <- ids) {
-        database.getQueueInfo(id) { x =>
+        database.getQueueStatus(id) { x =>
           x match {
             case Some(info)=>
             dispatchQueue ^{
-              val dest = DestinationParser.parse(info.name, destination_parser_options)
+              val dest = DestinationParser.parse(info.record.name , destination_parser_options)
               if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
 
                 val queue = new Queue(this, dest, id)
@@ -130,7 +130,7 @@ class VirtualHost(val broker: Broker) ex
                 queue.message_seq_counter = info.last+1
                 queue.count = info.count
 
-                queues.put(info.name, queue)
+                queues.put(info.record.name, queue)
               }
             }
             case _ =>
@@ -184,7 +184,7 @@ class VirtualHost(val broker: Broker) ex
 
   def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
     val name = DestinationParser.toBuffer(dest, destination_parser_options)
-    val record = new StoredQueue
+    val record = new QueueRecord
     record.name = name
     database.addQueue(record) { rc =>
       rc match {
@@ -1341,14 +1341,14 @@ class VirtualHost(val broker: Broker) ex
 //                            throw new RuntimeException("Error creating message record for " + brokerDelivery.getMsgId());
 //                        }
 //                    }
-//                    record.setKey(brokerDelivery.getStoreTracking());
+//                    record.setId(brokerDelivery.getStoreTracking());
 //                    session.messageAdd(record);
 //
 //                    for (SaveableQueueElement<MessageDelivery> target : targets) {
 //                        try {
 //                            QueueRecord queueRecord = new QueueRecord();
 //                            queueRecord.setAttachment(null);
-//                            queueRecord.setMessageKey(record.getKey());
+//                            queueRecord.setMessageKey(record.getId());
 //                            queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
 //                            queueRecord.setQueueKey(target.getSequenceNumber());
 //                            session.queueAddMessage(target.getQueueDescriptor(), queueRecord);
@@ -1375,7 +1375,7 @@ class VirtualHost(val broker: Broker) ex
 //                try {
 //                    QueueRecord queueRecord = new QueueRecord();
 //                    queueRecord.setAttachment(null);
-//                    queueRecord.setMessageKey(record.getKey());
+//                    queueRecord.setMessageKey(record.getId());
 //                    queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
 //                    queueRecord.setQueueKey(singleElement.getSequenceNumber());
 //                    session.queueAddMessage(singleElement.getQueueDescriptor(), queueRecord);
@@ -1564,7 +1564,7 @@ class VirtualHost(val broker: Broker) ex
 //            try {
 //                QueueRecord queueRecord = new QueueRecord();
 //                queueRecord.setAttachment(null);
-//                queueRecord.setMessageKey(record.getKey());
+//                queueRecord.setMessageKey(record.getId());
 //                queueRecord.setSize(record.getSize());
 //                queueRecord.setQueueKey(op.getSequenceNumber());
 //                session.queueAddMessage(op.getQueueDescriptor(), queueRecord);
@@ -1574,7 +1574,7 @@ class VirtualHost(val broker: Broker) ex
 //        }
 //
 //        public String toString() {
-//            return "AddTxOpOperation " + record.getKey() + super.toString();
+//            return "AddTxOpOperation " + record.getId() + super.toString();
 //        }
 //    }
 //
@@ -1653,13 +1653,13 @@ class BrokerQueueStore { // implements Q
 //         * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
 //         */
 //        public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
-//            ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString());
+//            ProtocolHandler handler = protocolHandlers.get(record.getProtocol().toString());
 //            if (handler == null) {
 //                try {
-//                    handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString());
-//                    protocolHandlers.put(record.getEncoding().toString(), handler);
+//                    handler = ProtocolHandlerFactory.createProtocolHandler(record.getProtocol().toString());
+//                    protocolHandlers.put(record.getProtocol().toString(), handler);
 //                } catch (Throwable thrown) {
-//                    throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown);
+//                    throw new RuntimeException("Unknown message format" + record.getProtocol().toString(), thrown);
 //                }
 //            }
 //            try {
@@ -2060,7 +2060,7 @@ class BrokerQueueStore { // implements Q
 //        database.deleteQueue(queue);
 //    }
 
-    def setDatabase(database:BrokerDatabase ) = {
+    def setDatabase(database:Store ) = {
     }
 
     def setDispatchQueue(dispatchQueue:DispatchQueue )= {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml Wed Jul  7 04:03:34 2010
@@ -29,4 +29,4 @@
     <memory-store/>
   </virtual-host>
   
-</broker>
\ No newline at end of file
+</broker>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testUris.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testUris.xml?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testUris.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testUris.xml Wed Jul  7 04:03:34 2010
@@ -16,4 +16,4 @@
     limitations under the License.
 -->
 <broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
-</broker>
\ No newline at end of file
+</broker>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 04:03:34 2010
@@ -25,7 +25,7 @@ import org.apache.activemq.transport.Tra
 
 import _root_.scala.collection.JavaConversions._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.broker.store.{Store, StoreFactory}
+import org.apache.activemq.broker.store.Store
 import java.util.ArrayList
 import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent.{CountDownLatch, TimeUnit}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml Wed Jul  7 04:03:34 2010
@@ -1,18 +1,18 @@
 <?xml version="1.0" encoding="UTF-8"?>
-  <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements. See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version
-    2.0 (the "License"); you may not use this file except in compliance
-    with the License. You may obtain a copy of the License at
-    http://www.apache.org/licenses/LICENSE-2.0 Unless required by
-    applicable law or agreed to in writing, software distributed under
-    the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
-    OR CONDITIONS OF ANY KIND, either express or implied. See the
-    License for the specific language governing permissions and
-    limitations under the License.
-  -->
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version
+  2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+  applicable law or agreed to in writing, software distributed under
+  the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+  OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and
+  limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
@@ -51,6 +51,12 @@
       <version>${hawtbuf-version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-dto</artifactId>
+      <version>6.0-SNAPSHOT</version>
+    </dependency>
+
     <!-- Scala Support -->
     <dependency>
       <groupId>org.scala-lang</groupId>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto?rev=961122&r1=961121&r2=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto Wed Jul  7 04:03:34 2010
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-package org.apache.activemq.broker.store.hawtdb;
+package org.apache.activemq.broker.store.hawtdb.store;
 
 option java_multiple_files = false;
 option java_outer_classname = "Data";
@@ -56,8 +56,8 @@ message Trace {
 message MessageAdd {
   optional int64 messageKey=1;
   optional bytes messageId = 2 [java_override_type = "AsciiBuffer"];
-  optional bytes encoding = 3 [java_override_type = "AsciiBuffer"];
-  optional bytes buffer = 4;
+  optional bytes protocol = 3 [java_override_type = "AsciiBuffer"];
+  optional bytes value = 4;
   optional int64 streamKey=5;
   optional int32 messageSize=6;
 }  
@@ -67,18 +67,16 @@ message MessageAdd {
 ///////////////////////////////////////////////////////////////
 
 message QueueAdd {
-  optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
-  optional int32 applicationType = 2; 
-  optional bytes parentName = 3 [java_override_type = "AsciiBuffer"];
-  optional int32 queueType = 4;
-  optional int32 partitionId = 5;
-}  
+  optional int64 key=1;
+  optional bytes name = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes queueType = 3 [java_override_type = "AsciiBuffer"];
+}
 message QueueRemove {
-  optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
-}  
+  optional int64 key=5;
+}
 message QueueAddMessage {
-  optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
-  optional int64 queueKey=2;
+  optional int64 queueKey=1;
+  optional int64 queueSeq=2;
   optional int64 messageKey=3;
   optional bytes attachment = 4;
   optional int32 messageSize=5;
@@ -118,12 +116,12 @@ message MapRemove {
 }  
 message MapEntryPut {
   optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-  optional bytes key = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes id = 2 [java_override_type = "AsciiBuffer"];
   optional bytes value = 3;
 } 
 message MapEntryRemove {
   optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-  optional bytes key = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes id = 2 [java_override_type = "AsciiBuffer"];
 }
 
 ///////////////////////////////////////////////////////////////

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961122&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:03:34 2010
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb
+
+import collection.Seq
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.BaseRetained
+import java.io.{IOException, File}
+import org.apache.activemq.util.LockFile
+import org.fusesource.hawtdb.internal.journal.{Location, Journal}
+import java.util.HashSet
+import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{Executors, TimeUnit}
+import org.apache.activemq.apollo.dto.HawtDBStoreDTO
+import org.apache.activemq.apollo.broker._
+import ReporterLevel._
+import store.HawtDBManager
+import org.apache.activemq.broker.store.{Store, StoreTransaction}
+import org.apache.activemq.apollo.store.{QueueStatus, MessageRecord, QueueRecord}
+
+object HawtDBStore extends Log {
+  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+  /**
+   * Creates a default configuration object.
+   */
+  def default() = {
+    val rc = new HawtDBStoreDTO
+    rc.directory = new File("activemq-data")
+    rc
+  }
+
+  /**
+   * Validates a configuration object.
+   */
+  def validate(config: HawtDBStoreDTO, reporter:Reporter):ReporterLevel = {
+     new Reporting(reporter) {
+      if( config.directory == null ) {
+        error("hawtdb store must be configured with a directroy.")
+      }
+    }.result
+  }}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HawtDBStore extends BaseService with Logging with Store {
+  import HawtDBStore._
+  override protected def log = HawtDBStore
+
+  val dispatchQueue = createQueue("hawtdb message database")
+  val writeQueue = Executors.newSingleThreadExecutor
+  val readQueue = Executors.newCachedThreadPool
+  var config: HawtDBStoreDTO  = default
+  var manager:HawtDBManager = null
+
+  /**
+   * Validates and then applies the configuration.
+   */
+  def configure(config: HawtDBStoreDTO, reporter:Reporter) = ^{
+    if ( validate(config, reporter) < ERROR ) {
+      this.config = config
+      if( serviceState.isStarted ) {
+        // TODO: apply changes while the broker is running.
+        reporter.report(WARN, "Updating the hawtdb configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
+      }
+    }
+  } |>>: dispatchQueue
+
+  protected def _start(onCompleted: Runnable) = {
+    writeQueue {
+      manager = new HawtDBManager
+      manager.setStoreDirectory(config.directory)
+      manager.start()
+      onCompleted.run
+    }
+  }
+
+  protected def _stop(onCompleted: Runnable) = {
+    writeQueue {
+      manager.stop()
+      onCompleted.run
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the BrokerDatabase interface
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {}
+
+  def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {}
+
+  def listQueues(cb: (Seq[Long]) => Unit) = {}
+
+  def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {}
+
+  def flushMessage(id: Long)(cb: => Unit) = {}
+
+  def createStoreTransaction() = new HawtDBStoreTransaction
+
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the StoreTransaction interface
+  //
+  /////////////////////////////////////////////////////////////////////
+  class HawtDBStoreTransaction extends BaseRetained with StoreTransaction {
+
+    def store(delivery: MessageRecord) = {
+
+    }
+
+    def enqueue(queue: Long, seq: Long, msg: Long) = {}
+
+    def dequeue(queue: Long, seq: Long, msg: Long) = {}
+
+  }
+
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Callback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Callback.java?rev=961122&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Callback.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Callback.java Wed Jul  7 04:03:34 2010
@@ -0,0 +1,19 @@
+package org.apache.activemq.broker.store.hawtdb.store;
+
+/**
+ * This interface is used to execute transacted code.
+ *
+ */
+public interface Callback<R, T extends Exception> {
+
+    /**
+     * @param session
+     *            provides you access to read and update the persistent
+     *            data.
+     * @return the result of the Callback
+     * @throws T
+     *             if a system error occurred while executing the
+     *             operations.
+     */
+    public R execute(HawtDBSession session) throws T;
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/DestinationEntity.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/DestinationEntity.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java Wed Jul  7 04:03:34 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.hawtdb;
+package org.apache.activemq.broker.store.hawtdb.store;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -27,11 +27,9 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.activemq.broker.store.QueueDescriptor;
-import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.DuplicateKeyException;
-import org.apache.activemq.broker.store.Store.QueueRecord;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueAddMessage;
+import org.apache.activemq.apollo.store.QueueRecord;
+import org.apache.activemq.apollo.store.QueueEntryRecord;
+import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAddMessage;
 import org.fusesource.hawtdb.api.BTreeIndexFactory;
 import org.fusesource.hawtdb.api.SortedIndex;
 import org.fusesource.hawtdb.api.Transaction;
@@ -39,7 +37,7 @@ import org.fusesource.hawtdb.util.marsha
 
 public class DestinationEntity {
 
-    private static final BTreeIndexFactory<Long, QueueRecord> queueIndexFactory = new BTreeIndexFactory<Long, QueueRecord>();
+    private static final BTreeIndexFactory<Long, QueueEntryRecord> queueIndexFactory = new BTreeIndexFactory<Long, QueueEntryRecord>();
     private static final BTreeIndexFactory<Long, Long> trackingIndexFactory = new BTreeIndexFactory<Long, Long>();
     private static final BTreeIndexFactory<Long, Long> statsIndexFactory = new BTreeIndexFactory<Long, Long>();
 
@@ -63,14 +61,14 @@ public class DestinationEntity {
             DestinationEntity value = new DestinationEntity();
             value.queueIndex = dataIn.readInt();
             value.trackingIndex =  dataIn.readInt();
-            value.descriptor = Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.readPayload(dataIn);
+            value.record = Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.readPayload(dataIn);
             return value;
         }
 
         public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
             dataOut.writeInt(value.queueIndex);
             dataOut.writeInt(value.trackingIndex);
-            Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.writePayload(value.descriptor, dataOut);
+            Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.writePayload(value.record, dataOut);
         }
 
         public int estimatedSize(DestinationEntity object) {
@@ -88,7 +86,7 @@ public class DestinationEntity {
     private int statsIndex;
 
     // Descriptor for this queue:
-    private QueueDescriptor descriptor;
+    private QueueRecord record;
 
     // Child Partitions:
     private HashSet<DestinationEntity> partitions;
@@ -119,7 +117,7 @@ public class DestinationEntity {
         tx.free(statsIndex);
     }
 
-    private SortedIndex<Long, QueueRecord> queueIndex(Transaction tx) {
+    private SortedIndex<Long, QueueEntryRecord> queueIndex(Transaction tx) {
         return queueIndexFactory.open(tx, queueIndex);
     }
     private SortedIndex<Long, Long> trackingIndex(Transaction tx) {
@@ -149,29 +147,29 @@ public class DestinationEntity {
     }
 
     public long getFirstSequence(Transaction tx) throws IOException {
-        Entry<Long, QueueRecord> entry = queueIndex(tx).getFirst();
+        Entry<Long, QueueEntryRecord> entry = queueIndex(tx).getFirst();
         if( entry!=null ) {
-            return entry.getValue().getQueueKey();
+            return entry.getValue().queueKey;
         } else {
             return 0;
         }
     }
 
     public long getLastSequence(Transaction tx) throws IOException {
-        Entry<Long, QueueRecord> entry = queueIndex(tx).getLast();
+        Entry<Long, QueueEntryRecord> entry = queueIndex(tx).getLast();
         if( entry!=null ) {
-            return entry.getValue().getQueueKey();
+            return entry.getValue().queueKey;
         } else {
             return 0;
         }
     }
 
-    public void setQueueDescriptor(QueueDescriptor queue) {
-        descriptor = queue;
+    public void setQueueDescriptor(QueueRecord queue) {
+        record = queue;
     }
 
-    public QueueDescriptor getDescriptor() {
-        return descriptor;
+    public QueueRecord getQueueRecord() {
+        return record;
     }
 
     public void addPartition(DestinationEntity destination) {
@@ -205,13 +203,13 @@ public class DestinationEntity {
 
         Long existing = trackingIndex(tx).put(command.getMessageKey(), command.getQueueKey());
         if (existing == null) {
-            QueueRecord value = new QueueRecord();
-            value.setAttachment(command.getAttachment());
-            value.setMessageKey(command.getMessageKey());
-            value.setQueueKey(command.getQueueKey());
-            value.setSize(command.getMessageSize());
+            QueueEntryRecord value = new QueueEntryRecord();
+            value.attachment = command.getAttachment();
+            value.messageKey = command.getMessageKey();
+            value.queueKey = command.getQueueKey();
+            value.size = command.getMessageSize();
 
-            QueueRecord rc = queueIndex(tx).put(value.getQueueKey(), value);
+            QueueEntryRecord rc = queueIndex(tx).put(value.queueKey, value);
             if (rc == null) {
                 // TODO It seems a little inefficient to continually serialize
                 // the queue size. It might be better to update this only at
@@ -223,10 +221,10 @@ public class DestinationEntity {
                 // time (at the cost of startup time)
                 addStats(tx, 1, command.getMessageSize());
             } else {
-                throw new Store.FatalStoreException(new Store.DuplicateKeyException("Duplicate sequence number " + command.getQueueKey() + " for " + descriptor.getQueueName()));
+                throw new FatalStoreException(new DuplicateKeyException("Duplicate sequence number " + command.getQueueKey() + " for " + record.name));
             }
         } else {
-            throw new Store.DuplicateKeyException("Duplicate tracking " + command.getMessageKey() + " for " + descriptor.getQueueName());
+            throw new DuplicateKeyException("Duplicate tracking " + command.getMessageKey() + " for " + record.name);
         }
     }
 
@@ -250,25 +248,25 @@ public class DestinationEntity {
      * @throws IOException
      */
     public long remove(Transaction tx, long queueKey) throws IOException {
-        QueueRecord qr = queueIndex(tx).remove(queueKey);
+        QueueEntryRecord qr = queueIndex(tx).remove(queueKey);
         if(qr != null)
         {
-            trackingIndex(tx).remove(qr.getMessageKey());
-            addStats(tx, -1, -qr.getSize());
-            return qr.getMessageKey();
+            trackingIndex(tx).remove(qr.messageKey);
+            addStats(tx, -1, -qr.size);
+            return qr.messageKey;
         }
         return -1;
     }
 
-    public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, Long maxQueueKey, final int max) throws IOException {
-        Collection<QueueRecord> rc;
+    public Iterator<QueueEntryRecord> listMessages(Transaction tx, Long firstQueueKey, Long maxQueueKey, final int max) throws IOException {
+        Collection<QueueEntryRecord> rc;
         if (unlimited(max)) {
-            rc = new LinkedList<QueueRecord>();
+            rc = new LinkedList<QueueEntryRecord>();
         } else {
-            rc = new ArrayList<QueueRecord>(max);
+            rc = new ArrayList<QueueEntryRecord>(max);
         }
         
-        Iterator<Entry<Long, QueueRecord>> iterator;
+        Iterator<Entry<Long, QueueEntryRecord>> iterator;
         if (unlimited(firstQueueKey)) {
             iterator = queueIndex(tx).iterator();
 
@@ -281,8 +279,8 @@ public class DestinationEntity {
             if (countLimited && rc.size() >= max) {
                 break;
             }
-            Map.Entry<Long, QueueRecord> entry = iterator.next();
-            if (sequenceLimited && entry.getValue().getQueueKey() > maxQueueKey) {
+            Map.Entry<Long, QueueEntryRecord> entry = iterator.next();
+            if (sequenceLimited && entry.getValue().queueKey > maxQueueKey) {
                 break;
             }
             rc.add(entry.getValue());

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DuplicateKeyException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DuplicateKeyException.java?rev=961122&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DuplicateKeyException.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DuplicateKeyException.java Wed Jul  7 04:03:34 2010
@@ -0,0 +1,27 @@
+package org.apache.activemq.broker.store.hawtdb.store;
+
+/**
+* Created by IntelliJ IDEA.
+* User: chirino
+* Date: May 19, 2010
+* Time: 4:49:31 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class DuplicateKeyException extends Exception {
+    private static final long serialVersionUID = -477567614452245482L;
+
+    public DuplicateKeyException() {
+    }
+
+    public DuplicateKeyException(String message) {
+        super(message);
+    }
+
+    public DuplicateKeyException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DuplicateKeyException(Throwable cause) {
+        super(cause);
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/FatalStoreException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/FatalStoreException.java?rev=961122&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/FatalStoreException.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/FatalStoreException.java Wed Jul  7 04:03:34 2010
@@ -0,0 +1,27 @@
+package org.apache.activemq.broker.store.hawtdb.store;
+
+/**
+* Created by IntelliJ IDEA.
+* User: chirino
+* Date: May 19, 2010
+* Time: 4:49:17 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class FatalStoreException extends RuntimeException {
+    private static final long serialVersionUID = 1122460895970375737L;
+
+    public FatalStoreException() {
+    }
+
+    public FatalStoreException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public FatalStoreException(String message) {
+        super(message);
+    }
+
+    public FatalStoreException(Throwable cause) {
+        super(cause);
+    }
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java Wed Jul  7 04:03:34 2010
@@ -14,47 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.hawtdb;
+package org.apache.activemq.broker.store.hawtdb.store;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.activemq.broker.store.QueueDescriptor;
-import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.hawtdb.Data.MapAdd;
-import org.apache.activemq.broker.store.hawtdb.Data.MapEntryPut;
-import org.apache.activemq.broker.store.hawtdb.Data.MapEntryRemove;
-import org.apache.activemq.broker.store.hawtdb.Data.MapRemove;
-import org.apache.activemq.broker.store.hawtdb.Data.MessageAdd;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueAdd;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueAddMessage;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueRemove;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueRemoveMessage;
-import org.apache.activemq.broker.store.hawtdb.Data.SubscriptionAdd;
-import org.apache.activemq.broker.store.hawtdb.Data.SubscriptionRemove;
-import org.apache.activemq.broker.store.hawtdb.Data.Trace;
-import org.apache.activemq.broker.store.hawtdb.Data.Type;
-import org.apache.activemq.broker.store.hawtdb.Data.MapAdd.MapAddBean;
-import org.apache.activemq.broker.store.hawtdb.Data.MapEntryPut.MapEntryPutBean;
-import org.apache.activemq.broker.store.hawtdb.Data.MapEntryRemove.MapEntryRemoveBean;
-import org.apache.activemq.broker.store.hawtdb.Data.MapRemove.MapRemoveBean;
-import org.apache.activemq.broker.store.hawtdb.Data.MessageAdd.MessageAddBean;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueAdd.QueueAddBean;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueAddMessage.QueueAddMessageBean;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueRemove.QueueRemoveBean;
-import org.apache.activemq.broker.store.hawtdb.Data.QueueRemoveMessage.QueueRemoveMessageBean;
-import org.apache.activemq.broker.store.hawtdb.Data.SubscriptionAdd.SubscriptionAddBean;
-import org.apache.activemq.broker.store.hawtdb.Data.SubscriptionRemove.SubscriptionRemoveBean;
-import org.apache.activemq.broker.store.hawtdb.Data.Type.TypeCreatable;
+import org.apache.activemq.apollo.store.QueueRecord;
+import org.apache.activemq.broker.store.hawtdb.store.Data.MapAdd;
+import org.apache.activemq.broker.store.hawtdb.store.Data.MapEntryPut;
+import org.apache.activemq.broker.store.hawtdb.store.Data.MapEntryRemove;
+import org.apache.activemq.broker.store.hawtdb.store.Data.MapRemove;
+import org.apache.activemq.broker.store.hawtdb.store.Data.MessageAdd;
+import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAdd;
+import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAddMessage;
+import org.apache.activemq.broker.store.hawtdb.store.Data.QueueRemove;
+import org.apache.activemq.broker.store.hawtdb.store.Data.QueueRemoveMessage;
+import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd;
+import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionRemove;
+import org.apache.activemq.broker.store.hawtdb.store.Data.Trace;
+import org.apache.activemq.broker.store.hawtdb.store.Data.Type;
+import org.apache.activemq.broker.store.hawtdb.store.Data.Type.TypeCreatable;
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.DataByteArrayInputStream;
@@ -71,20 +56,22 @@ import org.fusesource.hawtdb.api.TxPageF
 import org.fusesource.hawtdb.internal.journal.Journal;
 import org.fusesource.hawtdb.internal.journal.Location;
 
-public class HawtDBStore implements Store {
+public class HawtDBManager {
 
-    private static final int BEGIN_UNIT_OF_WORK = -1;
-    private static final int END_UNIT_OF_WORK = -2;
-    private static final int FLUSH = -3;
-    private static final int CANCEL_UNIT_OF_WORK = -4;
-
-    private static final Log LOG = LogFactory.getLog(HawtDBStore.class);
-    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
-
-    private static final Buffer BEGIN_UNIT_OF_WORK_DATA = new Buffer(new byte[] { BEGIN_UNIT_OF_WORK });
-    private static final Buffer END_UNIT_OF_WORK_DATA = new Buffer(new byte[] { END_UNIT_OF_WORK });
-    private static final Buffer CANCEL_UNIT_OF_WORK_DATA = new Buffer(new byte[] { CANCEL_UNIT_OF_WORK });
-    private static final Buffer FLUSH_DATA = new Buffer(new byte[] { FLUSH });
+    // Message related methods.
+
+    static final int BEGIN_UNIT_OF_WORK = -1;
+    static final int END_UNIT_OF_WORK = -2;
+    static final int FLUSH = -3;
+    static final int CANCEL_UNIT_OF_WORK = -4;
+
+    static final Log LOG = LogFactory.getLog(HawtDBManager.class);
+    static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+    static final Buffer BEGIN_UNIT_OF_WORK_DATA = new Buffer(new byte[] { BEGIN_UNIT_OF_WORK });
+    static final Buffer END_UNIT_OF_WORK_DATA = new Buffer(new byte[] { END_UNIT_OF_WORK });
+    static final Buffer CANCEL_UNIT_OF_WORK_DATA = new Buffer(new byte[] { CANCEL_UNIT_OF_WORK });
+    static final Buffer FLUSH_DATA = new Buffer(new byte[] { FLUSH });
 
     public static final int CLOSED_STATE = 1;
     public static final int OPEN_STATE = 2;
@@ -575,7 +562,7 @@ public class HawtDBStore implements Stor
      * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    private Location store(final TypeCreatable data, Runnable onFlush, Transaction tx) throws IOException {
+    Location store(final TypeCreatable data, Runnable onFlush, Transaction tx) throws IOException {
         final MessageBuffer message = ((PBMessage) data).freeze();
         int size = message.serializedSizeUnframed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -626,7 +613,7 @@ public class HawtDBStore implements Stor
      * @return
      * @throws IOException
      */
-    private TypeCreatable load(Location location) throws IOException {
+    TypeCreatable load(Location location) throws IOException {
         Buffer data = journal.read(location);
         return load(location, data);
     }
@@ -681,13 +668,13 @@ public class HawtDBStore implements Stor
             break;
         case MAP_ENTRY_PUT: {
             MapEntryPut p = (MapEntryPut) command;
-            rootEntity.mapAddEntry(p.getMapName(), p.getKey(), p.getValue(), tx);
+            rootEntity.mapAddEntry(p.getMapName(), p.getId(), p.getValue(), tx);
             break;
         }
         case MAP_ENTRY_REMOVE: {
             MapEntryRemove p = (MapEntryRemove) command;
             try {
-                rootEntity.mapRemoveEntry(p.getMapName(), p.getKey(), tx);
+                rootEntity.mapRemoveEntry(p.getMapName(), p.getId(), tx);
             } catch (KeyNotFoundException e) {
                 //yay, removed.
             }
@@ -707,28 +694,24 @@ public class HawtDBStore implements Stor
     }
 
     private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
-        QueueDescriptor qd = new QueueDescriptor();
-        qd.setQueueName(command.getQueueName());
-        qd.setApplicationType((short) command.getApplicationType());
-        qd.setQueueType((short) command.getQueueType());
-        if (command.hasParentName()) {
-            qd.setParent(command.getParentName());
-            qd.setPartitionId(command.getPartitionId());
-        }
+        QueueRecord qd = new QueueRecord();
+        qd.name = command.getName();
+        qd.queueType = command.getQueueType();
+//        if (command.hasParentName()) {
+//            qd.setParent(command.getParentName());
+//            qd.setPartitionId(command.getPartitionId());
+//        }
 
         rootEntity.queueAdd(tx, qd);
     }
 
     private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
-        QueueDescriptor qd = new QueueDescriptor();
-        qd.setQueueName(command.getQueueName());
-        rootEntity.queueRemove(tx, qd);
+        rootEntity.queueRemove(tx, command.getKey());
     }
 
     private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
-        QueueDescriptor qd = new QueueDescriptor();
-        qd.setQueueName(command.getQueueName());
-        DestinationEntity destination = rootEntity.getDestination(qd);
+        QueueRecord qd = new QueueRecord();
+        DestinationEntity destination = rootEntity.getDestination(command.getQueueKey());
         if (destination != null) {
             try {
                 destination.add(tx, command);
@@ -737,426 +720,22 @@ public class HawtDBStore implements Stor
                     throw new FatalStoreException(e);
                 }
             }
-            rootEntity.addMessageRef(tx, command.getQueueName(), command.getMessageKey());
+            rootEntity.addMessageRef(tx, command.getMessageKey());
         }
     }
 
     private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
-        QueueDescriptor qd = new QueueDescriptor();
-        qd.setQueueName(command.getQueueName());
-        DestinationEntity destination = rootEntity.getDestination(qd);
+        DestinationEntity destination = rootEntity.getDestination(command.getQueueKey());
         if (destination != null) {
             long messageKey = destination.remove(tx, command.getQueueKey());
             if (messageKey >= 0) {
-                rootEntity.removeMessageRef(tx, command.getQueueName(), command.getQueueKey());
-            }
-        }
-    }
-
-    class KahaDBSession implements Session {
-        TypeCreatable atomicUpdate = null;
-        int updateCount = 0;
-
-        private Transaction tx;
-
-        private Transaction tx() {
-            acquireLock();
-            return tx;
-        }
-
-        public final void commit() {
-            commit(null);
-        }
-
-        public final void rollback() {
-            try {
-                if (tx != null) {
-                    if (updateCount > 1) {
-                        journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
-                    }
-                    tx.rollback();
-                } else {
-                    throw new IllegalStateException("Not in Transaction");
-                }
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            } finally {
-                if (tx != null) {
-                    tx = null;
-                    updateCount = 0;
-                    atomicUpdate = null;
-                }
-            }
-        }
-
-        /**
-         * Indicates callers intent to start a transaction.
-         */
-        public final void acquireLock() {
-            if (tx == null) {
-                indexLock.writeLock().lock();
-                tx = pageFile.tx();
-            }
-        }
-
-        public final void releaseLock() {
-            try {
-                if (tx != null) {
-                    rollback();
-                }
-            } finally {
-                indexLock.writeLock().unlock();
+                rootEntity.removeMessageRef(tx, command.getQueueKey());
             }
         }
-
-        public void commit(Runnable onFlush) {
-            try {
-
-                boolean flush = false;
-                if (atomicUpdate != null) {
-                    store(atomicUpdate, onFlush, tx);
-                } else if (updateCount > 1) {
-                    journal.write(END_UNIT_OF_WORK_DATA, onFlush);
-                } else {
-                    flush = onFlush != null;
-                }
-
-                if (tx != null) {
-                    tx.commit();
-                }
-
-                if (flush) {
-                    onFlush.run();
-                }
-
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            } finally {
-                tx = null;
-                updateCount = 0;
-                atomicUpdate = null;
-            }
-        }
-
-        private void storeAtomic() {
-            if (atomicUpdate != null) {
-                try {
-                    journal.write(BEGIN_UNIT_OF_WORK_DATA, false);
-                    store(atomicUpdate, null, tx);
-                    atomicUpdate = null;
-                } catch (IOException ioe) {
-                    throw new FatalStoreException(ioe);
-                }
-            }
-        }
-
-        private void addUpdate(TypeCreatable bean) {
-            try {
-                //As soon as we do more than one update we'll wrap in a unit of 
-                //work:
-                if (updateCount == 0) {
-                    atomicUpdate = bean;
-                    updateCount++;
-                    return;
-                }
-                storeAtomic();
-
-                updateCount++;
-                store(bean, null, tx);
-
-            } catch (IOException ioe) {
-                throw new FatalStoreException(ioe);
-            }
-        }
-
-        // /////////////////////////////////////////////////////////////
-        // Message related methods.
-        // /////////////////////////////////////////////////////////////
-
-        public void messageAdd(MessageRecord message) {
-            if (message.getKey() < 0) {
-                throw new IllegalArgumentException("Key not set");
-            }
-            MessageAddBean bean = new MessageAddBean();
-            bean.setMessageKey(message.getKey());
-            bean.setMessageId(message.getMessageId());
-            bean.setEncoding(message.getEncoding());
-            bean.setMessageSize(message.getSize());
-            Buffer buffer = message.getBuffer();
-            if (buffer != null) {
-                bean.setBuffer(buffer);
-            }
-            Long streamKey = message.getStreamKey();
-            if (streamKey != null) {
-                bean.setStreamKey(streamKey);
-            }
-
-            addUpdate(bean);
-        }
-
-        public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {
-            storeAtomic();
-            Location location = rootEntity.messageGetLocation(tx(), key);
-            if (location == null) {
-                throw new KeyNotFoundException("message key: " + key);
-            }
-            try {
-                MessageAdd bean = (MessageAdd) load(location);
-                MessageRecord rc = new MessageRecord();
-                rc.setKey(bean.getMessageKey());
-                rc.setMessageId(bean.getMessageId());
-                rc.setEncoding(bean.getEncoding());
-                rc.setSize(bean.getMessageSize());
-                if (bean.hasBuffer()) {
-                    rc.setBuffer(bean.getBuffer());
-                }
-                if (bean.hasStreamKey()) {
-                    rc.setStreamKey(bean.getStreamKey());
-                }
-                return rc;
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
-        }
-
-        // /////////////////////////////////////////////////////////////
-        // Queue related methods.
-        // /////////////////////////////////////////////////////////////
-        public void queueAdd(QueueDescriptor descriptor) {
-            QueueAddBean update = new QueueAddBean();
-            update.setQueueName(descriptor.getQueueName());
-            update.setQueueType(descriptor.getQueueType());
-            update.setApplicationType(descriptor.getApplicationType());
-            AsciiBuffer parent = descriptor.getParent();
-            if (parent != null) {
-                update.setParentName(parent);
-                update.setPartitionId(descriptor.getPartitionKey());
-            }
-            addUpdate(update);
-        }
-
-        public void queueRemove(QueueDescriptor descriptor) {
-            addUpdate(new QueueRemoveBean().setQueueName(descriptor.getQueueName()));
-        }
-
-        public Iterator<QueueQueryResult> queueListByType(short type, QueueDescriptor firstQueue, int max) {
-            storeAtomic();
-            try {
-                return rootEntity.queueList(tx(), type, firstQueue, max);
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
-        }
-
-        public Iterator<QueueQueryResult> queueList(QueueDescriptor firstQueue, int max) {
-            storeAtomic();
-            try {
-                return rootEntity.queueList(tx(), (short) -1, firstQueue, max);
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
-        }
-
-        public void queueAddMessage(QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException {
-            QueueAddMessageBean bean = new QueueAddMessageBean();
-            bean.setQueueName(queue.getQueueName());
-            bean.setQueueKey(record.getQueueKey());
-            bean.setMessageKey(record.getMessageKey());
-            bean.setMessageSize(record.getSize());
-            if (record.getAttachment() != null) {
-                bean.setAttachment(record.getAttachment());
-            }
-            addUpdate(bean);
-        }
-
-        public void queueRemoveMessage(QueueDescriptor queue, Long queueKey) throws KeyNotFoundException {
-            QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
-            bean.setQueueKey(queueKey);
-            bean.setQueueName(queue.getQueueName());
-            addUpdate(bean);
-        }
-
-        public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
-            storeAtomic();
-            DestinationEntity destination = rootEntity.getDestination(queue);
-            if (destination == null) {
-                throw new KeyNotFoundException("queue key: " + queue);
-            }
-            try {
-                return destination.listMessages(tx(), firstQueueKey, maxQueueKey, max);
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
-        }
-
-        ////////////////////////////////////////////////////////////////
-        //Client related methods
-        ////////////////////////////////////////////////////////////////
-
-        /**
-         * Adds a subscription to the store.
-         * 
-         * @throws DuplicateKeyException
-         *             if a subscription with the same name already exists
-         * 
-         */
-        public void addSubscription(SubscriptionRecord record) throws DuplicateKeyException {
-            storeAtomic();
-            SubscriptionRecord old;
-            try {
-                old = rootEntity.getSubscription(record.getName());
-                if (old != null && !old.equals(record)) {
-                    throw new DuplicateKeyException("Subscription already exists: " + record.getName());
-                } else {
-                    updateSubscription(record);
-                }
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
-        }
-
-        /**
-         * Updates a subscription in the store. If the subscription does not
-         * exist then it will simply be added.
-         */
-        public void updateSubscription(SubscriptionRecord record) {
-            SubscriptionAddBean update = new SubscriptionAddBean();
-            update.setName(record.getName());
-            update.setDestination(record.getDestination());
-            update.setDurable(record.getIsDurable());
-
-            if (record.getAttachment() != null) {
-                update.setAttachment(record.getAttachment());
-            }
-            if (record.getSelector() != null) {
-                update.setSelector(record.getSelector());
-            }
-            if (record.getTte() != -1) {
-                update.setTte(record.getTte());
-            }
-            addUpdate(update);
-        }
-
-        /**
-         * Removes a subscription with the given name from the store.
-         */
-        public void removeSubscription(AsciiBuffer name) {
-            SubscriptionRemoveBean update = new SubscriptionRemoveBean();
-            update.setName(name);
-            addUpdate(update);
-        }
-
-        /**
-         * @return A list of subscriptions
-         */
-        public Iterator<SubscriptionRecord> listSubscriptions() {
-            storeAtomic();
-            try {
-                return rootEntity.listSubsriptions(tx);
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
-        }
-
-        // /////////////////////////////////////////////////////////////
-        // Map related methods.
-        // /////////////////////////////////////////////////////////////
-        public void mapAdd(AsciiBuffer map) {
-            MapAddBean update = new MapAddBean();
-            update.setMapName(map);
-            addUpdate(update);
-        }
-
-        public void mapRemove(AsciiBuffer map) {
-            MapRemoveBean update = new MapRemoveBean();
-            update.setMapName(map);
-            addUpdate(update);
-        }
-
-        public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
-            storeAtomic();
-            return rootEntity.mapList(first, max, tx);
-        }
-
-        public void mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) {
-            MapEntryPutBean update = new MapEntryPutBean();
-            update.setMapName(map);
-            update.setKey(key);
-            update.setValue(value);
-            addUpdate(update);
-        }
-
-        public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
-            storeAtomic();
-            try {
-                return rootEntity.mapGetEntry(map, key, tx);
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
-        }
-
-        public void mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
-            MapEntryRemoveBean update = new MapEntryRemoveBean();
-            update.setMapName(map);
-            update.setKey(key);
-            addUpdate(update);
-        }
-
-        public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
-            storeAtomic();
-            try {
-                return rootEntity.mapListKeys(map, first, max, tx);
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
-        }
-
-        // /////////////////////////////////////////////////////////////
-        // Stream related methods.
-        // /////////////////////////////////////////////////////////////
-        public Long streamOpen() {
-            return null;
-        }
-
-        public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
-        }
-
-        public void streamClose(Long streamKey) throws KeyNotFoundException {
-        }
-
-        public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
-            return null;
-        }
-
-        public boolean streamRemove(Long streamKey) {
-            return false;
-        }
-
-        // /////////////////////////////////////////////////////////////
-        // Transaction related methods.
-        // /////////////////////////////////////////////////////////////
-        public void transactionAdd(Buffer txid) {
-        }
-
-        public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
-        }
-
-        public void transactionCommit(Buffer txid) throws KeyNotFoundException {
-        }
-
-        public Iterator<Buffer> transactionList(Buffer first, int max) {
-            return null;
-        }
-
-        public void transactionRemoveMessage(Buffer txid, QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException {
-        }
-
-        public void transactionRollback(Buffer txid) throws KeyNotFoundException {
-        }
     }
 
-    public Session getSession() {
-        return new KahaDBSession();
+    public HawtDBSession getSession() {
+        return new HawtDBSession(this);
     }
 
     /**
@@ -1164,7 +743,7 @@ public class HawtDBStore implements Stor
      * transaction.
      */
     public <R, T extends Exception> R execute(final Callback<R, T> callback, final Runnable onFlush) throws T {
-        KahaDBSession session = new KahaDBSession();
+        HawtDBSession session = new HawtDBSession(this);
         session.acquireLock();
         try {
             R rc = callback.execute(session);



Mime
View raw message