activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961130 - in /activemq/sandbox/activemq-apollo-actor: activemq-hawtdb/ activemq-hawtdb/src/main/proto/ activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/ activemq-store/src/test/scala/org/apache/activemq/apollo/store/
Date Wed, 07 Jul 2010 04:07:27 GMT
Author: chirino
Date: Wed Jul  7 04:07:26 2010
New Revision: 961130

URL: http://svn.apache.org/viewvc?rev=961130&view=rev
Log:
yay hawtdb store test is passing now

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml Wed Jul  7 04:07:26 2010
@@ -87,6 +87,14 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-util</artifactId>
+      <version>6.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul  7 04:07:26 2010
@@ -85,7 +85,7 @@ message AddQueueEntry {
   required int64 queueKey=1;
   required int64 queueSeq=2;
   required int64 messageKey=3;
-  required int32 size=4;
+  optional int32 size=4;
   optional bytes attachment=5;
   optional int32 redeliveries = 6;
 }
@@ -152,29 +152,29 @@ message RemoveStream {
 
 
 ///////////////////////////////////////////////////////////////
-// Index Structures
+// Records Stored used in the Indexes
 ///////////////////////////////////////////////////////////////
 message DatabaseRootRecord {
 
-  required fixed32 state=1;
-  required fixed64 lastMessageKey=2;
-  required fixed64 firstInProgressBatch=3;
-  required fixed64 lastUpdateLocation=4;
-
-  required fixed32 dataFileRefIndexPage=5;
-  required fixed32 messageKeyIndexPage=6;
-  required fixed32 messageRefsIndexPage=7;
-  required fixed32 queueIndexPage=8;
-  required fixed32 subscriptionIndexPage=10;
-  required fixed32 mapIndexPage=11;
+  optional fixed64 firstBatchLocation=1;
+  optional fixed64 lastUpdateLocation=2;
+  optional fixed64 lastMessageKey=3;
+  optional fixed64 lastQueueKey=4;
+
+  optional fixed32 dataFileRefIndexPage=50;
+  optional fixed32 messageKeyIndexPage=51;
+  optional fixed32 messageRefsIndexPage=52;
+  optional fixed32 queueIndexPage=53;
+  optional fixed32 subscriptionIndexPage=54;
+  optional fixed32 mapIndexPage=55;
   
 }
 
 message QueueRootRecord {
-  required AddQueue info=1;
-  required int64 size=2;
-  required int64 count=3;
-  required fixed32 entryIndexPage=4;
-  required fixed32 trackingIndexPage=5;
+  optional AddQueue info=1;
+  optional int64 size=2;
+  optional int64 count=3;
+  optional fixed32 entryIndexPage=4;
+  optional fixed32 trackingIndexPage=5;
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:07:26 2010
@@ -38,12 +38,12 @@ import org.apache.activemq.broker.store.
 import org.apache.activemq.broker.store.hawtdb.model._
 import org.fusesource.hawtbuf._
 import org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.apollo.broker.{Log, Logging, BaseService}
 import collection.mutable.{LinkedHashMap, HashMap, ListBuffer}
 import collection.JavaConversions
 import java.util.{TreeSet, HashSet}
 
 import org.fusesource.hawtdb.api._
+import org.apache.activemq.apollo.broker.{DispatchLogging, Log, Logging, BaseService}
 
 object HawtDBClient extends Log {
   val BEGIN = -1
@@ -60,7 +60,7 @@ object HawtDBClient extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBClient(hawtDBStore: HawtDBStore) extends Logging {
+class HawtDBClient(hawtDBStore: HawtDBStore) extends DispatchLogging {
   import HawtDBClient._
   import Helpers._
 
@@ -144,7 +144,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     journal
   }
 
-  def start() = {
+  val schedual_version = new AtomicInteger()
+
+  def start(onComplete:Runnable) = {
     lock {
 
       journal = createJournal()
@@ -178,43 +180,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       }
 
       pageFile.flush()
-      recover
-
-      //        trackingGen.set(rootEntity.getLastMessageTracking() + 1)
-
-      //      checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-      //          public void run() {
-      //              try {
-      //                  long lastCleanup = System.currentTimeMillis()
-      //                  long lastCheckpoint = System.currentTimeMillis()
-      //
-      //                  // Sleep for a short time so we can periodically check
-      //                  // to see if we need to exit this thread.
-      //                  long sleepTime = Math.min(checkpointInterval, 500)
-      //                  while (opened.get()) {
-      //                      Thread.sleep(sleepTime)
-      //                      long now = System.currentTimeMillis()
-      //                      if (now - lastCleanup >= cleanupInterval) {
-      //                          checkpointCleanup(true)
-      //                          lastCleanup = now
-      //                          lastCheckpoint = now
-      //                      } else if (now - lastCheckpoint >= checkpointInterval) {
-      //                          checkpointCleanup(false)
-      //                          lastCheckpoint = now
-      //                      }
-      //                  }
-      //              } catch (InterruptedException e) {
-      //                  // Looks like someone really wants us to exit this
-      //                  // thread...
-      //              }
-      //          }
-      //      }
-      //      checkpointThread.start()
+      recover(onComplete)
 
+      // Schedual periodic jobs.. they keep executing while schedual_version remains the same.
+      schedualCleanup(schedual_version.get())
+      schedualFlush(schedual_version.get())
     }
   }
 
   def stop() = {
+    // this cancels schedualed jobs...
+    schedual_version.incrementAndGet
+    flush
   }
 
   def addQueue(record: QueueRecord) = {
@@ -238,8 +215,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       tx =>
         tx.actions.foreach {
           case (msg, action) =>
-            if (action.store != null) {
-              val update: AddMessage.Bean = action.store
+            if (action.messageRecord != null) {
+              val update: AddMessage.Bean = action.messageRecord
               batch ::= update
             }
             action.enqueues.foreach {
@@ -352,8 +329,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
  private def load[T <: TypeCreatable](location: Location, expected: Class[T]): Option[T] = {
     try {
-      read(location) match {
-        case (updateType, data) =>
+      load(location) match {
+        case (updateType, batch, data) =>
           Some(expected.cast(decode(location, updateType, data)))
       }
     } catch {
@@ -402,7 +379,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     baos.writeInt(batch)
     frozen.writeUnframed(baos)
 
-    append(baos.toBuffer()) {
+    val buffer = baos.toBuffer()
+    append(buffer) {
       location =>
         executeStore(batch, update, onComplete, location)
     }
@@ -442,6 +420,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
   }
 
+  def load(location: Location) = {
+    var data = read(location)
+    val editor = data.bigEndianEditor
+    val updateType = editor.readByte()
+    val batch = editor.readInt
+    (updateType, batch, data)
+  }
+
   /////////////////////////////////////////////////////////////////////
   //
   // Methods related to recovery
@@ -449,13 +435,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   /////////////////////////////////////////////////////////////////////
 
   /**
-   * Move all the messages that were in the journal into the indexes.
+   * Recovers the journal and rollsback any in progress batches that
+   * were in progress and never committed.
    *
    * @throws IOException
    * @throws IOException
    * @throws IllegalStateException
    */
-  def recover: Unit = {
+  def recover(onComplete:Runnable): Unit = {
     recoveryCounter = 0
     lastRecoveryPosition = null
     val start = System.currentTimeMillis()
@@ -467,10 +454,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         batch =>
           rollback(batch, null)
       }
-    })
 
-    val end = System.currentTimeMillis()
-    info("Processed %d operations from the journal in %,.3f seconds.", recoveryCounter, ((end - start) / 1000.0f))
+      val end = System.currentTimeMillis()
+      info("Processed %d operations from the journal in %,.3f seconds.", recoveryCounter, ((end - start) / 1000.0f))
+      onComplete.run
+    })
   }
 
 
@@ -482,14 +470,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
     // Is this our first incremental recovery pass?
     if (lastRecoveryPosition == null) {
-      if (databaseRootRecord.hasFirstInProgressBatch) {
+      if (databaseRootRecord.hasFirstBatchLocation) {
         // we have to start at the first in progress batch usually...
-        nextRecoveryPosition = databaseRootRecord.getFirstInProgressBatch
+        nextRecoveryPosition = databaseRootRecord.getFirstBatchLocation
       } else {
         // but perhaps there were no batches in progress..
         if (databaseRootRecord.hasLastUpdateLocation) {
           // then we can just continue from the last update applied to the index
-          nextRecoveryPosition = journal.getNextLocation(databaseRootRecord.getLastUpdateLocation)
+          lastRecoveryPosition = databaseRootRecord.getLastUpdateLocation
+          nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
         } else {
           // no updates in the index?.. start from the first record in the journal.
           nextRecoveryPosition = journal.getNextLocation(null)
@@ -533,7 +522,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
 
     recoveryCounter += 1
-    databaseRootRecord.setLastUpdateLocation(location)
   }
 
 
@@ -554,17 +542,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     } finally {
       val end = System.currentTimeMillis()
       if (end - start > 1000) {
-        warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms")
+        warn("Journal append latencey: %,.3f seconds", ((end - start) / 1000.0f))
       }
     }
   }
 
-  def read(location: Location) = {
-    var data = journal.read(location)
-    val editor = data.bigEndianEditor
-    val updateType = editor.readByte()
-    (updateType, data)
-  }
+  def read(location: Location) = journal.read(location)
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -597,7 +580,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       case None =>
         // when recovering..  we are more lax due recovery starting
         // in the middle of a stream of in progress batches
-        assert(!recovering)
+        assert(recovering)
     }
     if (onComplete != null) {
       onComplete.run
@@ -616,7 +599,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       case None =>
         // when recovering..  we are more lax due recovery starting
         // in the middle of a stream of in progress batches
-        assert(!recovering)
+        assert(recovering)
     }
     if (onComplete != null) {
       onComplete.run
@@ -652,7 +635,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         case None =>
           // when recovering..  we are more lax due recovery starting
           // in the middle of a stream of in progress batches
-          assert(!recovering)
+          assert(recovering)
       }
     }
   }
@@ -665,20 +648,20 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
       def apply(x: AddMessage.Getter): Unit = {
 
-        val key = x.getMessageKey()
-        if (key > databaseRootRecord.getLastMessageKey) {
-          databaseRootRecord.setLastMessageKey(key)
+        val messageKey = x.getMessageKey()
+        if (messageKey > databaseRootRecord.getLastMessageKey) {
+          databaseRootRecord.setLastMessageKey(messageKey)
         }
 
-        val prevLocation = messageKeyIndex.put(key, location)
+        val prevLocation = messageKeyIndex.put(messageKey, location)
         if (prevLocation != null) {
           // Message existed.. undo the index update we just did. Chances
           // are it's a transaction replay.
-          messageKeyIndex.put(key, prevLocation)
+          messageKeyIndex.put(messageKey, prevLocation)
           if (location == prevLocation) {
-            warn("Message replay detected for: %d", key)
+            warn("Message replay detected for: %d", messageKey)
           } else {
-            error("Message replay with different location for: %d", key)
+            error("Message replay with different location for: %d", messageKey)
           }
         } else {
           val fileId:jl.Integer = location.getDataFileId()
@@ -697,12 +680,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       }
 
       def apply(x: AddQueue.Getter): Unit = {
-        if (queueIndex.get(x.getKey) == null) {
+        val queueKey = x.getKey
+        if (queueIndex.get(queueKey) == null) {
+
+          if (queueKey > databaseRootRecord.getLastQueueKey) {
+            databaseRootRecord.setLastQueueKey(queueKey)
+          }
+
           val queueRecord = new QueueRootRecord.Bean
           queueRecord.setEntryIndexPage(alloc(QUEUE_ENTRY_INDEX_FACTORY))
           queueRecord.setTrackingIndexPage(alloc(QUEUE_TRACKING_INDEX_FACTORY))
           queueRecord.setInfo(x)
-          queueIndex.put(x.getKey, queueRecord.freeze)
+          queueIndex.put(queueKey, queueRecord.freeze)
         }
       }
 
@@ -814,15 +803,22 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
 
     update match {
-      case x: AddMessage.Getter => Process(x)
-      case x: AddQueueEntry.Getter => Process(x)
-      case x: RemoveQueueEntry.Getter => Process(x)
-
-      case x: AddQueue.Getter => Process(x)
-      case x: RemoveQueue.Getter => Process(x)
-
-      case x: AddTrace.Getter => Process(x)
-      case x: Purge.Getter => Process(x)
+      case x: AddMessage.Getter =>
+        Process(x)
+      case x: AddQueueEntry.Getter =>
+        Process(x)
+      case x: RemoveQueueEntry.Getter =>
+        Process(x)
+
+      case x: AddQueue.Getter =>
+        Process(x)
+      case x: RemoveQueue.Getter =>
+        Process(x)
+
+      case x: AddTrace.Getter =>
+        Process(x)
+      case x: Purge.Getter =>
+        Process(x)
 
       case x: AddSubscription.Getter =>
       case x: RemoveSubscription.Getter =>
@@ -846,12 +842,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   //
   /////////////////////////////////////////////////////////////////////
 
-  def schedualFlush(): Unit = {
+  def schedualFlush(version:Int): Unit = {
     def try_flush() = {
-      if (hawtDBStore.serviceState.isStarted) {
+      if (version == schedual_version.get) {
         hawtDBStore.executor_pool {
           flush
-          schedualFlush
+          schedualFlush(version)
         }
       }
     }
@@ -863,18 +859,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     pageFile.flush
     val end = System.currentTimeMillis()
     if (end - start > 1000) {
-      warn("Index flush took %,.3f seconds" + ((end - start) / 1000.0f))
+      warn("Index flush latency: %,.3f seconds", ((end - start) / 1000.0f))
     }
   }
 
-  def schedualCleanup(): Unit = {
+  def schedualCleanup(version:Int): Unit = {
     def try_cleanup() = {
-      if (hawtDBStore.serviceState.isStarted) {
+      if (version == schedual_version.get) {
         hawtDBStore.executor_pool {
           withTx {tx =>
             cleanup(tx)
           }
-          schedualCleanup
+          schedualCleanup(version)
         }
       }
     }
@@ -899,8 +895,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
 
     // Don't GC files that we will need for recovery..
-    val upto = if (databaseRootRecord.hasFirstInProgressBatch) {
-      Some(databaseRootRecord.getFirstInProgressBatch.getDataFileId)
+    val upto = if (databaseRootRecord.hasFirstBatchLocation) {
+      Some(databaseRootRecord.getFirstBatchLocation.getDataFileId)
     } else {
       if (databaseRootRecord.hasLastUpdateLocation) {
         Some(databaseRootRecord.getLastUpdateLocation.getDataFileId)
@@ -945,7 +941,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   private case class Update(update: TypeCreatable, location: Location)
 
   private class TxHelper(private val _tx: Transaction) {
-    lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
+    lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getQueueIndexPage)
     lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
     lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageKeyIndexPage)
     lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage)
@@ -1014,12 +1010,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     lastUpdate.compareTo(location) < 0
   }
 
-  private def updateLocations(tx: Transaction, location: Location): Unit = {
-    databaseRootRecord.setLastUpdateLocation(location)
+  private def updateLocations(tx: Transaction, lastUpdate: Location): Unit = {
+    databaseRootRecord.setLastUpdateLocation(lastUpdate)
     if (batches.isEmpty) {
-      databaseRootRecord.clearFirstInProgressBatch
+      databaseRootRecord.clearFirstBatchLocation
     } else {
-      databaseRootRecord.setFirstInProgressBatch(batches.head._2._1)
+      databaseRootRecord.setFirstBatchLocation(batches.head._2._1)
     }
     tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
     databaseRootRecord = databaseRootRecord.copy

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:07:26 2010
@@ -24,13 +24,12 @@ import java.util.HashMap
 import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
 import org.apache.activemq.apollo.util.IntCounter
 import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
-import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
 import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
 import collection.{JavaConversions, Seq}
-import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
 import org.fusesource.hawtdispatch.ScalaDispatch._
-import ReporterLevel._
+import org.apache.activemq.apollo.broker._
 import java.io.File
+import ReporterLevel._
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -59,7 +58,7 @@ object HawtDBStore extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBStore extends Store with BaseService with Logging {
+class HawtDBStore extends Store with BaseService with DispatchLogging {
 
   import HawtDBStore._
   override protected def log = HawtDBStore
@@ -95,8 +94,11 @@ class HawtDBStore extends Store with Bas
     executor_pool = Executors.newFixedThreadPool(20)
     client.config = config
     executor_pool {
-      client.start
-      onCompleted.run
+      client.start(^{
+        next_msg_key.set( client.databaseRootRecord.getLastMessageKey.longValue +1 )
+        next_queue_key.set( client.databaseRootRecord.getLastQueueKey.longValue +1 )
+        onCompleted.run
+      })
     }
   }
 
@@ -198,12 +200,12 @@ class HawtDBStore extends Store with Bas
     class MessageAction {
 
       var msg= 0L
-      var store: MessageRecord = null
+      var messageRecord: MessageRecord = null
       var enqueues = ListBuffer[QueueEntryRecord]()
       var dequeues = ListBuffer[QueueEntryRecord]()
 
       def tx = HawtDBBatch.this
-      def isEmpty() = store==null && enqueues==Nil && dequeues==Nil
+      def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
       def cancel() = {
         tx.rm(msg)
         if( tx.isEmpty ) {
@@ -229,7 +231,7 @@ class HawtDBStore extends Store with Bas
       record.key = next_msg_key.incrementAndGet
       val action = new MessageAction
       action.msg = record.key
-      action.store = record
+      action.messageRecord = record
       this.synchronized {
         actions += record.key -> action
       }
@@ -294,7 +296,7 @@ class HawtDBStore extends Store with Bas
       }
 
       tx.actions.foreach { case (msg, action) =>
-        if( action.store!=null ) {
+        if( action.messageRecord!=null ) {
           pendingStores.put(msg, action)
         }
         action.enqueues.foreach { queueEntry=>
@@ -312,9 +314,9 @@ class HawtDBStore extends Store with Bas
            prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
 
             // if the message is not in any queues.. we can gc it..
-            if( prevAction.enqueues == Nil && prevAction.store !=null ) {
+            if( prevAction.enqueues == Nil && prevAction.messageRecord !=null ) {
               pendingStores.remove(msg)
-              prevAction.store = null
+              prevAction.messageRecord = null
             }
 
             // Cancel the action if it's now empty
@@ -355,7 +357,7 @@ class HawtDBStore extends Store with Bas
       if (tx!=null) {
 
         tx.actions.foreach { case (msg, action) =>
-          if( action.store!=null ) {
+          if( action.messageRecord !=null ) {
             pendingStores.remove(msg)
           }
           action.enqueues.foreach { queueEntry=>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul  7 04:07:26 2010
@@ -105,6 +105,7 @@ object Helpers {
 
   implicit def toMessageRecord(pb: AddMessage.Getter): MessageRecord = {
     val rc = new MessageRecord
+    rc.key = pb.getMessageKey
     rc.protocol = pb.getProtocol
     rc.size = pb.getSize
     rc.value = pb.getValue
@@ -115,6 +116,7 @@ object Helpers {
 
   implicit def fromMessageRecord(v: MessageRecord): AddMessage.Bean = {
     val pb = new AddMessage.Bean
+    pb.setMessageKey(v.key)
     pb.setProtocol(v.protocol)
     pb.setSize(v.size)
     pb.setValue(v.value)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul  7 04:07:26 2010
@@ -128,6 +128,16 @@ abstract class StoreFunSuiteSupport exte
     msgKeys
   }
 
+  test("load stored message") {
+    val A = addQueue("A")
+    val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
+    expect(ascii("message 1").buffer) {
+      rc.get.value
+    }
+  }
+
   test("add and list queues") {
     val A = addQueue("A")
     val B = addQueue("B")
@@ -161,16 +171,6 @@ abstract class StoreFunSuiteSupport exte
     }
   }
 
-  test("load stored message") {
-    val A = addQueue("A")
-    val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
-    val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
-    expect(ascii("message 1").buffer) {
-      rc.get.value
-    }
-  }
-
   test("batch completes after a delay") {x}
   def x = {
     val A = addQueue("A")
@@ -189,7 +189,7 @@ abstract class StoreFunSuiteSupport exte
     }
   }
 
-  test("flush cancels the completion delay") {
+  test("flush cancels the delay") {
     val A = addQueue("A")
     var batch = store.createStoreBatch
 



Mime
View raw message