activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961218 [1/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/ activemq-broke...
Date Wed, 07 Jul 2010 04:23:37 GMT
Author: chirino
Date: Wed Jul  7 04:23:35 2010
New Revision: 961218

URL: http://svn.apache.org/viewvc?rev=961218&view=rev
Log:
Consolidated the transport module to live in apollo.transport package.  Renamed WireFormat to ProtocolCodec.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DefaultTransportListener.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgent.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryAgentFactory.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryEvent.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryEvent.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/DiscoveryListener.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodec.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ObjectStreamProtocolCodecFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodecFactory.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/Transport.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactory.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFactorySupport.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportFilter.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportListener.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServer.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/TransportServerSupport.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/package.html
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/package.html
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportFactory.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java
      - copied, changed from r961217, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/multi
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryEvent.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/package.html
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/wireformat/mock
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols Wed Jul  7 04:23:35 2010
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.MultiProtocol
\ No newline at end of file
+org.apache.activemq.apollo.broker.protocol.MultiProtocolFactorySPI
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 04:23:35 2010
@@ -16,18 +16,12 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.java.beans.ExceptionListener
 import _root_.java.io.{IOException}
 import _root_.org.apache.activemq.filter.{BooleanExpression}
-import _root_.org.apache.activemq.transport._
-import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.{FactoryFinder}
-import _root_.org.apache.activemq.wireformat.WireFormat
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import java.util.concurrent.atomic.AtomicLong
-import org.fusesource.hawtdispatch.Dispatch
 import protocol.{ProtocolFactory, ProtocolHandler}
+import org.apache.activemq.apollo.transport.{DefaultTransportListener, Transport}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -42,7 +36,6 @@ abstract class Connection() extends Defa
 
   override protected def log = Connection
 
-  import Connection._
   val dispatchQueue = createQueue()
   var stopped = true
   var transport:Transport = null
@@ -86,14 +79,12 @@ abstract class Connection() extends Defa
  */
 class BrokerConnection(val connector: Connector, val id:Long) extends Connection {
 
-  var protocol = "multi"
   var protocolHandler: ProtocolHandler = null;
 
   override def toString = "id: "+id.toString
 
   override protected  def _start(onCompleted:Runnable) = {
     connector.dispatchQueue.retain
-    protocolHandler = ProtocolFactory.get(protocol).createProtocolHandler
     protocolHandler.setConnection(this);
     super._start(onCompleted)
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Jul  7 04:23:35 2010
@@ -16,15 +16,14 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.org.apache.activemq.transport._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import org.fusesource.hawtdispatch.{Dispatch}
 import org.apache.activemq.apollo.dto.{ConnectorDTO}
-import org.apache.activemq.wireformat.WireFormatFactory
+import protocol.{ProtocolFactory, Protocol}
 import ReporterLevel._
 import org.apache.activemq.apollo.util.LongCounter
 import collection.mutable.HashMap
-
+import org.apache.activemq.apollo.transport._
 /**
  * <p>
  * </p>
@@ -76,7 +75,7 @@ class Connector(val broker:Broker, val i
 
   var config:ConnectorDTO = defaultConfig
   var transportServer:TransportServer = _
-  var wireFormatFactory:WireFormatFactory = _
+  var protocol:Protocol = _
 
   val connections = HashMap[Long, BrokerConnection]()
   override def toString = "connector: "+config.id
@@ -92,13 +91,13 @@ class Connector(val broker:Broker, val i
     def onAccept(transport: Transport): Unit = {
       debug("Accepted connection from: %s", transport.getRemoteAddress)
 
-      if( wireFormatFactory!=null ) {
-        transport.setWireformat(wireFormatFactory.createWireFormat)
+      if( protocol!=null ) {
+        transport.setProtocolCodec(protocol.createProtocolCodec)
       }
 
       accept_counter.incrementAndGet
       var connection = new BrokerConnection(Connector.this, broker.connection_id_counter.incrementAndGet)
-      connection.protocol = config.protocol
+      connection.protocolHandler = protocol.createProtocolHandler
       connection.transport = transport
 
       if( STICK_ON_THREAD_QUEUES ) {
@@ -138,8 +137,7 @@ class Connector(val broker:Broker, val i
 
   override def _start(onCompleted:Runnable) = {
     assert(config!=null, "Connector must be configured before it is started.")
-    wireFormatFactory = TransportFactorySupport.createWireFormatFactory(config.protocol)
-
+    protocol = ProtocolFactory.get(config.protocol).get
     transportServer = TransportFactory.bind( config.bind )
     transportServer.setDispatchQueue(dispatchQueue)
     transportServer.setAcceptListener(BrokerAcceptListener)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:23:35 2010
@@ -22,7 +22,7 @@ import _root_.org.fusesource.hawtdispatc
 import org.fusesource.hawtbuf._
 import org.apache.activemq.broker.store.StoreUOW
 import org.apache.activemq.apollo.store.MessageRecord
-import protocol.ProtocolFactory
+import protocol.{Protocol, ProtocolFactory}
 
 /**
  * A producer which sends Delivery objects to a delivery consumer.
@@ -108,9 +108,9 @@ trait Message extends Filterable with Re
   def destination: Destination
 
   /**
-   * The protocol encoding of the message.
+   * The protocol of the message
    */
-  def protocol:AsciiBuffer
+  def protocol:Protocol
 
 }
 
@@ -167,11 +167,9 @@ class Delivery extends BaseRetained {
   }
 
   def createMessageRecord() = {
-    val sm = new MessageRecord
-    sm.protocol = message.protocol
-    sm.buffer = ProtocolFactory.get(message.protocol).encode(message)
-    sm.size = size
-    sm
+    val record = message.protocol.encode(message)
+    assert( record.size == size )
+    record
   }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:23:35 2010
@@ -263,7 +263,6 @@ class Queue(val host: VirtualHost, val d
   }
 
 
-  var check_counter = 0
   def display_stats: Unit = {
     info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, capacity_used, capacity)
     info("total messages enqueued %d, dequeues %d ", enqueue_item_counter, dequeue_item_counter)
@@ -304,16 +303,6 @@ class Queue(val host: VirtualHost, val d
     def slowConsumerCheck = {
       if( serviceState.isStarted ) {
 
-        // Handy for periodically looking at the dispatch state...
-        check_counter += 1
-
-        if( (check_counter%25)==0 ) {
-          display_stats
-//          if (!all_subscriptions.isEmpty) {
-//            display_active_entries
-//          }
-        }
-
         // target tune_min_subscription_rate / sec
         val slow_cursor_delta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
         var idleConsumerCount = 0
@@ -1167,7 +1156,7 @@ class QueueEntry(val queue:Queue, val se
         queue.loading_size -= size
 
         val delivery = new Delivery()
-        delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.buffer)
+        delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Jul  7 04:23:35 2010
@@ -18,8 +18,8 @@ package org.apache.activemq.apollo.broke
 
 import _root_.org.fusesource.hawtdispatch._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.transport.Transport
 import java.util.{LinkedList}
+import org.apache.activemq.apollo.transport.Transport
 
 /**
  * <p>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:23:35 2010
@@ -24,9 +24,9 @@ import path.PathFilter
 import org.fusesource.hawtbuf.AsciiBuffer
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
+import protocol.Protocol
 import ReporterLevel._
 import org.apache.activemq.broker.store.{Store}
-import org.fusesource.hawtbuf.proto.WireFormat
 import org.apache.activemq.apollo.store.{StoreFactory, QueueRecord}
 import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, CassandraStoreDTO, VirtualHostDTO}
 import java.util.concurrent.TimeUnit
@@ -100,9 +100,8 @@ class VirtualHost(val broker: Broker, va
   }
 
   var store:Store = null
-  var memory_pool:DirectBufferPool = null
+  var direct_buffer_pool:DirectBufferPool = null
   var transactionManager:TransactionManagerX = new TransactionManagerX
-  var protocols = Map[AsciiBuffer, WireFormat]()
   val queue_id_counter = new LongCounter
 
   override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
@@ -137,8 +136,8 @@ class VirtualHost(val broker: Broker, va
     }
 
     if( direct_buffer_pool_config!=null ) {
-      memory_pool = DirectBufferPoolFactory.create(direct_buffer_pool_config)
-      memory_pool.start
+      direct_buffer_pool = DirectBufferPoolFactory.create(direct_buffer_pool_config)
+      direct_buffer_pool.start
     }
 
     if( store!=null ) {
@@ -220,9 +219,9 @@ class VirtualHost(val broker: Broker, va
 //        }
 //        done.await();
 
-    if( memory_pool!=null ) {
-      memory_pool.stop
-      memory_pool = null
+    if( direct_buffer_pool!=null ) {
+      direct_buffer_pool.stop
+      direct_buffer_pool = null
     }
 
     if( store!=null ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala Wed Jul  7 04:23:35 2010
@@ -18,7 +18,12 @@ package org.apache.activemq.apollo.broke
 
 import org.apache.activemq.apollo.broker.{Message, ProtocolException}
 import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
-import org.apache.activemq.wireformat.{MultiWireFormatFactory, WireFormat}
+import org.apache.activemq.apollo.store.MessageRecord
+import org.apache.activemq.apollo.transport.{ProtocolCodec}
+import java.nio.channels.{WritableByteChannel, ReadableByteChannel}
+import java.nio.ByteBuffer
+import java.io.IOException
+import java.lang.String
 
 /**
  * <p>
@@ -26,49 +31,132 @@ import org.apache.activemq.wireformat.{M
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class MultiProtocol extends Protocol {
+class MultiProtocolFactorySPI extends ProtocolFactory.SPI {
 
-  val wff = new MultiWireFormatFactory
+  def all_protocols: Array[Protocol] = ((ProtocolFactory.spis.map(_.create())).filter(_.isIdentifiable)).toArray
 
-  def name = new AsciiBuffer("multi")
+  def create() = {
+    new MultiProtocol(()=>all_protocols)
+  }
+
+  def create(config: String): Protocol = {
+    val MULTI = "multi"
+    val MULTI_PREFIXED = "multi:"
+
+    if (config == MULTI) {
+      return new MultiProtocol(()=>all_protocols)
+    } else if (config.startsWith(MULTI_PREFIXED)) {
+      var names: Array[String] = config.substring(MULTI_PREFIXED.length).split(',')
+      var protocols: Array[Protocol] = (names.flatMap {x => ProtocolFactory.get(x.trim)}).toArray
+      return new MultiProtocol(()=>protocols)
+    }
+    return null
+  }
+
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MultiProtocol(val func: ()=>Array[Protocol]) extends Protocol {
+
+  lazy val protocols: Array[Protocol] = func()
+
+  def createProtocolCodec = new MultiProtocolCodec(protocols)
 
-  def createWireFormat = wff.createWireFormat
   def createProtocolHandler = new MultiProtocolHandler
-  
+
   def encode(message: Message) = throw new UnsupportedOperationException
-  def decode(message: Buffer) = throw new UnsupportedOperationException
+
+  def decode(message: MessageRecord) = throw new UnsupportedOperationException
+
+  def isIdentifiable = false
+
+  def maxIdentificaionLength = throw new UnsupportedOperationException()
+
+  def matchesIdentification(buffer: Buffer) = throw new UnsupportedOperationException()
+
 }
 
+class MultiProtocolCodec(val protocols: Array[Protocol]) extends ProtocolCodec {
+
+  if (protocols.isEmpty) {
+    throw new IllegalArgumentException("No protocol configured for identification.")
+  }
+  val buffer = ByteBuffer.allocate(protocols.foldLeft(0) {(a, b) => a.max(b.maxIdentificaionLength)})
+  var channel: ReadableByteChannel = null
+
+  def setReadableByteChannel(channel: ReadableByteChannel) = {this.channel = channel}
+
+  def read: AnyRef = {
+    if (channel == null) {
+      throw new IllegalStateException
+    }
+
+    channel.read(buffer)
+    val buff = new Buffer(buffer.array(), 0, buffer.position())
+    protocols.foreach {protocol =>
+      if (protocol.matchesIdentification(buff)) {
+        val protocolCodec = protocol.createProtocolCodec()
+        protocolCodec.unread(buff)
+        return protocolCodec
+      }
+    }
+    if (buffer.position() == buffer.capacity) {
+      channel = null
+      throw new IOException("Could not identify the protocol.")
+    }
+    return null
+  }
+
+  def getReadCounter = buffer.position()
+
+  def unread(buffer: Buffer) = throw new UnsupportedOperationException()
+
+  def setWritableByteChannel(channel: WritableByteChannel) = {}
+
+  def write(value: Any) = throw new UnsupportedOperationException()
+
+  def flush = throw new UnsupportedOperationException()
+
+  def getWriteCounter = 0L
+
+  def protocol = "multi"
+
+}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class MultiProtocolHandler extends ProtocolHandler {
 
+  def protocol = "multi"
+
   var connected = false
 
-  override def onTransportCommand(command:Any) = {
+  override def onTransportCommand(command: Any) = {
 
-    if (!command.isInstanceOf[WireFormat]) {
-      throw new ProtocolException("Expected WireFormat");
+    if (!command.isInstanceOf[ProtocolCodec]) {
+      throw new ProtocolException("Expected a protocol codec");
     }
 
-    var wireformat:WireFormat = command.asInstanceOf[WireFormat];
-    val protocol = wireformat.getName()
-    val protocolHandler = try {
-      // Create the new protocol handler..
-       ProtocolFactory.get(protocol).createProtocolHandler
-    } catch {
-      case e:Exception=>
-      throw new ProtocolException("No protocol handler available for protocol: " + protocol, e);
+    var codec: ProtocolCodec = command.asInstanceOf[ProtocolCodec];
+    val protocol = codec.protocol()
+    val protocolHandler = ProtocolFactory.get(protocol) match {
+      case Some(x) => x.createProtocolHandler
+      case None =>
+        throw new ProtocolException("No protocol handler available for protocol: " + protocol);
     }
+
     protocolHandler.setConnection(connection);
 
     // replace the current handler with the new one.
-    connection.protocol = protocol
     connection.protocolHandler = protocolHandler
-    connection.transport.setWireformat(wireformat)
-    
+    connection.transport.setProtocolCodec(codec)
+
     connection.transport.suspendRead
     protocolHandler.onTransportConnected
   }
@@ -78,3 +166,4 @@ class MultiProtocolHandler extends Proto
   }
 
 }
+

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Wed Jul  7 04:23:35 2010
@@ -16,13 +16,12 @@
  */
 package org.apache.activemq.apollo.broker.protocol
 
-import org.apache.activemq.transport.DefaultTransportListener
 import java.io.{IOException}
 import org.apache.activemq.apollo.broker.{Message, BrokerConnection}
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
-import org.apache.activemq.wireformat.WireFormat
 import org.apache.activemq.apollo.util.ClassFinder
-
+import org.apache.activemq.apollo.store.MessageRecord
+import org.apache.activemq.apollo.transport._
 
 /**
  * <p>
@@ -32,42 +31,38 @@ import org.apache.activemq.apollo.util.C
  */
 object ProtocolFactory {
 
-  val finder =  ClassFinder[Protocol]("META-INF/services/org.apache.activemq.apollo/protocols")
-  var protocols = Map[AsciiBuffer, Protocol]()
+  trait SPI {
+    def create():Protocol
+    def create(config:String):Protocol
+  }
+
+  val finder =  ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/protocols")
+  var spis = List[SPI]()
 
   finder.find.foreach{ clazz =>
     try {
-
-      val protocol = clazz.newInstance.asInstanceOf[Protocol]
-      protocols += protocol.name -> protocol
-
+      spis ::= clazz.newInstance.asInstanceOf[SPI]
     } catch {
-      case e:Throwable =>
-        e.printStackTrace
+      case e:Throwable => e.printStackTrace
     }
   }
 
-  def get(name:String):Protocol = get(new AsciiBuffer(name))
-
-  def get(name:AsciiBuffer):Protocol = {
-    protocols.get(name) match {
-      case None =>
-        throw new IllegalArgumentException("Protocol not found: "+name)
-      case Some(x) => x
+  def get(name:String):Option[Protocol] = {
+    spis.foreach { spi=>
+      val rc = spi.create(name)
+      if( rc!=null ) {
+        return Some(rc)
+      }
     }
+    None
   }
-
 }
 
-trait Protocol {
-
-  def name:AsciiBuffer
+trait Protocol extends ProtocolCodecFactory {
 
-  def createWireFormat:WireFormat
   def createProtocolHandler:ProtocolHandler
-
-  def encode(message:Message):Buffer
-  def decode(message:Buffer):Message
+  def encode(message:Message):MessageRecord
+  def decode(message:MessageRecord):Message
 
 }
 
@@ -77,6 +72,8 @@ trait Protocol {
  */
 trait ProtocolHandler extends DefaultTransportListener {
 
+  def protocol:String
+
   var connection:BrokerConnection = null;
 
   def setConnection(brokerConnection:BrokerConnection) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul  7 04:23:35 2010
@@ -22,19 +22,14 @@ import _root_.java.util.concurrent.atomi
 import _root_.java.util.concurrent.atomic.AtomicInteger
 
 import _root_.org.apache.activemq.apollo.broker._
-import _root_.org.apache.activemq.transport.Transport
-import _root_.org.apache.activemq.transport.TransportFactory
-import _root_.org.apache.activemq.transport.TransportServer
-import _root_.org.apache.activemq.transport.pipe.PipeTransport
-import _root_.org.apache.activemq.transport.pipe.PipeTransportFactory
-import _root_.org.apache.activemq.transport.pipe.PipeTransportServer
 import _root_.org.apache.activemq.util.IOExceptionSupport
 import _root_.org.apache.activemq.util.URISupport
-import _root_.org.apache.activemq.transport.TransportFactorySupport.configure
-import _root_.org.apache.activemq.transport.TransportFactorySupport.verify
 
 import _root_.scala.collection.JavaConversions._
-import org.apache.activemq.apollo.dto.ConnectorDTO
+import org.apache.activemq.apollo.transport._
+import org.apache.activemq.apollo.transport.pipe.PipeTransportFactory
+import org.apache.activemq.apollo.transport.pipe.PipeTransport
+import org.apache.activemq.apollo.transport.pipe.PipeTransportServer
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -155,6 +150,7 @@ class VMTransportFactory extends PipeTra
       }
 
       var transport = server.connect()
+      import TransportFactorySupport._
       verify(configure(transport, options), options)
 
     } catch {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml Wed Jul  7 04:23:35 2010
@@ -18,7 +18,7 @@
 <broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
   <dispatcher name="test dispatcher" threads="4"/>
 
-  <transport-server>pipe://test1?wireFormat=mock</transport-server>
+  <transport-server>pipe://test1</transport-server>
 
   <connector/>
   <connector>pipe://test1</connector>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Jul  7 04:23:35 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.broke
 
 import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import _root_.java.lang.{String}
-import org.apache.activemq.transport.TransportFactory
 
 import java.util.ArrayList
 import org.apache.activemq.apollo.broker._
@@ -32,6 +31,7 @@ import java.net.URL
 import java.util.concurrent.TimeUnit
 import org.fusesource.hawtdispatch.ScalaDispatch._
 import org.apache.activemq.apollo.dto.BrokerDTO
+import org.apache.activemq.apollo.transport.TransportFactory
 
 /**
  * 
@@ -101,8 +101,8 @@ abstract class BrokerPerfSupport extends
       sendBrokerBindURI = "tcp://localhost:10000"
       receiveBrokerBindURI = "tcp://localhost:20000"
 
-      sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat()
-      receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat()
+      sendBrokerConnectURI = "tcp://localhost:10000?protocol=" + getRemoteProtocolName()
+      receiveBrokerConnectURI = "tcp://localhost:20000?protocol=" + getRemoteProtocolName()
     } else {
       sendBrokerConnectURI = "pipe://SendBroker"
       receiveBrokerConnectURI = "pipe://ReceiveBroker"
@@ -188,8 +188,8 @@ abstract class BrokerPerfSupport extends
   protected def createConsumer(): RemoteConsumer
   protected def createProducer(): RemoteProducer
 
-  def getBrokerWireFormat() = "multi"
-  def getRemoteWireFormat(): String
+  def getBrokerProtocolName() = "multi"
+  def getRemoteProtocolName(): String
 
   def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
 
@@ -197,7 +197,7 @@ abstract class BrokerPerfSupport extends
     val connector = config.connectors.get(0)
     connector.bind = bindURI
     connector.advertise = connectUri
-    connector.protocol = getBrokerWireFormat
+    connector.protocol = getBrokerProtocolName
 
     val host = config.virtual_hosts.get(0)
     host.purge_on_startup = true

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Wed Jul  7 04:23:35 2010
@@ -18,8 +18,8 @@ package org.apache.activemq.apollo.trans
 
 import java.io.IOException;
 
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.apollo.transport.Transport;
+import org.apache.activemq.apollo.transport.TransportFactory;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ public class VMTransportTest {
 	
 	@Test()
 	public void autoCreateBroker() throws Exception {
-		Transport connect = TransportFactory.connect("vm://test1?wireFormat=mock");
+		Transport connect = TransportFactory.connect("vm://test1?protocol=null");
         connect.setDispatchQueue(Dispatch.createQueue());
 		connect.start();
 		assertNotNull(connect);
@@ -45,12 +45,12 @@ public class VMTransportTest {
 	
 	@Test(expected=IOException.class)
 	public void noAutoCreateBroker() throws Exception {
-		TransportFactory.connect("vm://test2?create=false&wireFormat=mock");
+		TransportFactory.connect("vm://test2?create=false&protocol=null");
 	}
 	
 	@Test(expected=IllegalArgumentException.class)
 	public void badOptions() throws Exception {
-		TransportFactory.connect("vm://test3?crazy-option=false&wireFormat=mock");
+		TransportFactory.connect("vm://test3?crazy-option=false&protocol=null");
 	}
 	
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools Wed Jul  7 04:23:35 2010
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.broker.store.hawtdb.HawtDBMemoryPoolSPI
\ No newline at end of file
+org.apache.activemq.broker.store.hawtdb.HawtDBDirectBufferPoolSPI
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols Wed Jul  7 04:23:35 2010
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocol
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompProtocolFactorySPI
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul  7 04:23:35 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.apollo.stomp
 
-import org.apache.activemq.transport.TransportFactory
 import org.apache.activemq.apollo.broker.{LoggingTracker, Broker}
 import java.io.File
 import org.apache.activemq.apollo.dto.{CassandraStoreDTO, HawtDBStoreDTO}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (from r961217, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala&r1=961217&r2=961218&rev=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Wed Jul  7 04:23:35 2010
@@ -18,11 +18,8 @@ package org.apache.activemq.apollo.stomp
 
 import _root_.org.apache.activemq.apollo.broker._
 
-import _root_.org.apache.activemq.wireformat.{WireFormatFactory, WireFormat}
 import java.nio.ByteBuffer
-import _root_.org.fusesource.hawtbuf._
 import collection.mutable.{ListBuffer, HashMap}
-import AsciiBuffer._
 import Stomp._
 import Stomp.Headers._
 
@@ -32,30 +29,13 @@ import StompFrameConstants._
 import java.io.{EOFException, DataOutput, DataInput, IOException}
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
 import org.apache.activemq.apollo.{DirectBuffer, DirectBufferPool}
+import org.apache.activemq.apollo.transport._
+import org.apache.activemq.apollo.store.MessageRecord
+import _root_.org.fusesource.hawtbuf._
+import Buffer._
 
-/**
- * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
- */
-class StompWireFormatFactory extends WireFormatFactory {
-  import Stomp.Commands.CONNECT
-
-    def createWireFormat() = new StompWireFormat();
-
-    def isDiscriminatable() = true
-
-    def maxWireformatHeaderLength() = CONNECT.length+10;
 
-    def matchesWireformatHeader(header:Buffer) = {
-        if( header.length < CONNECT.length) {
-          false
-        } else {
-          // the magic can be preceded with newlines / whitespace..
-          header.trimFront.startsWith(CONNECT);
-        }
-    }
-}
-
-object StompWireFormat extends Log {
+object StompCodec extends Log {
     val READ_BUFFFER_SIZE = 1024*64;
     val MAX_COMMAND_LENGTH = 1024;
     val MAX_HEADER_LENGTH = 1024 * 10;
@@ -63,45 +43,23 @@ object StompWireFormat extends Log {
     val MAX_DATA_LENGTH = 1024 * 1024 * 100;
     val TRIM=true
     val SIZE_CHECK=false
-  }
 
-class StompWireFormat extends WireFormat with DispatchLogging {
 
-  import StompWireFormat._
-  override protected def log: Log = StompWireFormat
-
-  var memory_pool:DirectBufferPool = null
+  def encode(message: StompFrameMessage):MessageRecord = {
+    val frame = message.frame
 
-  implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
-  implicit def wrap(x: Byte) = {
-    ByteBuffer.wrap(Array(x));
-  }
+    val rc = new MessageRecord
+    rc.protocol = StompConstants.PROTOCOL
+    rc.size = frame.size
+    rc.expiration = message.expiration
 
+    if( frame.content.isInstanceOf[DirectContent] ) {
+      rc.direct_buffer = frame.content.asInstanceOf[DirectContent].direct_buffer
+    }
 
-  def marshal(command:Any, os:DataOutput) = {
-    marshal(command.asInstanceOf[StompFrame], os)
-  }
-
-  def marshal(command:Any):Buffer= {
-    val frame = command.asInstanceOf[StompFrame]
-    val os = new DataByteArrayOutputStream(frame.size);
-    marshal(frame, os)
-    os.toBuffer
-  }
-
-  def unmarshal(packet:Buffer):AnyRef = {
-    read_start = packet.offset
-    read_end = packet.offset
-    val bb = packet.toByteBuffer
-    bb.position(packet.offset + packet.length)
-    unmarshalNB(bb)
-  }
-
-  def unmarshal(in: DataInput):AnyRef = {
-    throw new UnsupportedOperationException
-  }
+    def buffer_size = if (rc.direct_buffer!=null) { frame.size - (rc.direct_buffer.size - 1) } else { frame.size }
+    val os = new ByteArrayOutputStream(buffer_size)
 
-  def marshal(frame:StompFrame, os:DataOutput) = {
     frame.action.writeTo(os)
     os.write(NEWLINE)
 
@@ -120,9 +78,9 @@ class StompWireFormat extends WireFormat
 
       val offset = frame.headers.head._1.offset;
       val buffer1 = frame.headers.head._1;
-      val buffer2 = frame.content.asInstanceOf[BufferStompContent].content;
+      val buffer2 = frame.content.asInstanceOf[BufferContent].content;
       val length = (buffer2.offset-buffer1.offset)+buffer2.length
-      os.write( buffer1.data, offset, length)                                                                                            
+      os.write( buffer1.data, offset, length)
 
     } else {
       for( (key, value) <- frame.headers ) {
@@ -132,12 +90,84 @@ class StompWireFormat extends WireFormat
         os.write(NEWLINE)
       }
       os.write(NEWLINE)
-      frame.content.writeTo(os)
+      if ( rc.direct_buffer==null ) {
+        frame.content.writeTo(os)
+      }
+    }
+    rc.buffer = os.toBuffer
+    rc
+  }
+
+  def decode(message: MessageRecord):StompFrameMessage = {
+
+    val buffer = message.buffer.buffer
+    def read_line = {
+      val pos = buffer.indexOf('\n'.toByte)
+      if( pos<0 ) {
+        throw new IOException("expected a new line")
+      } else {
+        val rc = buffer.slice(0, pos).ascii
+        buffer.offset += (pos+1)
+        buffer.length -= (pos+1)
+        rc
+      }
+    }
+
+
+    val action = if( TRIM ) {
+        read_line.trim()
+      } else {
+        read_line
+      }
+
+    val headers = new HeaderMapBuffer()
+
+    var line = read_line
+    while( line.length() > 0 ) {
+      try {
+          val seperatorIndex = line.indexOf(SEPERATOR)
+          if( seperatorIndex<0 ) {
+              throw new IOException("Header line missing seperator.")
+          }
+          var name = line.slice(0, seperatorIndex)
+          if( TRIM ) {
+              name = name.trim()
+          }
+          var value = line.slice(seperatorIndex + 1, line.length())
+          if( TRIM ) {
+              value = value.trim()
+          }
+          headers.add((name, value))
+      } catch {
+          case e:Exception=>
+            e.printStackTrace
+            throw new IOException("Unable to parser header line [" + line + "]")
+      }
+      line = read_line
+    }
+
+    if( message.direct_buffer==null ) {
+      new StompFrameMessage(new StompFrame(action, headers.toList, BufferContent(buffer)))
+    } else {
+      new StompFrameMessage(new StompFrame(action, headers.toList, DirectContent(message.direct_buffer)))
     }
-    END_OF_FRAME_BUFFER.writeTo(os)
   }
 
-  def getName() = "stomp"
+}
+
+class StompCodec extends ProtocolCodec with DispatchLogging {
+
+  import StompCodec._
+  override protected def log: Log = StompCodec
+
+  var memory_pool:DirectBufferPool = null
+
+  implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
+  implicit def wrap(x: Byte) = {
+    ByteBuffer.wrap(Array(x));
+  }
+
+  def protocol() = "stomp"
 
   
   /////////////////////////////////////////////////////////////////////
@@ -171,21 +201,21 @@ class StompWireFormat extends WireFormat
   def getWriteCounter = write_counter
 
 
-  def write(command: Any):WireFormat.BufferState =  {
+  def write(command: Any):ProtocolCodec.BufferState =  {
     if ( is_full) {
-      WireFormat.BufferState.FULL
+      ProtocolCodec.BufferState.FULL
     } else {
       val was_empty = is_empty
-      marshalX(command.asInstanceOf[StompFrame], next_write_buffer);
+      encode(command.asInstanceOf[StompFrame], next_write_buffer);
       if( was_empty ) {
-        WireFormat.BufferState.WAS_EMPTY
+        ProtocolCodec.BufferState.WAS_EMPTY
       } else {
-        WireFormat.BufferState.NOT_EMPTY
+        ProtocolCodec.BufferState.NOT_EMPTY
       }
     }
   }
 
-  def marshalX(frame:StompFrame, os:DataOutput) = {
+  def encode(frame:StompFrame, os:DataOutput) = {
     frame.action.writeTo(os)
     os.write(NEWLINE)
 
@@ -204,7 +234,7 @@ class StompWireFormat extends WireFormat
 
       val offset = frame.headers.head._1.offset;
       val buffer1 = frame.headers.head._1;
-      val buffer2 = frame.content.asInstanceOf[BufferStompContent].content;
+      val buffer2 = frame.content.asInstanceOf[BufferContent].content;
       val length = (buffer2.offset-buffer1.offset)+buffer2.length
       os.write( buffer1.data, offset, length)
       END_OF_FRAME_BUFFER.writeTo(os)
@@ -219,11 +249,11 @@ class StompWireFormat extends WireFormat
       os.write(NEWLINE)
 
       frame.content match {
-        case x:DirectStompContent=>
-          next_write_direct = x.direct.buffer.duplicate
+        case x:DirectContent=>
+          next_write_direct = x.direct_buffer.buffer.duplicate
           next_write_direct.clear
           next_write_direct_frame = frame
-        case x:BufferStompContent=>
+        case x:BufferContent=>
           x.content.writeTo(os)
           END_OF_FRAME_BUFFER.writeTo(os)
         case _=>
@@ -233,7 +263,7 @@ class StompWireFormat extends WireFormat
   }
 
 
-  def flush():WireFormat.BufferState = {
+  def flush():ProtocolCodec.BufferState = {
 
     // if we have a pending write that is being sent over the socket...
     if ( write_buffer.remaining() != 0 ) {
@@ -262,9 +292,9 @@ class StompWireFormat extends WireFormat
     }
 
     if ( is_empty ) {
-      WireFormat.BufferState.EMPTY
+      ProtocolCodec.BufferState.EMPTY
     } else {
-      WireFormat.BufferState.NOT_EMPTY
+      ProtocolCodec.BufferState.NOT_EMPTY
     }
 
   }
@@ -357,18 +387,6 @@ class StompWireFormat extends WireFormat
     return command
   }
 
-  def unmarshalNB(buffer:ByteBuffer):Object = {
-    // keep running the next action until
-    // a frame is decoded or we run out of input
-    var rc:StompFrame = null
-    while( rc == null && read_end!=buffer.position ) {
-      rc = next_action(buffer)
-    }
-
-//      trace("unmarshalled: "+rc+", start: "+start+", end: "+end+", buffer position: "+buffer.position)
-    rc
-  }
-
   def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
       val read_limit = buffer.position
       while( read_end < read_limit ) {
@@ -480,7 +498,7 @@ class StompWireFormat extends WireFormat
               read_end = read_start
 
               next_action = read_action
-              rc = new StompFrame(ascii(action), headers.toList, DirectStompContent(ma))
+              rc = new StompFrame(ascii(action), headers.toList, DirectContent(ma))
             }
 
           } else {
@@ -510,7 +528,7 @@ class StompWireFormat extends WireFormat
   def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:DirectBuffer):FrameReader = (buffer)=> {
     if( read_content_direct(ma) ) {
       next_action = read_action
-      new StompFrame(ascii(action), headers.toList, DirectStompContent(ma))
+      new StompFrame(ascii(action), headers.toList, DirectContent(ma))
     } else {
       null
     }
@@ -540,7 +558,7 @@ class StompWireFormat extends WireFormat
     val content:Buffer=read_content(buffer, contentLength)
     if( content != null ) {
       next_action = read_action
-      new StompFrame(ascii(action), headers.toList, BufferStompContent(content))
+      new StompFrame(ascii(action), headers.toList, BufferContent(content))
     } else {
       null
     }
@@ -581,7 +599,7 @@ class StompWireFormat extends WireFormat
     val content:Buffer=read_to_null(buffer)
     if( content != null ) {
       next_action = read_action
-      new StompFrame(ascii(action), headers.toList, BufferStompContent(content))
+      new StompFrame(ascii(action), headers.toList, BufferContent(content))
     } else {
       null
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 04:23:35 2010
@@ -21,10 +21,10 @@ import _root_.org.apache.activemq.filter
 import _root_.org.fusesource.hawtbuf._
 import collection.mutable.ListBuffer
 import java.lang.{String, Class}
-import java.io.DataOutput
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.DirectBuffer
 import org.fusesource.hawtdispatch.BaseRetained
+import java.io.{OutputStream, DataOutput}
 
 /**
  *
@@ -42,9 +42,12 @@ import StompConstants._;
 import BufferConversions._
 import Buffer._
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 case class StompFrameMessage(frame:StompFrame) extends Message {
   
-  def protocol = PROTOCOL
+  def protocol = StompProtocol
 
   /**
    * the globally unique id of the message
@@ -95,7 +98,7 @@ case class StompFrameMessage(frame:Stomp
 
   def getBodyAs[T](toType : Class[T]) = {
     (frame.content match {
-      case x:BufferStompContent =>
+      case x:BufferContent =>
         if( toType == classOf[String] ) {
           x.content.utf8
         } else if (toType == classOf[Buffer]) {
@@ -107,9 +110,9 @@ case class StompFrameMessage(frame:Stomp
         } else {
           null
         }
-      case x:DirectStompContent =>
+      case x:DirectContent =>
         null
-      case NilStompContent =>
+      case NilContent =>
         if( toType == classOf[String] ) {
           ""
         } else if (toType == classOf[Buffer]) {
@@ -163,16 +166,22 @@ case class StompFrameMessage(frame:Stomp
 
 
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object StompFrame extends Sizer[StompFrame] {
   def size(value:StompFrame) = value.size
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait StompContent {
   def length:Int
 
   def isEmpty = length == 0
 
-  def writeTo(os:DataOutput)
+  def writeTo(os:OutputStream)
 
   def utf8:UTF8Buffer
 
@@ -180,25 +189,34 @@ trait StompContent {
   def release = {}
 }
 
-object NilStompContent extends StompContent {
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object NilContent extends StompContent {
   def length = 0
-  def writeTo(os:DataOutput) = {}
+  def writeTo(os:OutputStream) = {}
   val utf8 = new UTF8Buffer("")
 }
 
-case class BufferStompContent(content:Buffer) extends StompContent {
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class BufferContent(content:Buffer) extends StompContent {
   def length = content.length
-  def writeTo(os:DataOutput) = content.writeTo(os)
+  def writeTo(os:OutputStream) = content.writeTo(os)
   def utf8:UTF8Buffer = content.utf8
 }
 
-case class DirectStompContent(direct:DirectBuffer) extends StompContent {
-  def length = direct.size-1
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class DirectContent(direct_buffer:DirectBuffer) extends StompContent {
+  def length = direct_buffer.size-1
 
-  def writeTo(os:DataOutput) = {
+  def writeTo(os:OutputStream) = {
     val buff = new Array[Byte](1024*4)
-    val source = direct.buffer.duplicate
-    var remaining = direct.size-1
+    val source = direct_buffer.buffer.duplicate
+    var remaining = direct_buffer.size-1
     while( remaining> 0 ) {
       val c = remaining.min(buff.length)
       source.get(buff, 0, c)
@@ -208,7 +226,7 @@ case class DirectStompContent(direct:Dir
   }
 
   def buffer:Buffer = {
-    val rc = new DataByteArrayOutputStream(direct.size-1)
+    val rc = new DataByteArrayOutputStream(direct_buffer.size-1)
     writeTo(rc)
     rc.toBuffer
   }
@@ -217,8 +235,8 @@ case class DirectStompContent(direct:Dir
     buffer.utf8
   }
 
-  override def retain = direct.retain
-  override def release = direct.release
+  override def retain = direct_buffer.retain
+  override def release = direct_buffer.release
 }
 
 /**
@@ -226,7 +244,7 @@ case class DirectStompContent(direct:Dir
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:StompContent=NilStompContent, updated_headers:HeaderMap=Nil) {
+case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:StompContent=NilContent, updated_headers:HeaderMap=Nil) {
 
   def size_of_updated_headers = {
     size_of(updated_headers)
@@ -260,12 +278,12 @@ case class StompFrame(action:AsciiBuffer
   }
 
   def are_headers_in_content_buffer = !headers.isEmpty && 
-          content.isInstanceOf[BufferStompContent] &&
-          ( headers.head._1.data eq content.asInstanceOf[BufferStompContent].content.data )
+          content.isInstanceOf[BufferContent] &&
+          ( headers.head._1.data eq content.asInstanceOf[BufferContent].content.data )
 
   def size:Int = {
      content match {
-       case x:BufferStompContent =>
+       case x:BufferContent =>
          if( (action.data eq x.content.data) && updated_headers==Nil ) {
             return (x.content.offset-action.offset)+x.content.length
          }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:23:35 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.apollo.stomp
 
-import _root_.org.apache.activemq.wireformat.{WireFormat}
 import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
 import _root_.org.fusesource.hawtbuf._
 import collection.mutable.{ListBuffer, HashMap}
@@ -24,7 +23,8 @@ import _root_.org.fusesource.hawtdispatc
 
 import AsciiBuffer._
 import org.apache.activemq.apollo.broker._
-import protocol.{Protocol, ProtocolHandler}
+import protocol.{ProtocolFactory, Protocol, ProtocolHandler}
+import java.lang.String
 import Stomp._
 import BufferConversions._
 import StompFrameConstants._
@@ -32,10 +32,15 @@ import java.io.IOException
 import org.apache.activemq.selector.SelectorParser
 import org.apache.activemq.filter.{BooleanExpression, FilterException}
 import org.apache.activemq.broker.store.{StoreUOW}
+import org.apache.activemq.apollo.transport._
+import org.apache.activemq.apollo.store.MessageRecord
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object StompConstants {
 
-  val PROTOCOL = new AsciiBuffer("stomp");
+  val PROTOCOL = "stomp"
 
   val options = new ParserOptions
   options.queuePrefix = new AsciiBuffer("/queue/")
@@ -51,26 +56,56 @@ object StompConstants {
   }
 
 }
+/**
+ * Creates StompCodec objects that encode/decode the
+ * <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StompProtocolCodecFactory extends ProtocolCodecFactory {
+  import Stomp.Commands.CONNECT
+  import Stomp.Commands.STOMP
 
-class StompProtocol extends Protocol {
-  import StompConstants._
-  
-  val wff = new StompWireFormatFactory
+  def createProtocolCodec() = new StompCodec();
+
+  def isIdentifiable() = true
+
+  def maxIdentificaionLength() = CONNECT.length;
+
+  def matchesIdentification(header: Buffer):Boolean = {
+    if (header.length < CONNECT.length) {
+      false
+    } else {
+      header.startsWith(CONNECT) || header.startsWith(STOMP)
+    }
+  }
+}
+
+class StompProtocolFactorySPI extends ProtocolFactory.SPI {
 
-  def name = PROTOCOL
+  def create() = StompProtocol
 
-  def createWireFormat = wff.createWireFormat
+  def create(config: String) = if(config == "stomp") {
+    StompProtocol
+  } else {
+    null
+  }
+
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object StompProtocol extends StompProtocolCodecFactory with Protocol {
 
   def createProtocolHandler = new StompProtocolHandler
 
-  def encode(message: Message) = {
-    val sfm = message.asInstanceOf[StompFrameMessage]
-    createWireFormat.marshal(sfm.frame)
+  def encode(message: Message):MessageRecord = {
+    StompCodec.encode(message.asInstanceOf[StompFrameMessage])
   }
 
-  def decode(message: Buffer) = {
-    val frame = createWireFormat.unmarshal(message).asInstanceOf[StompFrame]
-    StompFrameMessage(frame)
+  def decode(message: MessageRecord) = {
+    StompCodec.decode(message)
   }
 
 }
@@ -79,7 +114,12 @@ import StompConstants._
 
 object StompProtocolHandler extends Log
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class StompProtocolHandler extends ProtocolHandler with DispatchLogging {
+  
+  def protocol = "stomp"
 
   override protected def log = StompProtocolHandler
   
@@ -95,7 +135,7 @@ class StompProtocolHandler extends Proto
     })
 
     def matches(delivery:Delivery) = {
-      if( delivery.message.protocol eq PROTOCOL ) {
+      if( delivery.message.protocol eq StompProtocol ) {
         if( selector!=null ) {
           selector._2.matches(delivery.message)
         } else {
@@ -172,9 +212,9 @@ class StompProtocolHandler extends Proto
     connection.connector.broker.getDefaultVirtualHost(
       queue.wrap { (host)=>
         this.host=host
-        if( this.host.memory_pool!=null ) {
-          val wf = connection.transport.getWireformat.asInstanceOf[StompWireFormat]
-          wf.memory_pool = this.host.memory_pool
+        if( this.host.direct_buffer_pool!=null ) {
+          val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+          wf.memory_pool = this.host.direct_buffer_pool
         }
         connection.transport.resumeRead
       }
@@ -214,7 +254,7 @@ class StompProtocolHandler extends Proto
         case StompFrame(Commands.DISCONNECT, headers, content, _t) =>
           info("got command: %s", command)
           connection.stop
-        case s:StompWireFormat =>
+        case s:StompCodec =>
           // this is passed on to us by the protocol discriminator
           // so we know which wire format is being used.
         case StompFrame(unknown, _, _, _) =>
@@ -402,7 +442,7 @@ class StompProtocolHandler extends Proto
     if( !connection.stopped ) {
       info("Shutting connection down due to: "+msg)
       connection.transport.suspendRead
-      connection.transport.offer(StompFrame(Responses.ERROR, Nil, BufferStompContent(ascii(msg))) )
+      connection.transport.offer(StompFrame(Responses.ERROR, Nil, BufferContent(ascii(msg))) )
       ^ {
         connection.stop()
       } >>: queue

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 04:23:35 2010
@@ -40,7 +40,7 @@ class StompBrokerPerfTest extends BaseBr
 
   override def createConsumer() = new StompRemoteConsumer()
 
-  override def getRemoteWireFormat() = "stomp"
+  override def getRemoteProtocolName() = "stomp"
 
 }
 
@@ -52,7 +52,7 @@ class StompPersistentBrokerPerfTest exte
 
   override def createConsumer() = new StompRemoteConsumer()
 
-  override def getRemoteWireFormat() = "stomp"
+  override def getRemoteProtocolName() = "stomp"
 
 }
 
@@ -66,7 +66,7 @@ class StompHawtDBPersistentBrokerPerfTes
 
   override def createConsumer() = new StompRemoteConsumer()
 
-  override def getRemoteWireFormat() = "stomp"
+  override def getRemoteProtocolName() = "stomp"
 
   override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
     val rc = super.createBrokerConfig(name, bindURI, connectUri)
@@ -166,7 +166,7 @@ class StompRemoteProducer extends Remote
     //    }
 
     var content = ascii(createPayload());
-    frame = StompFrame(Stomp.Commands.SEND, headers, BufferStompContent(content))
+    frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
     drain()
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 04:23:35 2010
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.transport.tcp;
 
+import org.apache.activemq.apollo.transport.ProtocolCodec;
+import org.apache.activemq.apollo.transport.TransportListener;
 import org.apache.activemq.apollo.util.JavaBaseService;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.apollo.transport.Transport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.fusesource.hawtdispatch.Dispatch;
@@ -35,7 +35,7 @@ import java.util.LinkedList;
 import java.util.Map;
 
 /**
- * An implementation of the {@link Transport} interface using raw tcp/ip
+ * An implementation of the {@link org.apache.activemq.apollo.transport.Transport} interface using raw tcp/ip
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -162,7 +162,7 @@ public class TcpTransport extends JavaBa
     protected URI localLocation;
     private TransportListener listener;
     private String remoteAddress;
-    private WireFormat wireformat;
+    private ProtocolCodec wireformat;
 
     private SocketChannel channel;
 
@@ -358,7 +358,7 @@ public class TcpTransport extends JavaBa
                 throw new IOException("Not running.");
             }
 
-            WireFormat.BufferState rc = wireformat.write(command);
+            ProtocolCodec.BufferState rc = wireformat.write(command);
             switch (rc ) {
                 case FULL:
                     return false;
@@ -383,7 +383,7 @@ public class TcpTransport extends JavaBa
             return;
         }
         try {
-            if( wireformat.flush() == WireFormat.BufferState.EMPTY ) {
+            if( wireformat.flush() == ProtocolCodec.BufferState.EMPTY ) {
                 writeSource.suspend();
                 listener.onRefill();
             }
@@ -476,15 +476,15 @@ public class TcpTransport extends JavaBa
         this.listener = listener;
     }
 
-    public WireFormat getWireformat() {
+    public ProtocolCodec getProtocolCodec() {
         return wireformat;
     }
 
-    public void setWireformat(WireFormat wireformat) {
-        this.wireformat = wireformat;
+    public void setProtocolCodec(ProtocolCodec protocolCodec) {
+        this.wireformat = protocolCodec;
         if( channel!=null ) {
-            wireformat.setReadableByteChannel(this.channel);
-            wireformat.setWritableByteChannel(this.channel);
+            protocolCodec.setReadableByteChannel(this.channel);
+            protocolCodec.setWritableByteChannel(this.channel);
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=961218&r1=961217&r2=961218&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jul  7 04:23:35 2010
@@ -19,27 +19,20 @@ package org.apache.activemq.transport.tc
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
-
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.apollo.transport.Transport;
+import org.apache.activemq.apollo.transport.TransportFactory;
 //import org.apache.activemq.transport.TransportLoggerFactory;
-import org.apache.activemq.transport.TransportFactorySupport;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.apollo.transport.TransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
-import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import static org.apache.activemq.transport.TransportFactorySupport.configure;
-import static org.apache.activemq.transport.TransportFactorySupport.verify;
+import static org.apache.activemq.apollo.transport.TransportFactorySupport.configure;
+import static org.apache.activemq.apollo.transport.TransportFactorySupport.verify;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>



Mime
View raw message