activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961119 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/ activemq...
Date Wed, 07 Jul 2010 04:00:32 GMT
Author: chirino
Date: Wed Jul  7 04:00:31 2010
New Revision: 961119

URL: http://svn.apache.org/viewvc?rev=961119&view=rev
Log:
Added a Protocol class to help marshall/unmarshall protocol messages from storage

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/StoreFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
      - copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/StoreFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols Wed Jul  7 04:00:31 2010
@@ -1,17 +1,17 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements.  See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License.  You may obtain a copy of the License at
-## 
-## http://www.apache.org/licenses/LICENSE-2.0
-## 
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-class=org.apache.activemq.apollo.broker.MultiProtocolHandler
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.protocol.MultiProtocol
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 04:00:31 2010
@@ -27,6 +27,7 @@ import _root_.org.apache.activemq.wirefo
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import java.util.concurrent.atomic.AtomicLong
 import org.fusesource.hawtdispatch.Dispatch
+import protocol.{ProtocolFactory, ProtocolHandler}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -95,7 +96,7 @@ class BrokerConnection(val connector: Co
 
   override protected  def _start(onCompleted:Runnable) = {
     connector.dispatchQueue.retain
-    protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
+    protocolHandler = ProtocolFactory.get(protocol).createProtocolHandler
     protocolHandler.setConnection(this);
     super._start(onCompleted)
   }
@@ -132,70 +133,6 @@ class BrokerConnection(val connector: Co
  */
 class ProtocolException(message:String, e:Throwable=null) extends Exception(message, e)
 
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class MultiProtocolHandler extends ProtocolHandler {
-
-  var connected = false
-
-  override def onTransportCommand(command:Any) = {
-
-    if (!command.isInstanceOf[WireFormat]) {
-      throw new ProtocolException("First command should be a WireFormat");
-    }
-
-    var wireformat:WireFormat = command.asInstanceOf[WireFormat];
-    val protocol = wireformat.getName()
-    val protocolHandler = try {
-      // Create the new protocol handler..
-       ProtocolHandlerFactory.createProtocolHandler(protocol);
-    } catch {
-      case e:Exception=>
-      throw new ProtocolException("No protocol handler available for protocol: " + protocol, e);
-    }
-    protocolHandler.setConnection(connection);
-
-    // replace the current handler with the new one.
-    connection.protocol = protocol
-    connection.protocolHandler = protocolHandler
-    connection.transport.suspendRead
-    protocolHandler.onTransportConnected
-  }
-
-  override def onTransportConnected = {
-    connection.transport.resumeRead
-  }
-
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object ProtocolHandlerFactory {
-    val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/protocol/");
-
-    def createProtocolHandler(protocol:String) = {
-        PROTOCOL_HANDLER_FINDER.newInstance(protocol).asInstanceOf[ProtocolHandler]
-    }
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait ProtocolHandler extends DefaultTransportListener {
-
-  var connection:BrokerConnection = null;
-
-  def setConnection(brokerConnection:BrokerConnection) = {
-    this.connection = brokerConnection
-  }
-
-  override def onTransportFailure(error:IOException) = {
-    connection.stop()
-  }
-
-}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:00:31 2010
@@ -20,6 +20,8 @@ import _root_.org.apache.activemq.filter
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
+import org.apache.activemq.broker.store.StoreTransaction
+
 /**
  * A producer which sends Delivery objects to a delivery consumer.
  *
@@ -110,7 +112,7 @@ trait Message {
   /**
    * The protocol encoding of the message.
    */
-  def protocol:String
+  def protocol:AsciiBuffer
 
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:00:31 2010
@@ -23,6 +23,8 @@ import org.fusesource.hawtdispatch.{Scal
 import org.apache.activemq.util.TreeMap.TreeEntry
 import java.util.{Collections, ArrayList, LinkedList}
 import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
+import org.apache.activemq.broker.store.{StoredMessage, StoreTransaction}
+import protocol.ProtocolFactory
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -76,7 +78,7 @@ class Queue(val host: VirtualHost, val d
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
-  val store_load_source = createSource(new ListEventAggregator[(QueueEntry, Delivery)](), dispatchQueue)
+  val store_load_source = createSource(new ListEventAggregator[(QueueEntry, StoredMessage)](), dispatchQueue)
   store_load_source.setEventHandler(^ {drain_store_loads});
   store_load_source.resume
 
@@ -374,8 +376,16 @@ class Queue(val host: VirtualHost, val d
           var ref = loaded.delivery.storeId
           if( ref == -1 ) {
             val tx = host.database.createStoreTransaction
-            tx.store(loaded.delivery)
-            tx.enqueue(storeId, entry.seq, loaded.delivery.storeId)
+
+            val message = loaded.delivery.message
+            val sm = new StoredMessage
+            sm.protocol = message.protocol
+            sm.value = ProtocolFactory.get(message.protocol).encode(message)
+            sm.size = loaded.size
+
+            tx.store(sm)
+            loaded.delivery.storeId = sm.id
+            tx.enqueue(storeId, entry.seq, sm.id)
             tx.release
           }
           flushingSize += entry.value.size
@@ -394,7 +404,15 @@ class Queue(val host: VirtualHost, val d
 
     data.foreach { event =>
       val entry = event._1
-      entry.loaded(event._2)
+      val stored = event._2
+
+      val delivery = new Delivery()
+      delivery.message = ProtocolFactory.get(stored.protocol).decode(stored.value)
+      delivery.size = stored.size
+      delivery.storeId = stored.id
+
+      entry.loaded(delivery)
+
       size += entry.value.size
 
       if( entry.hasSubs ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:00:31 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.broke
 
 import _root_.java.util.{ArrayList, HashMap}
 import _root_.org.apache.activemq.broker.store.memory.MemoryStore
-import _root_.org.apache.activemq.broker.store.{Store}
 import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
@@ -30,6 +29,9 @@ import org.apache.activemq.apollo.dto.Vi
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import ReporterLevel._
+import org.apache.activemq.apollo.store.memory.MemoryBrokerDatabase
+import org.apache.activemq.broker.store.{StoredQueue, BrokerDatabase, Store}
+import org.fusesource.hawtbuf.proto.WireFormat
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -86,9 +88,11 @@ class VirtualHost(val broker: Broker) ex
     this.names = names.toList
   }
 
-  var database:BrokerDatabase = new MemoryBrokerDatabase(this)
+  var database:BrokerDatabase = new MemoryBrokerDatabase()
   var transactionManager:TransactionManagerX = new TransactionManagerX
 
+  var protocols = Map[AsciiBuffer, WireFormat]()
+
   override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
 
   /**
@@ -117,7 +121,7 @@ class VirtualHost(val broker: Broker) ex
           x match {
             case Some(info)=>
             dispatchQueue ^{
-              val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+              val dest = DestinationParser.parse(info.name, destination_parser_options)
               if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
 
                 val queue = new Queue(this, dest, id)
@@ -126,7 +130,7 @@ class VirtualHost(val broker: Broker) ex
                 queue.message_seq_counter = info.last+1
                 queue.count = info.count
 
-                queues.put(info.record.name, queue)
+                queues.put(info.name, queue)
               }
             }
             case _ =>
@@ -180,7 +184,8 @@ class VirtualHost(val broker: Broker) ex
 
   def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
     val name = DestinationParser.toBuffer(dest, destination_parser_options)
-    val record = QueueRecord(0, name, null, null)
+    val record = new StoredQueue
+    record.name = name
     database.addQueue(record) { rc =>
       rc match {
         case Some(id) =>

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala?rev=961119&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala Wed Jul  7 04:00:31 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import org.apache.activemq.apollo.broker.{Message, ProtocolException}
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import org.apache.activemq.wireformat.{MultiWireFormatFactory, WireFormat}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MultiProtocol extends Protocol {
+
+  val wff = new MultiWireFormatFactory
+
+  def name = new AsciiBuffer("multi")
+
+  def createWireFormat = wff.createWireFormat
+  def createProtocolHandler = new MultiProtocolHandler
+  
+  def encode(message: Message) = throw new UnsupportedOperationException
+  def decode(message: Buffer) = throw new UnsupportedOperationException
+}
+
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MultiProtocolHandler extends ProtocolHandler {
+
+  var connected = false
+
+  override def onTransportCommand(command:Any) = {
+
+    if (!command.isInstanceOf[WireFormat]) {
+      throw new ProtocolException("First command should be a WireFormat");
+    }
+
+    var wireformat:WireFormat = command.asInstanceOf[WireFormat];
+    val protocol = wireformat.getName()
+    val protocolHandler = try {
+      // Create the new protocol handler..
+       ProtocolFactory.get(protocol).createProtocolHandler
+    } catch {
+      case e:Exception=>
+      throw new ProtocolException("No protocol handler available for protocol: " + protocol, e);
+    }
+    protocolHandler.setConnection(connection);
+
+    // replace the current handler with the new one.
+    connection.protocol = protocol
+    connection.protocolHandler = protocolHandler
+    connection.transport.suspendRead
+    protocolHandler.onTransportConnected
+  }
+
+  override def onTransportConnected = {
+    connection.transport.resumeRead
+  }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=961119&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Wed Jul  7 04:00:31 2010
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import java.util.Properties
+import java.net.{URLClassLoader, URL}
+import org.apache.activemq.transport.DefaultTransportListener
+import java.io.{IOException, File, InputStream}
+import org.apache.activemq.apollo.broker.{Message, BrokerConnection}
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import org.apache.activemq.wireformat.WireFormat
+
+
+/**
+ * <p>
+ * Used to discover classes using the META-INF discovery trick.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class ClassFinder[T](path:String, loaders:Seq[ClassLoader]=Thread.currentThread.getContextClassLoader::Nil) {
+
+  def find(): List[Class[T]] = {
+    var classes = List[Class[T]]()
+    loaders.foreach { loader=>
+
+      val resources = loader.getResources(path)
+      var classNames: List[String] = Nil
+      while(resources.hasMoreElements) {
+        val url = resources.nextElement;
+        val p = loadProperties(url.openStream)
+        val enum = p.keys
+        while (enum.hasMoreElements) {
+          classNames = classNames ::: enum.nextElement.asInstanceOf[String] :: Nil
+        }
+      }
+      classNames = classNames.removeDuplicates
+
+      classes :::= classNames.map { name=>
+        loader.loadClass(name).asInstanceOf[Class[T]]
+      }
+
+    }
+
+    return classes.removeDuplicates
+  }
+
+  private def loadProperties(is:InputStream):Properties = {
+    if( is==null ) {
+      return null;
+    }
+    try {
+      val p = new Properties()
+      p.load(is);
+      return p
+    } catch {
+      case e:Exception =>
+      return null
+    } finally {
+      try {
+        is.close()
+      } catch {
+        case _ =>
+      }
+    }
+  }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ProtocolFactory {
+
+  val finder =  ClassFinder[Protocol]("META-INF/services/org.apache.activemq.apollo/protocols")
+  var protocols = Map[AsciiBuffer, Protocol]()
+
+  finder.find.foreach{ clazz =>
+    try {
+
+      val protocol = clazz.newInstance.asInstanceOf[Protocol]
+      protocols += protocol.name -> protocol
+
+    } catch {
+      case e:Throwable =>
+        e.printStackTrace
+    }
+  }
+
+  def get(name:String):Protocol = get(new AsciiBuffer(name))
+
+  def get(name:AsciiBuffer):Protocol = {
+    protocols.get(name) match {
+      case None =>
+        throw new IllegalArgumentException("Protocol not found: "+name)
+      case Some(x) => x
+    }
+  }
+
+}
+
+trait Protocol {
+
+  def name:AsciiBuffer
+
+  def createWireFormat:WireFormat
+  def createProtocolHandler:ProtocolHandler
+
+  def encode(message:Message):Buffer
+  def decode(message:Buffer):Message
+
+}
+
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ProtocolHandler extends DefaultTransportListener {
+
+  var connection:BrokerConnection = null;
+
+  def setConnection(brokerConnection:BrokerConnection) = {
+    this.connection = brokerConnection
+  }
+
+  override def onTransportFailure(error:IOException) = {
+    connection.stop()
+  }
+
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols Wed Jul  7 04:00:31 2010
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.apollo.stomp.StompProtocolHandler
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompProtocol
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:00:31 2010
@@ -24,6 +24,7 @@ import _root_.org.fusesource.hawtdispatc
 
 import AsciiBuffer._
 import org.apache.activemq.apollo.broker._
+import protocol.{Protocol, ProtocolHandler}
 import Stomp._
 import BufferConversions._
 import StompFrameConstants._
@@ -49,6 +50,27 @@ object StompConstants {
 
 }
 
+class StompProtocol extends Protocol {
+  val wff = new StompWireFormatFactory
+
+  def name = new AsciiBuffer("stomp")
+
+  def createWireFormat = wff.createWireFormat
+
+  def createProtocolHandler = new StompProtocolHandler
+
+  def encode(message: Message) = {
+    val sfm = message.asInstanceOf[StompFrameMessage]
+    createWireFormat.marshal(sfm.frame)
+  }
+
+  def decode(message: Buffer) = {
+    val frame = createWireFormat.unmarshal(message).asInstanceOf[StompFrame]
+    StompFrameMessage(frame)
+  }
+
+}
+
 import StompConstants._
 
 object StompProtocolHandler extends Log

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 04:00:31 2010
@@ -86,6 +86,14 @@ class StompWireFormat extends WireFormat
     os.toBuffer
   }
 
+  def unmarshal(packet:Buffer):AnyRef = {
+    unmarshal(new DataByteArrayInputStream(packet))
+  }
+
+  def unmarshal(in: DataInput):AnyRef = {
+    throw new UnsupportedOperationException
+  }
+
   def marshal(frame:StompFrame, os:DataOutput) = {
     frame.action.writeTo(os)
     os.write(NEWLINE)
@@ -113,13 +121,6 @@ class StompWireFormat extends WireFormat
     END_OF_FRAME_BUFFER.writeTo(os)
   }
 
-  def unmarshal(packet:Buffer) = {
-    throw new UnsupportedOperationException
-  }
-  def unmarshal(in: DataInput):Object = {
-    throw new UnsupportedOperationException
-  }
-
   def getName() = "stomp"
 
   //

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul  7 04:00:31 2010
@@ -20,7 +20,7 @@
 
   <parent>
     <groupId>org.apache.activemq</groupId>
-    <artifactId>activemq-project</artifactId>
+    <artifactId>activemq-scala</artifactId>
     <version>6.0-SNAPSHOT</version>
   </parent>
 
@@ -43,9 +43,35 @@
       <artifactId>hawtbuf-core</artifactId>
       <version>${hawtbuf-version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.fusesource.hawtdispatch</groupId>
+      <artifactId>hawtdispatch-scala</artifactId>
+      <version>${hawtdispatch-version}</version>
+    </dependency>
+
+    <!-- Scala Support -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>compile</scope>
+      <version>${scala-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala-version}</version>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
 
     <!-- Testing Dependencies -->    
     <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala?rev=961119&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala Wed Jul  7 04:00:31 2010
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store
+
+import _root_.java.lang.{String}
+import org.fusesource.hawtbuf._
+import org.apache.activemq.Service
+import org.fusesource.hawtdispatch.{Retained}
+
+class StoredQueue {
+  var id:Long = -1
+  var name:AsciiBuffer = null
+  var parent:AsciiBuffer = null
+  var config:String = null
+  var first:Long = -1
+  var last:Long = -1
+  var count:Int = 0
+}
+
+class StoredMessage {
+  var id:Long = -1
+  var protocol:AsciiBuffer = null
+  var value:Buffer = null
+  var size:Int = 0
+}
+
+/**
+ * A StoreTransaction is used to perform persistent
+ * operations as unit of work.
+ *
+ * The disposer assigned to the store transaction will
+ * be executed once all associated persistent operations
+ * have been persisted.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait StoreTransaction extends Retained {
+
+  /**
+   * Assigns the delivery a store id if it did not already
+   * have one assigned.
+   */
+  def store(delivery:StoredMessage)
+
+  /**
+   * Adds a delivery to a specified queue at a the specified position in the queue.
+   */
+  def enqueue(queue:Long, seq:Long, msg:Long)
+
+  /**
+   * Removes a delivery from a specified queue at a the specified position in the queue.
+   */
+  def dequeue(queue:Long, seq:Long, msg:Long)
+
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait BrokerDatabase extends Service {
+
+
+  /**
+   * Stores a queue, calls back with a unquie id for the stored queue.
+   */
+  def addQueue(record:StoredQueue)(cb:(Option[Long])=>Unit):Unit
+
+  /**
+   * Loads the queue information for a given queue id.
+   */
+  def getQueueInfo(id:Long)(cb:(Option[StoredQueue])=>Unit )
+
+  /**
+   * gets a listing of all queues previously added.
+   */
+  def listQueues(cb: (Seq[Long])=>Unit )
+
+  /**
+   * Removes a the delivery associated with the provided from any
+   * internal buffers/caches.  The callback is executed once, the message is
+   * no longer buffered.
+   */
+  def flushDelivery(id:Long)(cb: =>Unit)
+
+  /**
+   * Loads a delivery with the associated id from persistent storage.
+   */
+  def loadDelivery(id:Long)(cb:(Option[StoredMessage])=>Unit )
+
+  /**
+   * Creates a StoreTransaction which is used to perform persistent
+   * operations as unit of work.
+   */
+  def createStoreTransaction():StoreTransaction
+
+}
+

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala Wed Jul  7 04:00:31 2010
@@ -14,7 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker
+package org.apache.activemq.apollo.store.memory
+
 
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
@@ -26,81 +27,15 @@ import java.util.{ArrayList, HashSet}
 import collection.mutable.HashMap
 import org.apache.activemq.Service
 import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained, Retained}
-
-case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String)
-case class QueueInfo(record:QueueRecord, first:Long, last:Long, count:Int)
+import org.apache.activemq.apollo.util.BaseService
+import org.apache.activemq.broker.store._
 
 /**
- * A StoreTransaction is used to perform persistent
- * operations as unit of work.
- *
- * The disposer assigned to the store transaction will
- * be executed once all associated persistent operations
- * have been persisted.
+ * <p>
+ * </p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait StoreTransaction extends Retained {
-
-  /**
-   * Assigns the delivery a store id if it did not already
-   * have one assigned.
-   */
-  def store(delivery:Delivery)
-
-  /**
-   * Adds a delivery to a specified queue at a the specified position in the queue.
-   */
-  def enqueue(queue:Long, seq:Long, msg:Long)
-
-  /**
-   * Removes a delivery from a specified queue at a the specified position in the queue.
-   */
-  def dequeue(queue:Long, seq:Long, msg:Long)
-
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait BrokerDatabase extends Service {
-
-
-  /**
-   * Stores a queue, calls back with a unquie id for the stored queue.
-   */
-  def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
-
-  /**
-   * Loads the queue information for a given queue id.
-   */
-  def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit )
-
-  /**
-   * gets a listing of all queues previously added.
-   */
-  def listQueues(cb: (Seq[Long])=>Unit )
-
-  /**
-   * Removes a the delivery associated with the provided from any
-   * internal buffers/caches.  The callback is executed once, the message is
-   * no longer buffered.
-   */
-  def flushDelivery(id:Long)(cb: =>Unit)
-
-  /**
-   * Loads a delivery with the associated id from persistent storage.
-   */
-  def loadDelivery(id:Long)(cb:(Option[Delivery])=>Unit )
-
-  /**
-   * Creates a StoreTransaction which is used to perform persistent
-   * operations as unit of work.
-   */
-  def createStoreTransaction():StoreTransaction
-
-}
-
 class Counter(private var value:Int = 0) {
 
   def get() = value
@@ -126,9 +61,10 @@ class Counter(private var value:Int = 0)
 /**
  *  @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class MemoryBrokerDatabase(host:VirtualHost) extends BaseService with BrokerDatabase {
+class MemoryBrokerDatabase() extends BaseService with BrokerDatabase {
 
   val dispatchQueue = createQueue("MessagesTable")
+  def getDispatchQueue = dispatchQueue
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -136,11 +72,11 @@ class MemoryBrokerDatabase(host:VirtualH
   //
   /////////////////////////////////////////////////////////////////////
 
-  protected def _stop(onCompleted: Runnable) = {
+  def _stop(onCompleted: Runnable) = {
     onCompleted.run
   }
 
-  protected def _start(onCompleted: Runnable) = {
+  def _start(onCompleted: Runnable) = {
     onCompleted.run
   }
 
@@ -152,7 +88,7 @@ class MemoryBrokerDatabase(host:VirtualH
   private val queue_id_generator = new AtomicLong
   val queues = new TreeMap[Long, QueueData]
 
-  case class QueueData(val record:QueueRecord) {
+  case class QueueData(val record:StoredQueue) {
     var messges = new TreeMap[Long, Long]()
   }
 
@@ -160,22 +96,26 @@ class MemoryBrokerDatabase(host:VirtualH
     JavaConversions.asSet(queues.keySet).toSeq
   } >>: dispatchQueue
 
-  def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit ) = reply(cb) {
+  def getQueueInfo(id:Long)(cb:(Option[StoredQueue])=>Unit ) = reply(cb) {
     val qd = queues.get(id)
     if( qd == null ) {
       None
     } else {
-      Some(
-        if( qd.messges.isEmpty ) {
-          QueueInfo(qd.record, -1, -1, 0)
-        } else {
-          QueueInfo(qd.record, qd.messges.firstKey, qd.messges.lastKey, qd.messges.size)
-        }
-      )
+      val rc = qd.record
+      if( qd.messges.isEmpty ) {
+        rc.count = 0
+        rc.first = -1
+        rc.last = -1
+      } else {
+        rc.count = qd.messges.size
+        rc.first = qd.messges.firstKey
+        rc.last = qd.messges.lastKey
+      }
+      Some(rc)
     }
   } >>: dispatchQueue
 
-  def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit = reply(cb) {
+  def addQueue(record:StoredQueue)(cb:(Option[Long])=>Unit):Unit = reply(cb) {
     val id = queue_id_generator.incrementAndGet
     if( queues.containsKey(id) ) {
       None
@@ -190,7 +130,7 @@ class MemoryBrokerDatabase(host:VirtualH
   // Methods related to message storage
   //
   /////////////////////////////////////////////////////////////////////
-  class MessageData(val delivery:Delivery) {
+  class MessageData(val delivery:StoredMessage) {
     val queueRefs = new Counter()
     var onFlush = List[()=>Unit]()
   }
@@ -207,7 +147,7 @@ class MemoryBrokerDatabase(host:VirtualH
     }
   } >>: dispatchQueue
 
-  def loadDelivery(ref:Long)(cb:(Option[Delivery])=>Unit ) = reply(cb) {
+  def loadDelivery(ref:Long)(cb:(Option[StoredMessage])=>Unit ) = reply(cb) {
     val rc = messages.get(ref)
     if( rc == null ) {
       None
@@ -235,13 +175,13 @@ class MemoryBrokerDatabase(host:VirtualH
 
     val updated = HashMap[Long, MessageData]()
 
-    def store(delivery:Delivery) = {
-      if( delivery.storeId == -1 ) {
-        delivery.storeId = msg_id_generator.incrementAndGet
+    def store(sm:StoredMessage) = {
+      if( sm.id == -1 ) {
+        sm.id = msg_id_generator.incrementAndGet
         using(this) {
-          val md = new MessageData(delivery)
-          updated.put(delivery.storeId, md)
-          messages.put(delivery.storeId, md)
+          val md = new MessageData(sm)
+          updated.put(sm.id, md)
+          messages.put(sm.id, md)
         } >>: dispatchQueue
       }
     }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/StoreFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/StoreFactory.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
    (empty)



Mime
View raw message