activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1159776 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala...
Date Fri, 19 Aug 2011 20:36:43 GMT
Author: chirino
Date: Fri Aug 19 20:36:42 2011
New Revision: 1159776

URL: http://svn.apache.org/viewvc?rev=1159776&view=rev
Log:
Fixing leaky abstraction: Changed signature of the locator field from AtomicLong to AtomicReference[Array[Byte]]
so that stores can be as creative as they want when constructing locators.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Aug 19 20:36:42 2011
@@ -165,7 +165,7 @@ class BDBStore(var config:BDBStoreDTO) e
   load_source.resume
 
 
-  def load_message(messageKey: Long, locator:AtomicLong)(callback: (Option[MessageRecord])
=> Unit) = {
+  def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord])
=> Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Aug 19 20:36:42
2011
@@ -49,5 +49,5 @@ message QueueEntryPB {
   optional bytes attachment=5;
   optional int32 redeliveries = 6;
   optional sint64 expiration=7;
-  optional int64 messageLocator=8;
+  optional bytes messageLocator=8;
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Fri Aug 19 20:36:42 2011
@@ -22,7 +22,7 @@ import protocol.Protocol
 import org.apache.activemq.apollo.filter.Filterable
 import org.apache.activemq.apollo.broker.store.StoreUOW
 import org.apache.activemq.apollo.util.Log
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
 
 object DeliveryProducer extends Log
 
@@ -175,7 +175,7 @@ class Delivery {
    * After the store persists the message he may be able to supply us with  locator handle
    * which will load the message faster than looking it up via the store key.
    */
-  var storeLocator:AtomicLong = null
+  var storeLocator:AtomicReference[Array[Byte]] = null
 
   /**
    * The transaction the delivery is participating in.

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Aug 19 20:36:42 2011
@@ -28,7 +28,8 @@ import org.fusesource.hawtdispatch.{List
 import OptionSupport._
 import security.SecurityContext
 import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicInteger}
+import org.fusesource.hawtbuf.Buffer
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -829,7 +830,8 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    state = new Swapped(qer.message_key, new AtomicLong(qer.message_locator), qer.size, qer.expiration)
+    val locator = new AtomicReference[Array[Byte]](Option(qer.message_locator).map(_.toByteArray).getOrElse(null))
+    state = new Swapped(qer.message_key, locator, qer.size, qer.expiration)
     this
   }
 
@@ -888,7 +890,7 @@ class QueueEntry(val queue:Queue, val se
     qer.queue_key = queue.store_id
     qer.entry_seq = seq
     qer.message_key = state.message_key
-    qer.message_locator = Option(state.message_locator).map(_.get).getOrElse(0)
+    qer.message_locator = Option(state.message_locator).flatMap(x=> Option(x.get)).map(new
Buffer(_)).getOrElse(null)
     qer.size = state.size
     qer.expiration = expiration
     qer
@@ -979,7 +981,7 @@ class QueueEntry(val queue:Queue, val se
      */
     def message_key = -1L
 
-    def message_locator: AtomicLong = null
+    def message_locator: AtomicReference[Array[Byte]] = null
 
     /**
      * Attempts to dispatch the current entry to the subscriptions position at the entry.
@@ -1148,7 +1150,7 @@ class QueueEntry(val queue:Queue, val se
 
               delivery.uow = queue.virtual_host.store.create_uow
               val uow = delivery.uow
-              delivery.storeLocator = new AtomicLong()
+              delivery.storeLocator = new AtomicReference[Array[Byte]]()
               delivery.storeKey = uow.store(delivery.createMessageRecord )
               store
               if( asap ) {
@@ -1330,7 +1332,7 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicLong, override
val size:Int, override val expiration:Long) extends EntryState {
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Array[Byte]],
override val size:Int, override val expiration:Long) extends EntryState {
 
     queue.individual_swapped_items += 1
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Fri Aug 19 20:36:42 2011
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.dto._
 import security.SecurityContext
 import store.StoreUOW
 import util.continuations._
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -162,7 +162,7 @@ abstract class DeliveryProducerRoute(rou
             } else {
               copy.uow.retain
             }
-            copy.storeLocator = new AtomicLong()
+            copy.storeLocator = new AtomicReference[Array[Byte]]()
             copy.storeKey = copy.uow.store(copy.createMessageRecord)
           }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
Fri Aug 19 20:36:42 2011
@@ -19,7 +19,7 @@ package org.apache.activemq.apollo.broke
 
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.fusesource.hawtbuf.Buffer
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -32,6 +32,6 @@ class MessageRecord {
   var buffer: Buffer = _
   var zero_copy_buffer: ZeroCopyBuffer = _
   var expiration = 0L
-  var locator:AtomicLong = _
+  var locator:AtomicReference[Array[Byte]] = _
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
Fri Aug 19 20:36:42 2011
@@ -29,7 +29,7 @@ class QueueEntryRecord {
   var queue_key = 0L
   var entry_seq = 0L
   var message_key = 0L
-  var message_locator = 0L
+  var message_locator:Buffer = _
   var attachment:Buffer = _
   var size = 0
   var expiration = 0L

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Fri Aug 19 20:36:42 2011
@@ -20,7 +20,7 @@ import org.apache.activemq.apollo.dto.St
 import org.apache.activemq.apollo.util._
 import java.io.{InputStream, OutputStream}
 import scala.util.continuations._
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
 
 trait StreamManager[A] {
   def using_queue_stream(func: (A)=>Unit)
@@ -109,7 +109,7 @@ trait Store extends ServiceTrait {
   /**
    * Loads a delivery with the associated id from persistent storage.
    */
-  def load_message(messageKey:Long, locator:AtomicLong)(callback:(Option[MessageRecord])=>Unit
)
+  def load_message(messageKey:Long, locator:AtomicReference[Array[Byte]])(callback:(Option[MessageRecord])=>Unit
)
 
   /**
    * Exports the contents of the store to the provided streams.  Each stream should contain

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
Fri Aug 19 20:36:42 2011
@@ -24,6 +24,7 @@ import org.scalatest.{BeforeAndAfterEach
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport, LongCounter}
 import java.util.concurrent.atomic._
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * <p>Implements generic testing of Store implementations.</p>
@@ -99,7 +100,7 @@ abstract class StoreBenchmarkSupport ext
     message.protocol = ascii("test-protocol")
     message.buffer = ascii(content).buffer
     message.size = message.buffer.length
-    message.locator = new AtomicLong()
+    message.locator = new AtomicReference[Array[Byte]]()
     batch.store(message)
   }
 
@@ -191,7 +192,7 @@ abstract class StoreBenchmarkSupport ext
     var keys = message_keys.toList
     val metric = benchmarkCount(keys.size) {
       val latch = new CountDownLatch(1)
-      store.load_message(keys.head, new AtomicLong(0)) { msg=>
+      store.load_message(keys.head, null) { msg=>
         assert(msg.isDefined, "message key not found: "+keys.head)
         latch.countDown
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
Fri Aug 19 20:36:42 2011
@@ -139,7 +139,7 @@ abstract class StoreFunSuiteSupport exte
     val A = add_queue("A")
     val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head, new AtomicLong())(cb)
)
+    val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head, null)(cb)
)
     expect(ascii("message 1").buffer) {
       rc.get.buffer
     }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
Fri Aug 19 20:36:42 2011
@@ -178,7 +178,7 @@ class HawtDBStore(var config:HawtDBStore
     }
   }
 
-  def load_message(messageKey: Long, locator:AtomicLong)(callback: (Option[MessageRecord])
=> Unit) = {
+  def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord])
=> Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Aug 19 20:36:42 2011
@@ -153,7 +153,7 @@ class JDBM2Store(var config:JDBM2StoreDT
   load_source.resume
 
 
-  def load_message(messageKey: Long, locator:AtomicLong)(callback: (Option[MessageRecord])
=> Unit) = {
+  def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord])
=> Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()



Mime
View raw message