activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1291476 - in /activemq/activemq-apollo/trunk: apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala pom.xml
Date Mon, 20 Feb 2012 21:59:41 GMT
Author: chirino
Date: Mon Feb 20 21:59:41 2012
New Revision: 1291476

URL: http://svn.apache.org/viewvc?rev=1291476&view=rev
Log:
Simplify the OpenWire codec by extending from the AbstractProtocolCodec.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1291476&r1=1291475&r2=1291476&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Mon Feb 20 21:59:41 2012
@@ -18,16 +18,13 @@
 package org.apache.activemq.apollo.openwire
 
 import org.apache.activemq.apollo.broker.store.MessageRecord
-import org.fusesource.hawtdispatch.transport.ProtocolCodec
 import OpenwireConstants._
-import java.nio.ByteBuffer
-import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
-import java.io.EOFException
 import org.apache.activemq.apollo.broker.{Sizer, Message}
 import org.apache.activemq.apollo.openwire.codec.OpenWireFormat
 import org.apache.activemq.apollo.openwire.command._
 import org.apache.activemq.apollo.broker.BufferConversions._
-import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, AbstractVarIntSupport, Buffer}
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec
+import org.fusesource.hawtbuf._
 
 case class CachedEncoding(tight:Boolean, version:Int, buffer:Buffer) extends CachedEncodingTrait
 
