activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1052005 [1/2] - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/resources/META-INF/ser...
Date Wed, 22 Dec 2010 17:37:52 GMT
Author: chirino
Date: Wed Dec 22 17:37:50 2010
New Revision: 1052005

URL: http://svn.apache.org/viewvc?rev=1052005&view=rev
Log:
Change names to use underscores instead of camel case.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala
      - copied, changed from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/package.html
      - copied, changed from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala
      - copied, changed from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/web/FileConfigStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java
      - copied, changed from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
Removed:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/web/FileConfigStoreTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreBenchmark.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
    activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
    activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Wed Dec 22 17:37:50 2010
@@ -37,7 +37,7 @@ class BDBClient(store: BDBStore) extends
 
   override def log: Log = BDBClient
 
-  def dispatchQueue = store.dispatchQueue
+  def dispatchQueue = store.dispatch_queue
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -80,8 +80,8 @@ class BDBClient(store: BDBStore) extends
 
   case class TxContext(tx:Transaction) {
 
-    def with_entries_db[T](queueKey:Long)(func: (Database) => T): T = {
-      val db = environment.openDatabase(tx, entries_db_name(queueKey), long_key_conf)
+    def with_entries_db[T](queue_key:Long)(func: (Database) => T): T = {
+      val db = environment.openDatabase(tx, entries_db_name(queue_key), long_key_conf)
       try {
         func(db)
       } finally {
@@ -163,8 +163,8 @@ class BDBClient(store: BDBStore) extends
         }
       }
 
-      listQueues.foreach{ queueKey=>
-        val name = entries_db_name(queueKey)
+      listQueues.foreach{ queue_key=>
+        val name = entries_db_name(queue_key)
         remove_db(name)
       }
       remove_db("messages")
@@ -208,24 +208,24 @@ class BDBClient(store: BDBStore) extends
     callback.run
   }
 
-  def removeQueue(queueKey: Long, callback:Runnable) = {
+  def removeQueue(queue_key: Long, callback:Runnable) = {
     with_ctx { ctx=>
       import ctx._
 
-      queues_db.delete(tx, queueKey)
-      with_entries_db(queueKey) { entries_db=>
+      queues_db.delete(tx, queue_key)
+      with_entries_db(queue_key) { entries_db=>
 
         entries_db.cursor(tx) { (key,value)=>
           val queueEntry:QueueEntryRecord = value
-          if( add_and_get(message_refs_db, queueEntry.messageKey, -1, tx)==0 ) {
-            messages_db.delete(tx, queueEntry.messageKey)
+          if( add_and_get(message_refs_db, queueEntry.message_key, -1, tx)==0 ) {
+            messages_db.delete(tx, queueEntry.message_key)
           }
           true // keep cursoring..
         }
 
       }
 
-      environment.removeDatabase(tx, entries_db_name(queueKey))
+      environment.removeDatabase(tx, entries_db_name(queue_key))
     }
     callback.run
   }
@@ -243,17 +243,17 @@ class BDBClient(store: BDBStore) extends
               }
 
               action.enqueues.foreach { queueEntry =>
-                with_entries_db(queueEntry.queueKey) { entries_db=>
-                  entries_db.put(tx, queueEntry.queueSeq, queueEntry)
-                  add_and_get(message_refs_db, queueEntry.messageKey, 1, tx)
+                with_entries_db(queueEntry.queue_key) { entries_db=>
+                  entries_db.put(tx, queueEntry.entry_seq, queueEntry)
+                  add_and_get(message_refs_db, queueEntry.message_key, 1, tx)
                 }
               }
 
               action.dequeues.foreach { queueEntry =>
-                with_entries_db(queueEntry.queueKey) { entries_db=>
-                  entries_db.delete(tx, queueEntry.queueSeq)
-                  if( add_and_get(message_refs_db, queueEntry.messageKey, -1, tx)==0 ) {
-                    messages_db.delete(tx, queueEntry.messageKey)
+                with_entries_db(queueEntry.queue_key) { entries_db=>
+                  entries_db.delete(tx, queueEntry.entry_seq)
+                  if( add_and_get(message_refs_db, queueEntry.message_key, -1, tx)==0 ) {
+                    messages_db.delete(tx, queueEntry.message_key)
                   }
                 }
               }
@@ -277,19 +277,19 @@ class BDBClient(store: BDBStore) extends
     rc
   }
 
-  def getQueue(queueKey: Long): Option[QueueRecord] = {
+  def getQueue(queue_key: Long): Option[QueueRecord] = {
     with_ctx { ctx=>
       import ctx._
-      queues_db.get(tx, to_DatabaseEntry(queueKey)).map( x=> to_QueueRecord(x)  )
+      queues_db.get(tx, to_DatabaseEntry(queue_key)).map( x=> to_QueueRecord(x)  )
     }
   }
 
-  def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryRange] = {
+  def listQueueEntryGroups(queue_key: Long, limit: Int) : Seq[QueueEntryRange] = {
     var rc = ListBuffer[QueueEntryRange]()
     with_ctx { ctx=>
       import ctx._
 
-      with_entries_db(queueKey) { entries_db=>
+      with_entries_db(queue_key) { entries_db=>
 
         var group:QueueEntryRange = null
 
@@ -297,12 +297,12 @@ class BDBClient(store: BDBStore) extends
 
           if( group == null ) {
             group = new QueueEntryRange
-            group.firstQueueSeq = key
+            group.first_entry_seq = key
           }
 
           val entry:QueueEntryRecord = value
 
-          group.lastQueueSeq = key
+          group.last_entry_seq = key
           group.count += 1
           group.size += entry.size
 
@@ -323,17 +323,17 @@ class BDBClient(store: BDBStore) extends
     rc
   }
 
-  def getQueueEntries(queueKey: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+  def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
     var rc = ListBuffer[QueueEntryRecord]()
     with_ctx { ctx=>
       import ctx._
 
-      with_entries_db(queueKey) { entries_db=>
+      with_entries_db(queue_key) { entries_db=>
         entries_db.cursor_from(tx, to_DatabaseEntry(firstSeq)) { (key, value) =>
-          val queueSeq:Long = key
+          val entry_seq:Long = key
           val entry:QueueEntryRecord = value
           rc += entry
-          queueSeq < lastSeq
+          entry_seq < lastSeq
         }
       }
     }
@@ -347,13 +347,13 @@ class BDBClient(store: BDBStore) extends
     val records = with_ctx { ctx=>
       import ctx._
 
-      requests.flatMap { case (messageKey, callback)=>
+      requests.flatMap { case (message_key, callback)=>
         val record = metric_load_from_index_counter.time {
-          messages_db.get(tx, to_DatabaseEntry(messageKey)).map ( to_MessageRecord _ )
+          messages_db.get(tx, to_DatabaseEntry(message_key)).map ( to_MessageRecord _ )
         }
         record match {
           case None =>
-          debug("Message not indexed: %s", messageKey)
+          debug("Message not indexed: %s", message_key)
           callback(None)
           None
           case Some(x) => Some((record, callback))

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Wed Dec 22 17:37:50 2010
@@ -82,7 +82,7 @@ class BDBStore extends DelayingStoreSupp
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     write_executor {
       client.store(uows, ^{
-        dispatchQueue {
+        dispatch_queue {
           callback
         }
       })
@@ -93,7 +93,7 @@ class BDBStore extends DelayingStoreSupp
 
   def configure(config: BDBStoreDTO, reporter: Reporter) = {
     if ( BDBStore.validate(config, reporter) < ERROR ) {
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
         // TODO: apply changes while he broker is running.
         reporter.report(WARN, "Updating bdb store configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
       } else {
@@ -102,7 +102,7 @@ class BDBStore extends DelayingStoreSupp
     }
   }
 
-  protected def _start(onCompleted: Runnable) = {
+  protected def _start(on_completed: Runnable) = {
     info("Starting bdb store at: '%s'", config.directory)
     write_executor = Executors.newFixedThreadPool(1, new ThreadFactory(){
       def newThread(r: Runnable) = {
@@ -124,11 +124,11 @@ class BDBStore extends DelayingStoreSupp
       client.start()
       next_msg_key.set( client.getLastMessageKey +1 )
       next_queue_key.set( client.getLastQueueKey +1 )
-      onCompleted.run
+      on_completed.run
     }
   }
 
-  protected def _stop(onCompleted: Runnable) = {
+  protected def _stop(on_completed: Runnable) = {
     new Thread() {
       override def run = {
         info("Stopping BDB store at: '%s'", config.directory)
@@ -139,7 +139,7 @@ class BDBStore extends DelayingStoreSupp
         read_executor.awaitTermination(60, TimeUnit.SECONDS)
         read_executor = null
         client.stop
-        onCompleted.run
+        on_completed.run
       }
     }.start
   }
@@ -166,42 +166,42 @@ class BDBStore extends DelayingStoreSupp
   /**
    * Ges the last queue key identifier stored.
    */
-  def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+  def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
     write_executor {
       callback(Some(client.getLastQueueKey))
     }
   }
 
-  def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
+  def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
     write_executor {
      client.addQueue(record, ^{ callback(true) })
     }
   }
 
-  def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
+  def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
     write_executor {
       client.removeQueue(queueKey,^{ callback(true) })
     }
   }
 
-  def getQueue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
+  def get_queue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
     write_executor {
       callback( client.getQueue(queueKey) )
     }
   }
 
-  def listQueues(callback: (Seq[Long]) => Unit) = {
+  def list_queues(callback: (Seq[Long]) => Unit) = {
     write_executor {
       callback( client.listQueues )
     }
   }
 
-  val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
+  val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatch_queue)
   load_source.setEventHandler(^{drain_loads});
   load_source.resume
 
 
-  def loadMessage(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()
@@ -218,13 +218,13 @@ class BDBStore extends DelayingStoreSupp
     }
   }
 
-  def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
+  def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
     write_executor ^{
       callback( client.listQueueEntryGroups(queueKey, limit) )
     }
   }
 
-  def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+  def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
     write_executor ^{
       callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
     }
@@ -232,7 +232,7 @@ class BDBStore extends DelayingStoreSupp
 
   def poll_stats:Unit = {
     def displayStats = {
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
 
         flush_latency = flush_latency_counter(true)
         message_load_latency = message_load_latency_counter(true)
@@ -245,14 +245,14 @@ class BDBStore extends DelayingStoreSupp
       }
     }
 
-    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+    dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
   }
 
-  def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
+  def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
     val rc = new BDBStoreStatusDTO
 
-    rc.state = serviceState.toString
-    rc.state_since = serviceState.since
+    rc.state = service_state.toString
+    rc.state_since = service_state.since
 
     rc.flush_latency = flush_latency
     rc.message_load_latency = message_load_latency

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala Wed Dec 22 17:37:50 2010
@@ -49,9 +49,9 @@ object HelperTrait {
   implicit def to_QueueEntryRecord(entry: DatabaseEntry): QueueEntryRecord = {
     val pb =  QueueEntryPB.FACTORY.parseUnframed(entry.getData)
     val rc = new QueueEntryRecord
-    rc.queueKey = pb.getQueueKey
-    rc.queueSeq = pb.getQueueSeq
-    rc.messageKey = pb.getMessageKey
+    rc.queue_key = pb.getQueueKey
+    rc.entry_seq = pb.getQueueSeq
+    rc.message_key = pb.getMessageKey
     rc.attachment = pb.getAttachment
     rc.size = pb.getSize
     rc.redeliveries = pb.getRedeliveries.toShort
@@ -60,9 +60,9 @@ object HelperTrait {
 
   implicit def to_DatabaseEntry(v: QueueEntryRecord): DatabaseEntry = {
     val pb = new QueueEntryPB.Bean
-    pb.setQueueKey(v.queueKey)
-    pb.setQueueSeq(v.queueSeq)
-    pb.setMessageKey(v.messageKey)
+    pb.setQueueKey(v.queue_key)
+    pb.setQueueSeq(v.entry_seq)
+    pb.setMessageKey(v.message_key)
     pb.setAttachment(v.attachment)
     pb.setSize(v.size)
     pb.setRedeliveries(v.redeliveries)
@@ -171,6 +171,6 @@ object HelperTrait {
   implicit def DatabaseWrapper(x: Database) = new RichDatabase(x)
 
 
-  def entries_db_name(queueKey: Long): String =  "entries-" + queueKey
+  def entries_db_name(queue_key: Long): String =  "entries-" + queue_key
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreBenchmark.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreBenchmark.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreBenchmark.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreBenchmark.scala Wed Dec 22 17:37:50 2010
@@ -25,7 +25,7 @@ import org.apache.activemq.apollo.broker
  */
 class BDBStoreBenchmark extends StoreBenchmarkSupport {
 
-  def createStore(flushDelay:Long):Store = {
+  def create_store(flushDelay:Long):Store = {
     val rc = new BDBStore
     rc.config.flush_delay = flushDelay
     rc

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala Wed Dec 22 17:37:50 2010
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.broker
  */
 class BDBStoreTest extends StoreFunSuiteSupport {
 
-  def createStore(flushDelay:Long):Store = {
+  def create_store(flushDelay:Long):Store = {
     val rc = new BDBStore
     rc.config.flush_delay = flushDelay
     rc

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index Wed Dec 22 17:37:50 2010
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.transport.vm.VMTransportFactory
+org.apache.activemq.apollo.broker.transport.VMTransportFactory

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Dec 22 17:37:50 2010
@@ -120,7 +120,7 @@ object Broker extends Log {
     val rc = new BrokerDTO
     rc.id = "default"
     rc.notes = "A default configuration"
-    rc.virtual_hosts.add(VirtualHost.defaultConfig)
+    rc.virtual_hosts.add(VirtualHost.default_config)
     rc.connectors.add(Connector.defaultConfig)
     rc
   }
@@ -156,10 +156,9 @@ object Broker extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Broker() extends BaseService with DispatchLogging with LoggingReporter {
+class Broker() extends BaseService {
   
   import Broker._
-  override protected def log = Broker
 
   var config: BrokerDTO = defaultConfig
 
@@ -169,9 +168,9 @@ class Broker() extends BaseService with 
 
   var connectors: List[Connector] = Nil
 
-  val dispatchQueue = createQueue("broker") // getGlobalQueue(DispatchPriority.HIGH).createQueue("broker")
+  val dispatch_queue = createQueue("broker") // getGlobalQueue(DispatchPriority.HIGH).createQueue("broker")
   if( STICK_ON_THREAD_QUEUES ) {
-    dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+    dispatch_queue.setTargetQueue(Dispatch.getRandomThreadQueue)
   }
 
   val id = broker_id_counter.incrementAndGet
@@ -188,11 +187,11 @@ class Broker() extends BaseService with 
   /**
    * Validates and then applies the configuration.
    */
-  def configure(config: BrokerDTO, reporter:Reporter) = dispatchQueue {
+  def configure(config: BrokerDTO, reporter:Reporter) = dispatch_queue {
     if ( validate(config, reporter) < ERROR ) {
       this.config = config
 
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
         // TODO: apply changes while he broker is running.
         reporter.report(WARN, "Updating broker configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
 
@@ -203,7 +202,7 @@ class Broker() extends BaseService with 
   var authenticator:Authenticator = _
   var authorizer:Authorizer = _
 
-  override def _start(onCompleted:Runnable) = {
+  override def _start(on_completed:Runnable) = {
 
     // create the runtime objects from the config
     {
@@ -221,7 +220,7 @@ class Broker() extends BaseService with 
       default_virtual_host = null
       for (c <- config.virtual_hosts) {
         val host = new VirtualHost(this, virtual_host_id_counter.incrementAndGet)
-        host.configure(c, this)
+        host.configure(c, LoggingReporter(VirtualHost))
         virtual_hosts += ascii(c.id)-> host
         // first defined host is the default virtual host
         if( default_virtual_host == null ) {
@@ -235,31 +234,31 @@ class Broker() extends BaseService with 
       }
       for (c <- config.connectors) {
         val connector = new Connector(this, connector_id_counter.incrementAndGet)
-        connector.configure(c, this)
+        connector.configure(c, LoggingReporter(VirtualHost))
         connectors ::= connector
       }
     }
 
     // Start up the virtual hosts
-    val tracker = new LoggingTracker("broker startup", dispatchQueue)
+    val tracker = new LoggingTracker("broker startup", dispatch_queue)
     virtual_hosts.valuesIterator.foreach( x=>
       tracker.start(x)
     )
 
     // Once virtual hosts are up.. start up the connectors.
     tracker.callback(^{
-      val tracker = new LoggingTracker("broker startup", dispatchQueue)
+      val tracker = new LoggingTracker("broker startup", dispatch_queue)
       connectors.foreach( x=>
         tracker.start(x)
       )
-      tracker.callback(onCompleted)
+      tracker.callback(on_completed)
     })
 
   }
 
 
-  def _stop(onCompleted:Runnable): Unit = {
-    val tracker = new LoggingTracker("broker shutdown", dispatchQueue)
+  def _stop(on_completed:Runnable): Unit = {
+    val tracker = new LoggingTracker("broker shutdown", dispatch_queue)
     // Stop accepting connections..
     connectors.foreach( x=>
       tracker.stop(x)
@@ -268,14 +267,14 @@ class Broker() extends BaseService with 
     virtual_hosts.valuesIterator.foreach( x=>
       tracker.stop(x)
     )
-    tracker.callback(onCompleted)
+    tracker.callback(on_completed)
   }
 
-  def getVirtualHost(name: AsciiBuffer) = dispatchQueue ! {
+  def getVirtualHost(name: AsciiBuffer) = dispatch_queue ! {
     virtual_hosts_by_hostname.getOrElse(name, null)
   }
 
-  def getDefaultVirtualHost = dispatchQueue ! {
+  def getDefaultVirtualHost = dispatch_queue ! {
     default_virtual_host
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala Wed Dec 22 17:37:50 2010
@@ -75,7 +75,7 @@ object FileConfigStore extends Log
 class FileConfigStore extends ConfigStore {
   import FileConfigStore._
 
-  case class StoredBrokerModel(data:Array[Byte], lastModified:Long)
+  case class StoredBrokerModel(data:Array[Byte], last_modified:Long)
 
   var file:File = new File("activemq.xml")
 
@@ -84,15 +84,15 @@ class FileConfigStore extends ConfigStor
   @volatile
   var running = false
 
-  val dispatchQueue = createQueue("config store")
+  val dispatch_queue = createQueue("config store")
 
   // can't do blocking IO work on the dispatchQueue :(
   // so... use an executor
-  var ioWorker:ExecutorService = null
+  var io_worker:ExecutorService = null
 
 
   def start = {
-    ioWorker = Executors.newSingleThreadExecutor
+    io_worker = Executors.newSingleThreadExecutor
     running = true
 
     file = file.getCanonicalFile;
@@ -110,12 +110,12 @@ class FileConfigStore extends ConfigStor
     }
 
     latest = read(file)
-    schedualNextUpdateCheck
+    schedual_next_update_check
   }
 
   def stop = {
     running = false
-    ioWorker.shutdown
+    io_worker.shutdown
   }
 
   def load(eval:Boolean) = {
@@ -139,21 +139,21 @@ class FileConfigStore extends ConfigStor
     latest = write(m)
   }
 
-  private def schedualNextUpdateCheck:Unit = dispatchQueue.after(1, TimeUnit.SECONDS) {
+  private def schedual_next_update_check:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
     if( running ) {
-      val lastModified = latest.lastModified
+      val last_modified = latest.last_modified
       val latestData = latest.data
-      ioWorker {
+      io_worker {
         try {
           val l = file.lastModified
-          if( l != lastModified ) {
+          if( l != last_modified ) {
             val config = read(file)
             if ( !Arrays.equals(latestData, config.data) ) {
               // TODO: trigger reloading the config file.
             }
             latest = config
           }
-          schedualNextUpdateCheck
+          schedual_next_update_check
         }
         catch {
           case e:Exception =>
@@ -182,7 +182,7 @@ class FileConfigStore extends ConfigStor
     }
 
     IOHelper.writeBinaryFile(file, config.data)
-    config.copy(lastModified = file.lastModified)
+    config.copy(last_modified = file.lastModified)
   }
 
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Dec 22 17:37:50 2010
@@ -20,60 +20,72 @@ import _root_.java.io.{IOException}
 import _root_.java.lang.{String}
 import org.fusesource.hawtdispatch._
 import protocol.{ProtocolHandler}
-import org.apache.activemq.apollo.transport.{DefaultTransportListener, Transport}
 import org.apache.activemq.apollo.util.{Log, BaseService}
 import org.apache.activemq.apollo.filter.BooleanExpression
 import org.apache.activemq.apollo.dto.ConnectionStatusDTO
+import org.apache.activemq.apollo.transport.{TransportListener, DefaultTransportListener, Transport}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object Connection extends Log {
-}
-
+object Connection extends Log
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract class Connection() extends DefaultTransportListener with BaseService  {
-
-  override protected def log = Connection
+abstract class Connection() extends BaseService  {
+  import Connection._
 
-  val dispatchQueue = createQueue()
+  val dispatch_queue = createQueue()
   var stopped = true
   var transport:Transport = null
-  var transportSink:TransportSink = null 
+  var transport_sink:TransportSink = null
 
-  override protected def _start(onCompleted:Runnable) = {
+  override protected def _start(on_completed:Runnable) = {
     stopped = false
-    transportSink = new TransportSink(transport)
-    transport.setDispatchQueue(dispatchQueue);
-    transport.setTransportListener(Connection.this);
-    transport.start(onCompleted)
+    transport_sink = new TransportSink(transport)
+    transport.setDispatchQueue(dispatch_queue);
+    transport.setTransportListener(new TransportListener(){
+      def onTransportFailure(error: IOException) = Connection.this.on_transport_failure(error)
+      def onTransportDisconnected = Connection.this.on_transport_disconnected
+      def onTransportConnected =  Connection.this.on_transport_connected
+      def onTransportCommand(command: AnyRef) =  Connection.this.on_transport_command(command)
+      def onRefill =  Connection.this.on_refill
+    });
+    transport.start(on_completed)
   }
 
-  override protected def _stop(onCompleted:Runnable) = {
+  override protected def _stop(on_completed:Runnable) = {
     stopped = true
-    transport.stop(onCompleted)
+    transport.stop(on_completed)
   }
 
-
-  override def onTransportFailure(error:IOException) = {
+  protected def on_transport_failure(error:IOException) = {
     if (!stopped) {
-        onFailure(error);
+        on_failure(error);
     }
   }
 
-  def onFailure(error:Exception) = {
-    warn(error)
-    transport.stop
-  }
 
-  override def onRefill = {
-    if( transportSink.refiller !=null ) {
-      transportSink.refiller.run
+  protected def on_refill = {
+    if( transport_sink.refiller !=null ) {
+      transport_sink.refiller.run
     }
   }
 
+  protected def on_transport_command(command: Object) = {
+  }
+
+  protected def on_transport_connected() = {
+  }
+
+  protected def on_transport_disconnected() = {
+  }
+
+
+  protected def on_failure(error:Exception) = {
+    warn(error)
+    transport.stop
+  }
 }
 
 /**
@@ -81,53 +93,48 @@ abstract class Connection() extends Defa
  */
 class BrokerConnection(val connector: Connector, val id:Long) extends Connection {
 
-  var protocolHandler: ProtocolHandler = null;
+  var protocol_handler: ProtocolHandler = null;
 
   override def toString = "id: "+id.toString
 
-  override protected  def _start(onCompleted:Runnable) = {
-    connector.dispatchQueue.retain
-    protocolHandler.setConnection(this);
-    super._start(onCompleted)
+  protected override  def _start(on_completed:Runnable) = {
+    connector.dispatch_queue.retain
+    protocol_handler.set_connection(this);
+    super._start(on_completed)
   }
 
-  override protected def _stop(onCompleted:Runnable) = {
+  protected override def _stop(on_completed:Runnable) = {
     connector.stopped(this)
-    connector.dispatchQueue.release
-    super._stop(onCompleted)
+    connector.dispatch_queue.release
+    super._stop(on_completed)
   }
 
-  override def onTransportConnected() = protocolHandler.onTransportConnected
+  protected override def on_transport_connected() = protocol_handler.on_transport_connected
 
-  override def onTransportDisconnected() = protocolHandler.onTransportDisconnected
+  protected override def on_transport_disconnected() = protocol_handler.on_transport_disconnected
 
-  override def onTransportCommand(command: Object) = {
+  protected override def on_transport_command(command: Object) = {
     try {
-      protocolHandler.onTransportCommand(command);
+      protocol_handler.on_transport_command(command);
     } catch {
       case e:Exception =>
-        onFailure(e)
+        on_failure(e)
     }
   }
 
-  override def onTransportFailure(error: IOException) = protocolHandler.onTransportFailure(error)
-
-  override def onRefill = {
-    super.onRefill
-    protocolHandler.onRefill
-  }
+  protected override def on_transport_failure(error: IOException) = protocol_handler.on_transport_failure(error)
 
   def get_connection_status = {
-    val result = if( protocolHandler==null ) {
+    val result = if( protocol_handler==null ) {
       new ConnectionStatusDTO
     } else {
-      protocolHandler.create_connection_status
+      protocol_handler.create_connection_status
     }
 
     result.id = id
-    result.state = serviceState.toString
-    result.state_since = serviceState.since
-    result.protocol = protocolHandler.protocol
+    result.state = service_state.toString
+    result.state_since = service_state.since
+    result.protocol = protocol_handler.protocol
     result.transport = transport.getTypeId
     result.remote_address = transport.getRemoteAddress
     val wf = transport.getProtocolCodec

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Dec 22 17:37:50 2010
@@ -73,7 +73,7 @@ object Connector extends Log {
 class Connector(val broker:Broker, val id:Long) extends BaseService {
   import Connector._
 
-  override val dispatchQueue = broker.dispatchQueue
+  override val dispatch_queue = broker.dispatch_queue
 
   var config:ConnectorDTO = defaultConfig
   var transportServer:TransportServer = _
@@ -85,7 +85,7 @@ class Connector(val broker:Broker, val i
 
   object BrokerAcceptListener extends TransportAcceptListener {
     def onAcceptError(e: Exception): Unit = {
-      log.warn(e, "Error occured while accepting client connection.")
+      warn(e, "Error occured while accepting client connection.")
     }
 
     def onAccept(transport: Transport): Unit = {
@@ -95,16 +95,16 @@ class Connector(val broker:Broker, val i
 
       accept_counter.incrementAndGet
       var connection = new BrokerConnection(Connector.this, broker.connection_id_counter.incrementAndGet)
-      connection.dispatchQueue.setLabel("connection %d to %s".format(connection.id, transport.getRemoteAddress))
-      connection.protocolHandler = protocol.createProtocolHandler
+      connection.dispatch_queue.setLabel("connection %d to %s".format(connection.id, transport.getRemoteAddress))
+      connection.protocol_handler = protocol.createProtocolHandler
       connection.transport = transport
 
       if( STICK_ON_THREAD_QUEUES ) {
-        connection.dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+        connection.dispatch_queue.setTargetQueue(Dispatch.getRandomThreadQueue)
       }
 
       // We release when it gets removed form the connections list.
-      connection.dispatchQueue.retain
+      connection.dispatch_queue.retain
       connections.put(connection.id, connection)
       info("Client connected from: %s", connection.transport.getRemoteAddress)
 
@@ -134,20 +134,20 @@ class Connector(val broker:Broker, val i
     if ( validate(config, reporter) < ERROR ) {
       this.config = config
 
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
         // TODO: apply changes while running
         reporter.report(WARN, "Updating connector configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
 
       }
     }
-  } |>>: dispatchQueue
+  } |>>: dispatch_queue
 
 
-  override def _start(onCompleted:Runnable) = {
+  override def _start(on_completed:Runnable) = {
     assert(config!=null, "Connector must be configured before it is started.")
     protocol = ProtocolFactory.get(config.protocol.getOrElse("multi")).get
     transportServer = TransportFactory.bind( config.bind )
-    transportServer.setDispatchQueue(dispatchQueue)
+    transportServer.setDispatchQueue(dispatch_queue)
     transportServer.setAcceptListener(BrokerAcceptListener)
 
     if( transportServer.isInstanceOf[KeyAndTrustAware] ) {
@@ -160,19 +160,19 @@ class Connector(val broker:Broker, val i
     }
     transportServer.start(^{
       info("Accepting connections at: "+config.bind)
-      onCompleted.run
+      on_completed.run
     })
   }
 
 
-  override def _stop(onCompleted:Runnable): Unit = {
+  override def _stop(on_completed:Runnable): Unit = {
     transportServer.stop(^{
       info("Stopped connector at: "+config.bind)
-      val tracker = new LoggingTracker(toString, dispatchQueue)
+      val tracker = new LoggingTracker(toString, dispatch_queue)
       connections.valuesIterator.foreach { connection=>
         tracker.stop(connection)
       }
-      tracker.callback(onCompleted)
+      tracker.callback(on_completed)
     })
   }
 
@@ -180,11 +180,11 @@ class Connector(val broker:Broker, val i
    * Connections callback into the connector when they are stopped so that we can
    * stop tracking them.
    */
-  def stopped(connection:BrokerConnection) = dispatchQueue {
+  def stopped(connection:BrokerConnection) = dispatch_queue {
     val at_limit = at_connection_limit
     if( connections.remove(connection.id).isDefined ) {
       info("Client disconnected from: %s", connection.transport.getRemoteAddress)
-      connection.dispatchQueue.release
+      connection.dispatch_queue.release
       if( at_limit ) {
         transportServer.resume
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Dec 22 17:37:50 2010
@@ -34,14 +34,14 @@ object DeliveryProducer extends Log
 trait DeliveryProducer extends Logging {
   override protected def log:Log = DeliveryProducer
 
-  def dispatchQueue:DispatchQueue
+  def dispatch_queue:DispatchQueue
 
   def connection:Option[BrokerConnection] = None
 
   def collocate(value:DispatchQueue):Unit = {
-    if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
-      debug("co-locating %s with %s", dispatchQueue.getLabel, value.getLabel);
-      this.dispatchQueue.setTargetQueue(value.getTargetQueue)
+    if( value.getTargetQueue ne dispatch_queue.getTargetQueue ) {
+      debug("co-locating %s with %s", dispatch_queue.getLabel, value.getLabel);
+      this.dispatch_queue.setTargetQueue(value.getTargetQueue)
     }
   }
 
@@ -58,7 +58,7 @@ trait DeliveryConsumer extends Retained 
 
   def browser = false
   def exclusive = false
-  def dispatchQueue:DispatchQueue;
+  def dispatch_queue:DispatchQueue;
   def matches(message:Delivery):Boolean
   def connect(producer:DeliveryProducer):DeliverySession
   def is_persistent:Boolean

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Dec 22 17:37:50 2010
@@ -55,23 +55,23 @@ class Queue(val host: VirtualHost, var i
 
   val filter = binding.message_filter
 
-  override val dispatchQueue: DispatchQueue = createQueue(binding.label);
-  dispatchQueue.setTargetQueue(getRandomThreadQueue)
-  dispatchQueue {
+  override val dispatch_queue: DispatchQueue = createQueue(binding.label);
+  dispatch_queue.setTargetQueue(getRandomThreadQueue)
+  dispatch_queue {
     debug("created queue for: " + binding.label)
   }
   setDisposer(^ {
     ack_source.release
-    dispatchQueue.release
+    dispatch_queue.release
     session_manager.release
   })
 
 
-  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, Boolean, StoreUOW)](), dispatchQueue)
+  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, Boolean, StoreUOW)](), dispatch_queue)
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
-  val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
+  val session_manager = new SinkMux[Delivery](messages, dispatch_queue, Delivery)
 
   // sequence numbers.. used to track what's in the store.
   var message_seq_counter = 1L
@@ -149,17 +149,17 @@ class Queue(val host: VirtualHost, var i
   var capacity = 0
   var capacity_used = 0
 
-  val swap_source = createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
+  val swap_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
   swap_source.setEventHandler(^{ swap_messages });
   swap_source.resume
 
-  protected def _start(onCompleted: Runnable) = {
+  protected def _start(on_completed: Runnable) = {
 
     capacity = tune_queue_buffer;
 
     def completed: Unit = {
       // by the time this is run, consumers and producers may have already joined.
-      onCompleted.run
+      on_completed.run
       schedual_consumer_sample
       // wake up the producers to fill us up...
       if (messages.refiller != null) {
@@ -168,7 +168,7 @@ class Queue(val host: VirtualHost, var i
 
       // kick off dispatching to the consumers.
       trigger_swap
-      dispatchQueue << head_entry
+      dispatch_queue << head_entry
     }
 
     if( tune_persistent ) {
@@ -181,23 +181,23 @@ class Queue(val host: VirtualHost, var i
         record.binding_data = binding.binding_data
         record.binding_kind = binding.binding_kind
 
-        host.store.addQueue(record) { rc =>
-          dispatchQueue {
+        host.store.add_queue(record) { rc =>
+          dispatch_queue {
             completed
           }
         }
 
       } else {
 
-        host.store.listQueueEntryRanges(id, tune_flush_range_size) { ranges=>
-          dispatchQueue {
+        host.store.list_queue_entry_ranges(id, tune_flush_range_size) { ranges=>
+          dispatch_queue {
             if( ranges!=null && !ranges.isEmpty ) {
 
               ranges.foreach { range =>
-                val entry = new QueueEntry(Queue.this, range.firstQueueSeq).init(range)
+                val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
                 entries.addLast(entry)
 
-                message_seq_counter = range.lastQueueSeq + 1
+                message_seq_counter = range.last_entry_seq + 1
                 enqueue_item_counter += range.count
                 enqueue_size_counter += range.size
                 tail_entry = new QueueEntry(Queue.this, next_message_seq)
@@ -219,9 +219,9 @@ class Queue(val host: VirtualHost, var i
     }
   }
 
-  protected def _stop(onCompleted: Runnable) = {
+  protected def _stop(on_completed: Runnable) = {
     // TODO: perhaps we should remove all the entries
-    onCompleted.run
+    on_completed.run
   }
 
   def addCapacity(amount:Int) = {
@@ -236,7 +236,7 @@ class Queue(val host: VirtualHost, var i
 
     var refiller: Runnable = null
 
-    def full = (capacity_used >= capacity) || !serviceState.isStarted
+    def full = (capacity_used >= capacity) || !service_state.is_started
 
     def offer(delivery: Delivery): Boolean = {
       if (full) {
@@ -397,7 +397,7 @@ class Queue(val host: VirtualHost, var i
   def schedual_consumer_sample:Unit = {
 
     def slowConsumerCheck = {
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
 
         // target tune_min_subscription_rate / sec
         all_subscriptions.foreach{ case (consumer, sub)=>
@@ -424,7 +424,7 @@ class Queue(val host: VirtualHost, var i
       }
     }
 
-    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{
+    dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{
       slowConsumerCheck
     })
   }
@@ -463,9 +463,9 @@ class Queue(val host: VirtualHost, var i
 
     override def producer = p
 
-    val session = session_manager.open(producer.dispatchQueue)
+    val session = session_manager.open(producer.dispatch_queue)
 
-    dispatchQueue {
+    dispatch_queue {
       inbound_sessions += this
       addCapacity( tune_producer_buffer )
     }
@@ -473,7 +473,7 @@ class Queue(val host: VirtualHost, var i
 
     def close = {
       session_manager.close(session)
-      dispatchQueue {
+      dispatch_queue {
         addCapacity( -tune_producer_buffer )
         inbound_sessions -= this
       }
@@ -532,9 +532,9 @@ class Queue(val host: VirtualHost, var i
       val sub = new Subscription(this, consumer)
       sub.open
     }
-  } >>: dispatchQueue
+  } >>: dispatch_queue
 
-  def unbind(values: List[DeliveryConsumer]) = dispatchQueue {
+  def unbind(values: List[DeliveryConsumer]) = dispatch_queue {
     for (consumer <- values) {
       all_subscriptions.get(consumer) match {
         case Some(subscription) =>
@@ -558,7 +558,7 @@ class Queue(val host: VirtualHost, var i
     rc
   }
 
-  val store_flush_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatchQueue)
+  val store_flush_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatch_queue)
   store_flush_source.setEventHandler(^ {drain_store_flushes});
   store_flush_source.resume
 
@@ -571,7 +571,7 @@ class Queue(val host: VirtualHost, var i
 
   }
 
-  val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Flushed, MessageRecord)](), dispatchQueue)
+  val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Flushed, MessageRecord)](), dispatch_queue)
   store_load_source.setEventHandler(^ {drain_store_loads});
   store_load_source.resume
 
@@ -590,9 +590,9 @@ class Queue(val host: VirtualHost, var i
   }
 
   def collocate(value:DispatchQueue):Unit = {
-    if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
-      debug("co-locating %s with %s", dispatchQueue.getLabel, value.getLabel);
-      this.dispatchQueue.setTargetQueue(value.getTargetQueue)
+    if( value.getTargetQueue ne dispatch_queue.getTargetQueue ) {
+      debug("co-locating %s with %s", dispatch_queue.getLabel, value.getLabel);
+      this.dispatch_queue.setTargetQueue(value.getTargetQueue)
     }
   }
 }
@@ -635,12 +635,12 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    state = new Flushed(qer.messageKey, qer.size)
+    state = new Flushed(qer.message_key, qer.size)
     this
   }
 
   def init(range:QueueEntryRange):QueueEntry = {
-    state = new FlushedRange(range.lastQueueSeq, range.count, range.size)
+    state = new FlushedRange(range.last_entry_seq, range.count, range.size)
     this
   }
 
@@ -685,9 +685,9 @@ class QueueEntry(val queue:Queue, val se
 
   def toQueueEntryRecord = {
     val qer = new QueueEntryRecord
-    qer.queueKey = queue.id
-    qer.queueSeq = seq
-    qer.messageKey = state.messageKey
+    qer.queue_key = queue.id
+    qer.entry_seq = seq
+    qer.message_key = state.message_key
     qer.size = state.size
     qer
   }
@@ -722,7 +722,7 @@ class QueueEntry(val queue:Queue, val se
   // These should not change the current state.
   def count = state.count
   def size = state.size
-  def messageKey = state.messageKey
+  def messageKey = state.message_key
   def is_flushed_or_flushing = state.is_flushed_or_flushing
   def dispatch() = state.dispatch
 
@@ -769,7 +769,7 @@ class QueueEntry(val queue:Queue, val se
      * Gets the message key for the entry.
      * @returns -1 if it is not known.
      */
-    def messageKey = -1L
+    def message_key = -1L
 
     /**
      * Attempts to dispatch the current entry to the subscriptions position at the entry.
@@ -893,7 +893,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def count = 1
     override def size = delivery.size
-    override def messageKey = delivery.storeKey
+    override def message_key = delivery.storeKey
 
     override def is_flushed_or_flushing = {
       flushing
@@ -903,7 +903,7 @@ class QueueEntry(val queue:Queue, val se
 
     def store = {
       delivery.uow.enqueue(toQueueEntryRecord)
-      delivery.uow.onComplete(^{
+      delivery.uow.on_complete(^{
         queue.store_flush_source.merge(this)
       })
     }
@@ -922,19 +922,19 @@ class QueueEntry(val queue:Queue, val se
             // The storeBatch is only set when called from the messages.offer method
             if( delivery.uow!=null ) {
               if( asap ) {
-                delivery.uow.completeASAP
+                delivery.uow.complete_asap
               }
             } else {
 
               // Are swapping out a non-persistent message?
               if( delivery.storeKey == -1 ) {
                 
-                delivery.uow = queue.host.store.createStoreUOW
+                delivery.uow = queue.host.store.create_uow
                 val uow = delivery.uow
                 delivery.storeKey = uow.store(delivery.createMessageRecord)
                 store
                 if( asap ) {
-                  uow.completeASAP
+                  uow.complete_asap
                 }
                 uow.release
                 delivery.uow = null
@@ -942,7 +942,7 @@ class QueueEntry(val queue:Queue, val se
               } else {
                   
                 if( asap ) {
-                  queue.host.store.flushMessage(messageKey) {
+                  queue.host.store.flush_message(message_key) {
                     queue.store_flush_source.merge(this)
                   }
                 }
@@ -1089,7 +1089,7 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Flushed(override val messageKey:Long, override val size:Int) extends EntryState {
+  class Flushed(override val message_key:Long, override val size:Int) extends EntryState {
 
     queue.flushed_items += 1
 
@@ -1117,7 +1117,7 @@ class QueueEntry(val queue:Queue, val se
         // start loading it back...
         loading = true
         queue.loading_size += size
-        queue.host.store.loadMessage(messageKey) { delivery =>
+        queue.host.store.load_message(message_key) { delivery =>
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
           if( delivery.isDefined ) {
@@ -1128,7 +1128,7 @@ class QueueEntry(val queue:Queue, val se
 
             // Looks like someone else removed the message from the store.. lets just
             // tombstone this entry now.
-            queue.dispatchQueue {
+            queue.dispatch_queue {
               remove
             }
           }
@@ -1214,16 +1214,16 @@ class QueueEntry(val queue:Queue, val se
     override def load() = {
       if( !loading ) {
         loading = true
-        queue.host.store.listQueueEntries(queue.id, seq, last) { records =>
+        queue.host.store.list_queue_entries(queue.id, seq, last) { records =>
           if( !records.isEmpty ) {
-            queue.dispatchQueue {
+            queue.dispatch_queue {
 
               var item_count=0
               var size_count=0
 
               val tmpList = new LinkedNodeList[QueueEntry]()
               records.foreach { record =>
-                val entry = new QueueEntry(queue, record.queueSeq).init(record)
+                val entry = new QueueEntry(queue, record.entry_seq).init(record)
                 tmpList.addLast(entry)
                 item_count += 1
                 size_count += record.size
@@ -1294,7 +1294,7 @@ class QueueEntry(val queue:Queue, val se
 class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with DispatchLogging {
   override protected def log = Queue
 
-  def dispatchQueue = queue.dispatchQueue
+  def dispatch_queue = queue.dispatch_queue
 
   val id = Queue.subcsription_counter.incrementAndGet
   var acquired = new LinkedNodeList[AcquiredQueueEntry]
@@ -1342,10 +1342,10 @@ class Subscription(val queue:Queue, val 
       queue.exclusive_subscriptions.append(this)
     }
 
-    if( queue.serviceState.isStarted ) {
+    if( queue.service_state.is_started ) {
       // kick off the initial dispatch.
       refill_prefetch
-      queue.dispatchQueue << queue.head_entry
+      queue.dispatch_queue << queue.head_entry
     }
   }
 
@@ -1412,7 +1412,7 @@ class Subscription(val queue:Queue, val 
     value ::= this
     pos = value
     session.refiller = value
-    queue.dispatchQueue << value // queue up the entry to get dispatched..
+    queue.dispatch_queue << value // queue up the entry to get dispatched..
   }
 
   def tail_parked = pos eq queue.tail_entry
@@ -1471,7 +1471,7 @@ class Subscription(val queue:Queue, val 
       total_ack_count += 1
       if (entry.messageKey != -1) {
         val storeBatch = if( sb == null ) {
-          queue.host.store.createStoreUOW
+          queue.host.store.create_uow
         } else {
           sb
         }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Dec 22 17:37:50 2010
@@ -67,7 +67,7 @@ class Router(val host:VirtualHost) exten
 
   val destination_id_counter = new LongCounter
 
-  protected def dispatchQueue:DispatchQueue = host.dispatchQueue
+  protected def dispatchQueue:DispatchQueue = host.dispatch_queue
 
   var queue_bindings = HashMap[Binding, Queue]()
   var queues = HashMap[Long, Queue]()
@@ -161,7 +161,7 @@ class Router(val host:VirtualHost) exten
       record.binding_data = binding.binding_data
       record.binding_kind = binding.binding_kind
 
-      host.store.addQueue(record) { rc => Unit }
+      host.store.add_queue(record) { rc => Unit }
 
     }
     queue.start
@@ -258,8 +258,8 @@ class Router(val host:VirtualHost) exten
     }
     queue.stop
     if( queue.tune_persistent ) {
-      queue.dispatchQueue ^ {
-        host.store.removeQueue(queue.id){x=> Unit}
+      queue.dispatch_queue ^ {
+        host.store.remove_queue(queue.id){x=> Unit}
       }
     }
     Success(Zilch)
@@ -381,7 +381,7 @@ class Router(val host:VirtualHost) exten
     }
 
     dispatchQueue {
-      do_connect.failure_option.foreach(x=> producer.dispatchQueue { completed(Failure(x)) } )
+      do_connect.failure_option.foreach(x=> producer.dispatch_queue { completed(Failure(x)) } )
     }
 
   }
@@ -425,7 +425,7 @@ class RoutingNode(val router:Router, val
 
         // create a temp queue so that it can spool
         val queue = router._create_queue(-1, new TempBinding(consumer), null).success
-        queue.dispatchQueue.setTargetQueue(consumer.dispatchQueue)
+        queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
         queue.bind(List(consumer))
 
         consumer_proxies += consumer->queue
@@ -491,7 +491,7 @@ class RoutingNode(val router:Router, val
  */
 trait Route extends Retained {
 
-  def dispatchQueue:DispatchQueue
+  def dispatch_queue:DispatchQueue
   val metric = new AtomicLong();
 
   def bind(targets:List[DeliveryConsumer]):Unit
@@ -508,23 +508,23 @@ trait Route extends Retained {
 case class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
 
   override protected def log = Router
-  override def dispatchQueue = producer.dispatchQueue
+  override def dispatch_queue = producer.dispatch_queue
 
   // Retain the queue while we are retained.
-  dispatchQueue.retain
+  dispatch_queue.retain
   setDisposer(^{
-    dispatchQueue.release
+    dispatch_queue.release
   })
 
   var targets = List[DeliverySession]()
 
-  def connected() = dispatchQueue {
+  def connected() = dispatch_queue {
     on_connected
   }
 
   def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
     internal_bind(targets)
-  } >>: dispatchQueue
+  } >>: dispatch_queue
 
   private def internal_bind(values:List[DeliveryConsumer]) = {
     values.foreach{ x=>
@@ -550,9 +550,9 @@ case class DeliveryProducerRoute(val rou
       }
       rc
     }
-  } >>: dispatchQueue
+  } >>: dispatch_queue
 
-  def disconnected() = dispatchQueue {
+  def disconnected() = dispatch_queue {
     this.targets.foreach { x=>
       debug("producer route detaching from conusmer.")
       x.close
@@ -594,7 +594,7 @@ case class DeliveryProducerRoute(val rou
 
           if( copy.storeKey == -1L && target.consumer.is_persistent && copy.message.persistent ) {
             if( copy.uow==null ) {
-              copy.uow = router.host.store.createStoreUOW
+              copy.uow = router.host.store.create_uow
             } else {
               copy.uow.retain
             }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Dec 22 17:37:50 2010
@@ -322,7 +322,7 @@ class QueueSink[T](val sizer:Sizer[T], v
   def full = size >= maxSize
   def poll = buffer.poll
   def unpoll(value:T) = buffer.addFirst(value)
-  def isEmpty = buffer.isEmpty
+  def is_empty = buffer.isEmpty
 
   private def drain = drainer.run
 
@@ -345,7 +345,7 @@ class QueueSink[T](val sizer:Sizer[T], v
     // queue
     val wasBlocking = full
     size -= amount
-    if( !isEmpty ) {
+    if( !is_empty ) {
       drain
     } else {
       refiller.run

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Dec 22 17:37:50 2010
@@ -42,7 +42,7 @@ object VirtualHost extends Log {
   /**
    * Creates a default a configuration object.
    */
-  def defaultConfig() = {
+  def default_config() = {
     val rc = new VirtualHostDTO
     rc.id = "default"
     rc.host_names.add("localhost")
@@ -70,23 +70,18 @@ object VirtualHost extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class VirtualHost(val broker: Broker, val id:Long) extends BaseService with DispatchLogging with LoggingReporter {
+class VirtualHost(val broker: Broker, val id:Long) extends BaseService {
   import VirtualHost._
   
-  override protected def log = VirtualHost
-  override val dispatchQueue:DispatchQueue = createQueue("virtual-host") // getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host")
+  override val dispatch_queue:DispatchQueue = createQueue("virtual-host") // getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host")
 
   var config:VirtualHostDTO = _
   val router = new Router(this)
 
   var names:List[String] = Nil;
-  def setNamesArray( names:ArrayList[String]) = {
-    this.names = names.toList
-  }
 
   var store:Store = null
   var direct_buffer_pool:DirectBufferPool = null
-  var transactionManager:TransactionManagerX = new TransactionManagerX
   val queue_id_counter = new LongCounter
 
   val session_counter = new AtomicLong(0)
@@ -103,18 +98,18 @@ class VirtualHost(val broker: Broker, va
     if ( validate(config, reporter) < ERROR ) {
       this.config = config
 
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
         // TODO: apply changes while he broker is running.
         reporter.report(WARN, "Updating virtual host configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
 
       }
     }
-  } |>>: dispatchQueue
+  } |>>: dispatch_queue
 
 
-  override protected def _start(onCompleted:Runnable):Unit = {
+  override protected def _start(on_completed:Runnable):Unit = {
 
-    val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
+    val tracker = new LoggingTracker("virtual host startup", dispatch_queue)
 
     if( config.authentication != null ) {
       if( config.authentication.enabled.getOrElse(true) ) {
@@ -137,7 +132,7 @@ class VirtualHost(val broker: Broker, va
     //    val memory_pool_config: String = null
     var direct_buffer_pool_config: String = "hawtdb:activemq.tmp"
 
-    if( direct_buffer_pool_config!=null &&  (store!=null && !store.supportsDirectBuffers) ) {
+    if( direct_buffer_pool_config!=null &&  (store!=null && !store.supports_direct_buffers) ) {
       warn("The direct buffer pool will not be used because the configured store does not support them.")
       direct_buffer_pool_config = null
     }
@@ -148,38 +143,38 @@ class VirtualHost(val broker: Broker, va
     }
 
     if( store!=null ) {
-      store.configure(config.store, this)
-      val storeStartupDone = tracker.task("store startup")
+      store.configure(config.store, LoggingReporter(VirtualHost))
+      val store_startup_done = tracker.task("store startup")
       store.start {
 
-        val getKeyDone = tracker.task("store get last queue key")
-        store.getLastQueueKey{ key=>
+        val get_key_done = tracker.task("store get last queue key")
+        store.get_last_queue_key{ key=>
           key match {
             case Some(x)=>
               queue_id_counter.set(key.get)
             case None =>
               warn("Could not get last queue key")
           }
-          getKeyDone.run
+          get_key_done.run
         }
 
         if( config.purge_on_startup.getOrElse(false) ) {
-          storeStartupDone.name = "store purge"
+          store_startup_done.name = "store purge"
           store.purge {
-            storeStartupDone.run
+            store_startup_done.run
           }
         } else {
-          storeStartupDone.name = "store recover queues"
-          store.listQueues { queueKeys =>
-            for( queueKey <- queueKeys) {
-              val task = tracker.task("store load queue key: "+queueKey)
+          store_startup_done.name = "store recover queues"
+          store.list_queues { queue_keys =>
+            for( queue_key <- queue_keys) {
+              val task = tracker.task("store load queue key: "+queue_key)
               // Use a global queue to so we concurrently restore
               // the queues.
               globalQueue {
-                store.getQueue(queueKey) { x =>
+                store.get_queue(queue_key) { x =>
                   x match {
                     case Some(record)=>
-                    dispatchQueue ^{
+                    dispatch_queue ^{
                       router.create_queue(record, null)
                       task.run
                     }
@@ -189,18 +184,13 @@ class VirtualHost(val broker: Broker, va
                 }
               }
             }
-            storeStartupDone.run
+            store_startup_done.run
           }
         }
       }
     }
 
-
-    //Recover transactions:
-    transactionManager.virtualHost = this
-    transactionManager.loadTransactions();
-
-    tracker.callback(onCompleted)
+    tracker.callback(on_completed)
 
     if(config.regroup_connections.getOrElse(false)) {
       schedual_connection_regroup
@@ -208,9 +198,9 @@ class VirtualHost(val broker: Broker, va
   }
 
 
-  override protected def _stop(onCompleted:Runnable):Unit = {
+  override protected def _stop(on_completed:Runnable):Unit = {
 
-    val tracker = new LoggingTracker("virtual host shutdown", dispatchQueue)
+    val tracker = new LoggingTracker("virtual host shutdown", dispatch_queue)
     router.queues.valuesIterator.foreach { queue=>
       tracker.stop(queue)
     }
@@ -222,7 +212,7 @@ class VirtualHost(val broker: Broker, va
     if( store!=null ) {
       tracker.stop(store);
     }
-    tracker.callback(onCompleted)
+    tracker.callback(on_completed)
   }
 
 
@@ -241,25 +231,25 @@ class VirtualHost(val broker: Broker, va
         // thread.
         node.broadcast_consumers.headOption.foreach{ consumer =>
           node.broadcast_producers.foreach { r=>
-            r.producer.collocate(consumer.dispatchQueue)
+            r.producer.collocate(consumer.dispatch_queue)
           }
         }
 
         node.queues.foreach { queue=>
 
-          queue.dispatchQueue {
+          queue.dispatch_queue {
 
             // Collocate the queue's with the first consumer
             // TODO: change this so it collocates with the fastest consumer.
 
             queue.all_subscriptions.headOption.map( _._1 ).foreach { consumer=>
-              queue.collocate( consumer.dispatchQueue )
+              queue.collocate( consumer.dispatch_queue )
             }
 
             // Collocate all the producers with the queue..
 
             queue.inbound_sessions.foreach { session =>
-              session.producer.collocate( queue.dispatchQueue )
+              session.producer.collocate( queue.dispatch_queue )
             }
           }
 
@@ -267,7 +257,7 @@ class VirtualHost(val broker: Broker, va
       }
       schedual_connection_regroup
     }
-    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(serviceState.isStarted) { connectionRegroup } } )
+    dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(service_state.is_started) { connectionRegroup } } )
   }
 
   def destination_config(name:Path):Option[DestinationDTO] = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala Wed Dec 22 17:37:50 2010
@@ -141,7 +141,7 @@ class MultiProtocolHandler extends Proto
 
   var discriminated = false
 
-  override def onTransportCommand(command: Any) = {
+  override def on_transport_command(command: AnyRef) = {
 
     if (!command.isInstanceOf[ProtocolCodec]) {
       throw new ProtocolException("Expected a protocol codec");
@@ -151,33 +151,33 @@ class MultiProtocolHandler extends Proto
 
     var codec: ProtocolCodec = command.asInstanceOf[ProtocolCodec];
     val protocol = codec.protocol()
-    val protocolHandler = ProtocolFactory.get(protocol) match {
+    val protocol_handler = ProtocolFactory.get(protocol) match {
       case Some(x) => x.createProtocolHandler
       case None =>
         throw new ProtocolException("No protocol handler available for protocol: " + protocol);
     }
 
-    protocolHandler.setConnection(connection);
+    protocol_handler.set_connection(connection);
 
     // replace the current handler with the new one.
-    connection.protocolHandler = protocolHandler
+    connection.protocol_handler = protocol_handler
     connection.transport.setProtocolCodec(codec)
 
     connection.transport.suspendRead
-    protocolHandler.onTransportConnected
+    protocol_handler.on_transport_connected
   }
 
-  override def onTransportConnected = {
+  override def on_transport_connected = {
     connection.transport.resumeRead
     
     // Make sure client connects eventually...
-    connection.dispatchQueue.after(5, TimeUnit.SECONDS) {
+    connection.dispatch_queue.after(5, TimeUnit.SECONDS) {
       assert_discriminated
     }
   }
 
   def assert_discriminated = {
-    if( connection.serviceState.isStarted && !discriminated ) {
+    if( connection.service_state.is_started && !discriminated ) {
       connection.stop
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Wed Dec 22 17:37:50 2010
@@ -67,19 +67,26 @@ trait Protocol extends ProtocolCodecFact
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait ProtocolHandler extends DefaultTransportListener {
+trait ProtocolHandler {
 
   def protocol:String
 
   var connection:BrokerConnection = null;
 
-  def setConnection(brokerConnection:BrokerConnection) = {
+  def set_connection(brokerConnection:BrokerConnection) = {
     this.connection = brokerConnection
   }
 
-  override def onTransportFailure(error:IOException) = {
+  def create_connection_status = new ConnectionStatusDTO
+
+  def on_transport_failure(error:IOException) = {
     connection.stop()
   }
 
-  def create_connection_status = new ConnectionStatusDTO
+  def on_transport_disconnected = {}
+
+  def on_transport_connected = {}
+
+  def on_transport_command(command: AnyRef) = {}
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Wed Dec 22 17:37:50 2010
@@ -47,15 +47,15 @@ trait DelayingStoreSupport extends Store
   // Implementation of the BaseService interface
   //
   /////////////////////////////////////////////////////////////////////
-  val dispatchQueue:DispatchQueue = createQueue(toString)
-  val aggregator = new AggregatingExecutor(dispatchQueue)
+  val dispatch_queue:DispatchQueue = createQueue(toString)
+  val aggregator = new AggregatingExecutor(dispatch_queue)
 
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation of the StoreBatch interface
   //
   /////////////////////////////////////////////////////////////////////
-  def createStoreUOW() = new DelayableUOW
+  def create_uow() = new DelayableUOW
 
   class DelayableUOW extends BaseRetained with StoreUOW {
 
@@ -83,9 +83,9 @@ trait DelayingStoreSupport extends Store
     var completeListeners = ListBuffer[Runnable]()
     var disableDelay = false
 
-    def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
+    def on_complete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
 
-    def completeASAP() = this.synchronized { disableDelay=true }
+    def complete_asap() = this.synchronized { disableDelay=true }
 
     var delayable_actions = 0
 
@@ -131,7 +131,7 @@ trait DelayingStoreSupport extends Store
 
     def enqueue(entry: QueueEntryRecord) = {
       val a = this.synchronized {
-        val a = action(entry.messageKey)
+        val a = action(entry.message_key)
         a.enqueues += entry
         delayable_actions += 1
         a
@@ -144,7 +144,7 @@ trait DelayingStoreSupport extends Store
 
     def dequeue(entry: QueueEntryRecord) = {
       this.synchronized {
-        action(entry.messageKey).dequeues += entry
+        action(entry.message_key).dequeues += entry
       }
     }
 
@@ -163,14 +163,14 @@ trait DelayingStoreSupport extends Store
   }
 
 
-  def flushMessage(messageKey: Long)(cb: => Unit) = dispatchQueue {
+  def flush_message(messageKey: Long)(cb: => Unit) = dispatch_queue {
     val action: DelayableUOW#MessageAction = pendingStores.get(messageKey)
     if( action == null ) {
       cb
     } else {
       // TODO: protect against this causing a 2nd flush.
       delayedUOWs.put(action.uow.uow_id, action.uow)
-      action.uow.onComplete(^{ cb })
+      action.uow.on_complete(^{ cb })
       flush(action.uow.uow_id)
     }
   }
@@ -215,9 +215,9 @@ trait DelayingStoreSupport extends Store
   var canceled_enqueue:Long = 0
 
 
-  def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
+  def key(x:QueueEntryRecord) = (x.queue_key, x.entry_seq)
 
-  val uow_source = createSource(new ListEventAggregator[DelayableUOW](), dispatchQueue)
+  val uow_source = createSource(new ListEventAggregator[DelayableUOW](), dispatch_queue)
   uow_source.setEventHandler(^{drain_uows});
   uow_source.resume
 
@@ -277,7 +277,7 @@ trait DelayingStoreSupport extends Store
 
       val batch_id = uow.uow_id
       if( uow.delayable ) {
-        dispatchQueue.dispatchAfter(flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
+        dispatch_queue.dispatchAfter(flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
       } else {
         flush(batch_id)
       }
@@ -289,7 +289,7 @@ trait DelayingStoreSupport extends Store
     flush_source.merge(batch_id)
   }
 
-  val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
+  val flush_source = createSource(new ListEventAggregator[Int](), dispatch_queue)
   flush_source.setEventHandler(^{drain_flushes});
   flush_source.resume
 
@@ -298,7 +298,7 @@ trait DelayingStoreSupport extends Store
 
   def drain_flushes:Unit = {
 
-    if( !serviceState.isStarted ) {
+    if( !service_state.is_started ) {
       return
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java Wed Dec 22 17:37:50 2010
@@ -20,8 +20,8 @@ package org.apache.activemq.apollo.broke
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class QueueEntryRange {
-    public long firstQueueSeq;
-    public long lastQueueSeq;
+    public long first_entry_seq;
+    public long last_entry_seq;
     public int count;
     public int size;
-}
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java Wed Dec 22 17:37:50 2010
@@ -23,9 +23,9 @@ import org.fusesource.hawtbuf.Buffer;
  */
 public class QueueEntryRecord {
 
-    public long queueKey;
-    public long queueSeq;
-    public long messageKey;
+    public long queue_key;
+    public long entry_seq;
+    public long message_key;
     public Buffer attachment;
     public int size;
     public short redeliveries;

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Wed Dec 22 17:37:50 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.broke
  */
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, StoreDTO}
 import org.apache.activemq.apollo.util._
-import ReporterLevel._
 
 /**
  * <p>
@@ -30,19 +29,19 @@ import ReporterLevel._
  */
 trait Store extends ServiceTrait {
 
-  def storeStatusDTO(callback:(StoreStatusDTO)=>Unit)
+  def get_store_status(callback:(StoreStatusDTO)=>Unit)
 
   /**
    * @returns true if the store implementation can handle accepting
    *          MessageRecords with DirectBuffers in them.
    */
-  def supportsDirectBuffers() = false
+  def supports_direct_buffers() = false
 
   /**
    * Creates a store uow which is used to perform persistent
    * operations as unit of work.
    */
-  def createStoreUOW():StoreUOW
+  def create_uow():StoreUOW
 
   /**
    * Supplies configuration data to the Store.  This will be called
@@ -59,7 +58,7 @@ trait Store extends ServiceTrait {
   /**
    * Ges the last queue key identifier stored.
    */
-  def getLastQueueKey(callback:(Option[Long])=>Unit):Unit
+  def get_last_queue_key(callback:(Option[Long])=>Unit):Unit
 
   /**
    * Adds a queue.
@@ -67,47 +66,47 @@ trait Store extends ServiceTrait {
    * This method auto generates and assigns the key field of the queue record and
    * returns true if it succeeded.
    */
-  def addQueue(record:QueueRecord)(callback:(Boolean)=>Unit):Unit
+  def add_queue(record:QueueRecord)(callback:(Boolean)=>Unit):Unit
 
   /**
    * Removes a queue. Success is reported via the callback.
    */
-  def removeQueue(queueKey:Long)(callback:(Boolean)=>Unit):Unit
+  def remove_queue(queueKey:Long)(callback:(Boolean)=>Unit):Unit
 
   /**
    * Loads the queue information for a given queue key.
    */
-  def getQueue(queueKey:Long)(callback:(Option[QueueRecord])=>Unit )
+  def get_queue(queueKey:Long)(callback:(Option[QueueRecord])=>Unit )
 
   /**
    * Gets a listing of all queue entry sequences previously added
    * and reports them to the callback.
    */
-  def listQueues(callback: (Seq[Long])=>Unit )
+  def list_queues(callback: (Seq[Long])=>Unit )
 
   /**
    * Groups all the entries in the specified queue into ranges containing up limit entries
    * big and returns those ranges.  Allows you to incrementally, load all the entries in
    * a queue.
    */
-  def listQueueEntryRanges(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryRange])=>Unit )
+  def list_queue_entry_ranges(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryRange])=>Unit )
 
   /**
    * Loads all the queue entry records for the given queue id between the first and last provided
    * queue sequences (inclusive).
    */
-  def listQueueEntries(queueKey:Long, firstSeq:Long, lastSeq:Long)(callback:(Seq[QueueEntryRecord])=>Unit )
+  def list_queue_entries(queueKey:Long, firstSeq:Long, lastSeq:Long)(callback:(Seq[QueueEntryRecord])=>Unit )
 
   /**
    * Removes a the delivery associated with the provided from any
    * internal buffers/caches.  The callback is executed once, the message is
    * no longer buffered.
    */
-  def flushMessage(messageKey:Long)(callback: =>Unit)
+  def flush_message(messageKey:Long)(callback: =>Unit)
 
   /**
    * Loads a delivery with the associated id from persistent storage.
    */
-  def loadMessage(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
+  def load_message(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
 
-}
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala Wed Dec 22 17:37:50 2010
@@ -63,12 +63,12 @@ trait StoreUOW extends Retained {
    * all it operations and thus avoid the cost of the
    * persistence operations.
    */
-  def completeASAP()
+  def complete_asap()
 
   /**
    * The specified callback is executed once the UOW
    * is completed.
    */
-  def onComplete(callback: Runnable)
+  def on_complete(callback: Runnable)
 
 }



Mime
View raw message