@@ -40,7 +37,7 @@ case class CachedEncoding(tight:Boolean,
 object OpenwireCodec extends Sizer[Command] {
 
   final val DB_VERSION = OpenWireFormat.DEFAULT_VERSION
-  final val DB_TIGHT_ENCODING = true
+  final val DB_TIGHT_ENCODING = false
 
   def encode(message: Message):MessageRecord = {
     val rc = new MessageRecord
@@ -105,187 +102,47 @@ object OpenwireCodec extends Sizer[Comma
   }
 }
 
-class OpenwireCodec extends ProtocolCodec {
-
-  implicit def toBuffer(value:Array[Byte]):Buffer = new Buffer(value)
-
-  def protocol = PROTOCOL
-
-  var write_buffer_size = 1024*64;
-  var write_counter = 0L
-  var write_channel:WritableByteChannel = null
-
-  var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
-  var write_buffer = ByteBuffer.allocate(0)
+class OpenwireCodec extends AbstractProtocolCodec {
 
   val format = new OpenWireFormat(1);
 
-  def full = next_write_buffer.size() >= (write_buffer_size >> 1)
-  def is_empty = write_buffer.remaining() == 0
-
-  def setWritableByteChannel(channel: WritableByteChannel) = {
-    this.write_channel = channel
-    if( this.write_channel.isInstanceOf[SocketChannel] ) {
-      this.write_channel.asInstanceOf[SocketChannel].socket().setSendBufferSize(write_buffer_size);
-    }
+  protected def encode(command: AnyRef) = {
+    format.marshal(command, nextWriteBuffer)
   }
 
-  def getWriteCounter = write_counter
-
-  def write(command: Any):ProtocolCodec.BufferState =  {
-    if ( full) {
-      ProtocolCodec.BufferState.FULL
-    } else {
-      val was_empty = is_empty
-      command match {
-        case command:ActiveMQMessage=>
-          command.getCachedEncoding match {
-            case CachedEncoding(tight, version, buffer) =>
-              // We might be able to re-use the origin format of the message.
-              if( !format.isCacheEnabled && format.isTightEncodingEnabled==tight && format.getVersion==version ) {
-                next_write_buffer.write(buffer)
-              } else {
-                format.marshal(command, next_write_buffer)
+  private final val readHeader:AbstractProtocolCodec.Action = new AbstractProtocolCodec.Action {
+    def apply = {
+      val header = peekBytes(4)
+      if( header==null ) {
+        null
+      } else {
+        val length = header.bigEndianEditor().readInt()
+        nextDecodeAction = new AbstractProtocolCodec.Action {
+          def apply() = {
+            val frame = readBytes(4+length)
+            if( frame==null ) {
+              null
+            } else {
+              val command = format.unmarshal(frame)
+              nextDecodeAction = readHeader
+              // If value caching is NOT enabled, then we potentially re-use the encode
+              // value of the message.
+              command match {
+                case message:ActiveMQMessage =>
+                  message.setEncodedSize(length)
+                  if( !format.isCacheEnabled ) {
+                    message.setCachedEncoding(CachedEncoding(format.isTightEncodingEnabled, format.getVersion, frame))
+                  }
+                case _ =>
               }
-            case _ =>
-              format.marshal(command, next_write_buffer)
+              command
+            }
           }
-          
-        case command:Command=>
-          format.marshal(command, next_write_buffer)
-      }
-      if( was_empty ) {
-        ProtocolCodec.BufferState.WAS_EMPTY
-      } else {
-        ProtocolCodec.BufferState.NOT_EMPTY
-      }
-    }
-  }
-
-  def flush():ProtocolCodec.BufferState = {
-    // if we have a pending write that is being sent over the socket...
-    if ( write_buffer.remaining() != 0 ) {
-      write_counter += write_channel.write(write_buffer)
-    }
-
-    // if it is now empty try to refill...
-    if ( is_empty && next_write_buffer.size()!=0 ) {
-        // size of next buffer is based on how much was used in the previous buffer.
-        val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
-        write_buffer = next_write_buffer.toBuffer().toByteBuffer()
-        next_write_buffer = new DataByteArrayOutputStream(prev_size)
-    }
-
-    if ( is_empty ) {
-      ProtocolCodec.BufferState.EMPTY
-    } else {
-      ProtocolCodec.BufferState.NOT_EMPTY
-    }
-  }
-
-  var read_counter = 0L
-  var read_buffer_size = 1024*64
-  var read_channel:ReadableByteChannel = null
-
-  var read_buffer:ByteBuffer = ByteBuffer.allocate(4)
-  var read_waiting_on = 4
-
-  var next_action:()=>Command = read_header
-
-  def setReadableByteChannel(channel: ReadableByteChannel) = {
-    this.read_channel = channel
-    if( this.read_channel.isInstanceOf[SocketChannel] ) {
-      this.read_channel.asInstanceOf[SocketChannel].socket().setReceiveBufferSize(read_buffer_size);
-    }
-  }
-
-  def unread(buffer: Array[Byte]) = {
-    assert(read_counter == 0)
-    read_buffer = buffer.toByteBuffer
-    read_buffer.position(read_buffer.limit)
-    read_counter += buffer.length
-    read_waiting_on -= buffer.length
-    if ( read_waiting_on <= 0 ) {
-      read_buffer.flip
-    }
-  }
-
-  def getReadCounter = read_counter
-
-  override def read():Object = {
-
-    var command:Object = null
-    while( command==null ) {
-      // do we need to read in more data???
-      if ( read_waiting_on > 0 ) {
-
-        // Try to fill the buffer with data from the socket..
-        var p = read_buffer.position()
-        var count = read_channel.read(read_buffer)
-        if (count == -1) {
-            throw new EOFException("Peer disconnected")
-        } else if (count == 0) {
-            return null
-        }
-        read_counter += count
-        read_waiting_on -= count
-
-        if ( read_waiting_on <= 0 ) {
-          read_buffer.flip
-        }
-
-      } else {
-        command = next_action()
-        if ( read_waiting_on > 0 ) {
-          val next_buffer = ByteBuffer.allocate(read_buffer.remaining+read_waiting_on)
-          next_buffer.put(read_buffer)
-          read_buffer = next_buffer
         }
+        nextDecodeAction.apply()
       }
     }
-    return command
   }
-
-  def read_header:()=>Command = ()=> {
-
-    read_buffer.mark
-    val size = read_buffer.getInt
-    read_buffer.reset
-
-    read_waiting_on += (size)
-
-    next_action = read_command(size+4)
-    null
-  }
-
-  def read_command(size:Int) = ()=> {
-
-    val buf = new Buffer(read_buffer.array, read_buffer.position, size)
-    val rc = format.unmarshal(buf)
-    read_buffer.position(read_buffer.position+size)
-
-    read_waiting_on += 4
-    next_action = read_header
-    var command: Command = rc.asInstanceOf[Command]
-    
-    // If value caching is NOT enabled, then we potentially re-use the encode
-    // value of the message.
-    command match {
-      case message:ActiveMQMessage =>
-        message.setEncodedSize(size)
-        if( !format.isCacheEnabled ) {
-          message.setCachedEncoding(CachedEncoding(format.isTightEncodingEnabled, format.getVersion, buf))
-        }
-      case _ =>
-    }
-    command
-  }
-
-  def getLastWriteSize = 0
-
-  def getLastReadSize = 0
-
-  def getWriteBufferSize = write_buffer_size
-
-  def getReadBufferSize = read_buffer_size
+  
+  protected def initialDecodeAction = readHeader
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1291476&r1=1291475&r2=1291476&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Feb 20 21:59:41 2012
@@ -615,7 +615,7 @@ class OpenwireProtocolHandler extends Pr
 
   case class OpenwireDeliveryProducerRoute(addresses:Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) {
 
-    override def send_buffer_size =  codec.write_buffer_size
+    override def send_buffer_size =  codec.getReadBufferSize
 
     override def connection = Some(OpenwireProtocolHandler.this.connection)
     override def dispatch_queue = queue
@@ -917,7 +917,7 @@ class OpenwireProtocolHandler extends Pr
     override def connection = Some(OpenwireProtocolHandler.this.connection)
 
     def is_persistent = false
-    override def receive_buffer_size = codec.write_buffer_size
+    override def receive_buffer_size = codec.getWriteBufferSize*4
 
     def matches(delivery:Delivery) = {
       if( delivery.message.protocol eq OpenwireProtocol ) {

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1291476&r1=1291475&r2=1291476&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Mon Feb 20 21:59:41 2012
@@ -97,7 +97,7 @@
     <felix-version>1.0.0</felix-version>
 
     <hawtdispatch-version>1.9-SNAPSHOT</hawtdispatch-version>
-    <hawtbuf-version>1.8</hawtbuf-version>
+    <hawtbuf-version>1.9-SNAPSHOT</hawtbuf-version>
     <stompjms-version>1.8</stompjms-version>
     
     <bdb-version>5.0.34</bdb-version>



Mime
View raw message