activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1333246 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Wed, 02 May 2012 23:06:57 GMT
Author: chirino
Date: Wed May  2 23:06:57 2012
New Revision: 1333246

URL: http://svn.apache.org/viewvc?rev=1333246&view=rev
Log:
Simplify the StompCodec by extending the AbstractProtocolCodec base class; also have the OpenWire
protocol share the buffer pool with the Stomp protocol.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1333246&r1=1333245&r2=1333246&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
Wed May  2 23:06:57 2012
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit
 import security.SecuredResource.BrokerKind
 import reflect.BeanProperty
 import java.net.InetSocketAddress
+import org.fusesource.hawtdispatch.util.BufferPools
 
 /**
  * <p>
@@ -153,6 +154,7 @@ object Broker extends Log {
 
   val BLOCKABLE_THREAD_POOL = ApolloThreadPool.INSTANCE
   private val SERVICE_TIMEOUT = 1000*5;
+  val buffer_pools = new BufferPools
 
   def class_loader:ClassLoader = ClassFinder.class_loader
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1333246&r1=1333245&r2=1333246&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
Wed May  2 23:06:57 2012
@@ -19,12 +19,12 @@ package org.apache.activemq.apollo.openw
 
 import org.apache.activemq.apollo.broker.store.MessageRecord
 import OpenwireConstants._
-import org.apache.activemq.apollo.broker.{Sizer, Message}
 import org.apache.activemq.apollo.openwire.codec.OpenWireFormat
 import org.apache.activemq.apollo.openwire.command._
 import org.apache.activemq.apollo.broker.BufferConversions._
 import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec
 import org.fusesource.hawtbuf._
+import org.apache.activemq.apollo.broker.{Broker, Sizer, Message}
 
 case class CachedEncoding(tight:Boolean, version:Int, buffer:Buffer) extends CachedEncodingTrait
 
@@ -104,6 +104,7 @@ object OpenwireCodec extends Sizer[Comma
 
 class OpenwireCodec extends AbstractProtocolCodec {
 
+  this.bufferPools = Broker.buffer_pools
   val format = new OpenWireFormat(1);
 
   protected def encode(command: AnyRef) = {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1333246&r1=1333245&r2=1333246&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Wed May  2 23:06:57 2012
@@ -18,52 +18,19 @@ package org.apache.activemq.apollo.stomp
 
 import _root_.org.apache.activemq.apollo.broker._
 
-import java.nio.ByteBuffer
 import Stomp._
 
 import BufferConversions._
 import _root_.scala.collection.JavaConversions._
-import java.io.{EOFException, DataOutput, IOException}
-import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
+import java.io.{DataOutput, IOException}
 import org.fusesource.hawtdispatch.transport._
 import _root_.org.fusesource.hawtbuf._
-import Buffer._
 import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.broker.store.MessageRecord
 import java.lang.ThreadLocal
-import java.util.{Arrays, ArrayList}
-import collection.mutable.HashMap
-
-object BufferPool {
-
-  final val buffer_count_per_thread = 100
-  final val queues = new ThreadLocal[HashMap[Int, ArrayList[Array[Byte]]]]()
-
-  def queue(size:Int) = {
-    var sizes = queues.get()
-    if( sizes == null ) {
-      sizes = HashMap()
-      queues.set(sizes)
-    }
-    sizes.getOrElseUpdate(size, new ArrayList[Array[Byte]](buffer_count_per_thread))
-  }
-
-  def checkout(size:Int):Array[Byte] = {
-    val q = queue(size)
-    if( q.isEmpty ) {
-      new Array[Byte](size)
-    } else {
-      q.remove(q.size()-1)
-    }
-  }
-
-  def checkin(buffer:Array[Byte]) = {
-    val q = queue(buffer.length)
-    if( q.size() < buffer_count_per_thread ) {
-      q.add(buffer)
-    }
-  }
-}
+import java.util.ArrayList
+import collection.mutable.{ListBuffer, HashMap}
+import org.fusesource.hawtdispatch.util.BufferPools
 
 object StompCodec extends Log {
 
@@ -166,109 +133,16 @@ object StompCodec extends Log {
 
 }
 
-class StompCodec extends ProtocolCodec with TransportAware {
-
-  import StompCodec._
-  var max_header_length = 1024*10
-  var max_headers = 1000
-  var max_data_length = 1024 * 1024 * 100
-
-  var direct_buffer_allocator:DirectBufferAllocator = null
-
-  implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
-  implicit def wrap(x: Byte) = {
-    ByteBuffer.wrap(Array(x));
-  }
-
-  def protocol() = "stomp"
-
-  def setTransport(transport:Transport) {
-    transport match {
-      case tcp:TcpTransport=>
-        write_buffer_size = tcp.getSendBufferSize();
-        read_buffer_size = tcp.getReceiveBufferSize();
-      case udp:UdpTransport=>
-        write_buffer_size = udp.getSendBufferSize();
-        read_buffer_size = udp.getReceiveBufferSize();
-      case _ =>
-        try {
-          write_channel match {
-            case channel:SocketChannel =>
-              write_buffer_size = channel.socket.getSendBufferSize();
-              read_buffer_size = channel.socket.getReceiveBufferSize()
-            case channel:SslTransport#SSLChannel =>
-              write_buffer_size = channel.socket.getSendBufferSize();
-              read_buffer_size = channel.socket.getReceiveBufferSize()
-          }
-        } catch {
-          case _ =>
-        }
-
-    }
-    read_buffer = ByteBuffer.allocate(read_buffer_size)
-  }
-  
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Non blocking write imp
-  //
-  /////////////////////////////////////////////////////////////////////
-
-  var write_buffer_size = 1024*64;
-  var write_counter = 0L
-  var write_channel:WritableByteChannel = null
-
-  var next_write_buffer : DataByteArrayOutputStream = _
-  var next_write_direct:DirectBuffer = null
-
-  var write_buffer:ByteBuffer = null
-  var write_direct:DirectBuffer = null
-  var write_direct_pos = 0
-  var last_write_io_size = 0
-
-  def full = next_write_direct!=null || (next_write_buffer!=null && next_write_buffer.size >= (write_buffer_size >> 1))
-  def is_empty = write_buffer==null && write_direct==null
+class StompCodec extends AbstractProtocolCodec {
+  this.bufferPools = Broker.buffer_pools
+  var max_header_length: Int = 1024 * 10
+  var max_headers: Int = 1000
+  var max_data_length: Int = 1024 * 1024 * 100
+  var trim = true
 
-  def setWritableByteChannel(channel: WritableByteChannel) = {
-    this.write_channel = channel
-  }
-
-  def getWriteCounter = write_counter
-
-  def getLastWriteSize = last_write_io_size
-
-  def write(command: Any):ProtocolCodec.BufferState =  {
-    if ( full) {
-      ProtocolCodec.BufferState.FULL
-    } else {
-      val was_empty = is_empty
-      if(next_write_buffer==null) {
-        next_write_buffer = new DataByteArrayOutputStream(BufferPool.checkout(write_buffer_size)) {
-          // Checks the buffer back in if we re-size..
-          override def resize(newcount: Int)  = {
-            val oldbuf = buf
-            super.resize(newcount)
-            if( oldbuf.length == write_buffer_size) {
-              BufferPool.checkin(oldbuf)
-            }
-          }
-        }
-      }
-      command match {
-        case buffer:Buffer=>
-          buffer.writeTo(next_write_buffer.asInstanceOf[DataOutput])        
-        case frame:StompFrame=>
-          encode(frame, next_write_buffer);
-      }
-      if (next_write_buffer.size() >= (write_buffer_size*0.75) ) {
-          flush();
-      }
-      if( was_empty ) {
-        ProtocolCodec.BufferState.WAS_EMPTY
-      } else {
-        ProtocolCodec.BufferState.NOT_EMPTY
-      }
-    }
+  protected def encode(command: AnyRef) = command match {
+    case buffer:Buffer=> buffer.writeTo(nextWriteBuffer.asInstanceOf[DataOutput])
+    case frame:StompFrame=> encode(frame, nextWriteBuffer);
   }
 
   def encode(frame:StompFrame, os:DataOutput) = {
@@ -305,9 +179,9 @@ class StompCodec extends ProtocolCodec w
       os.write(NEWLINE)
 
       frame.content match {
-        case x:ZeroCopyContent=>
-          assert(next_write_direct==null)
-          next_write_direct = x.zero_copy_buffer
+//        case x:ZeroCopyContent=>
+//          assert(next_write_direct==null)
+//          next_write_direct = x.zero_copy_buffer
         case x:BufferContent=>
           x.content.writeTo(os)
           END_OF_FRAME_BUFFER.writeTo(os)
@@ -317,378 +191,122 @@ class StompCodec extends ProtocolCodec w
     }
   }
 
+  import StompCodec._
 
-  def flush():ProtocolCodec.BufferState = {
-    while(true) {
-      // if we have a pending write that is being sent over the socket...
-      if ( write_buffer != null ) {
-        last_write_io_size = write_channel.write(write_buffer)
-        if ( last_write_io_size==0 ) {
-          return ProtocolCodec.BufferState.NOT_EMPTY
-        } else {
-          write_counter += last_write_io_size
-          if( write_buffer.remaining() == 0 ) {
-            write_buffer = null
-          }
-        }
-      } else {
-        if ( write_direct!=null ) {
-          last_write_io_size = write_direct.read(write_direct_pos, write_channel)
-          if ( last_write_io_size==0 )
-            return ProtocolCodec.BufferState.NOT_EMPTY
-          else {
-            write_direct_pos += last_write_io_size
-            write_counter += last_write_io_size
-
-            if( write_direct.remaining(write_direct_pos) == 0 ) {
-              write_direct.release
-              write_direct = null
-              write_direct_pos = 0
-              write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data)
-            }
-          }
-        } else {
-          if( next_write_buffer==null ) {
-            return ProtocolCodec.BufferState.EMPTY
-          } else {
-            // size of next buffer is based on how much was used in the previous buffer.
-            write_buffer = next_write_buffer.toBuffer().toByteBuffer()
-            write_direct = next_write_direct
-
-            next_write_buffer = null
-            next_write_direct = null
-          }
-        }
-      }
-    }
-    ProtocolCodec.BufferState.NOT_EMPTY
-  }
-
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Non blocking read impl 
-  //
-  /////////////////////////////////////////////////////////////////////
-  
-  type FrameReader = (ByteBuffer)=>StompFrame
-
-  var read_counter = 0L
-  var read_buffer_size = 1024*64
-  var read_channel:ReadableByteChannel = null
-
-  var read_buffer:ByteBuffer = _
-  var read_end = 0
-  var read_start = 0
-
-  var last_read_io_size = 0
-
-  var read_direct:DirectBuffer = null
-  var read_direct_pos = 0
-
-  var next_action:FrameReader = read_action
-  var trim = true
-
-  def setReadableByteChannel(channel: ReadableByteChannel) = {
-    this.read_channel = channel
-  }
-
-  def unread(buffer: Array[Byte]) = {
-    assert(read_counter == 0)
-    read_buffer = ByteBuffer.allocate(buffer.length)
-    read_buffer.put(buffer)
-    read_counter += buffer.length
-  }
-
-  def getReadCounter = read_counter
-
-  def getLastReadSize = last_read_io_size
-
-  override def read():Object = {
-
-    var command:Object = null
-    while( command==null ) {
-      // do we need to read in more data???
-      if( read_direct!=null && read_direct.remaining(read_direct_pos) > 0) {
-        last_read_io_size = read_direct.write(read_channel, read_direct_pos)
-
-        if (last_read_io_size == -1) {
-            throw new EOFException("Peer disconnected")
-        } else if (last_read_io_size == 0) {
-            return null
-        }
-        read_direct_pos += last_read_io_size
-        read_counter += last_read_io_size
-      } else if (read_buffer==null || read_end == read_buffer.position() ) {
-
-          // do we need a new data buffer to read data into??
-          if (read_buffer==null || read_buffer.remaining() == 0) {
-
-              // How much data is still not consumed by the wireformat
-              val size = read_end - read_start
-
-              var new_capacity = if(read_start == 0) {
-                size+read_buffer_size
-              } else {
-                if (size > read_buffer_size) {
-                  size+read_buffer_size
-                } else {
-                  read_buffer_size
-                }
-              }
-
-              read_buffer = ByteBuffer.wrap(if (size > 0) {
-                val rc = Arrays.copyOfRange(read_buffer.array(), read_start, read_start+new_capacity)
-                rc
-              } else {
-                if (new_capacity == read_buffer_size)
-                  BufferPool.checkout(read_buffer_size)
-                else
-                  new Array[Byte](new_capacity)
-              })
-              read_buffer.position(size)
-              read_start = 0
-              read_end = size
-          }
-
-          // Try to fill the buffer with data from the socket..
-          var p = read_buffer.position()
-          last_read_io_size = read_channel.read(read_buffer)
-          if (last_read_io_size == -1) {
-              throw new EOFException("Peer disconnected")
-          } else if (last_read_io_size == 0) {
-              if( read_start == read_end ) {
-                if( read_end==0 && read_buffer.array().length ==  read_buffer_size ){
-                  BufferPool.checkin(read_buffer.array())
-                } else {
-                  read_start = 0
-                  read_end = 0
-                }
-                read_buffer = null
-              }
-              return null
-          }
-          read_counter += last_read_io_size
-      }
-
-      command = next_action(read_buffer)
+  protected def initialDecodeAction = read_action
 
-      // Sanity checks to make sure the wireformat is behaving as expected.
-      assert(read_start <= read_end)
-      assert(read_end <= read_buffer.position())
-    }
-    return command
-  }
 
-  def read_line(buffer:ByteBuffer, max:Int, errorMessage:String):Buffer = {
-      val read_limit = buffer.position
-      while( read_end < read_limit ) {
-        if( buffer.array()(read_end) =='\n') {
-          var rc = new Buffer(buffer.array, read_start, read_end-read_start)
-          read_end += 1
-          read_start = read_end
-          return rc
+  private final val read_action: AbstractProtocolCodec.Action = new AbstractProtocolCodec.Action {
+    def apply: AnyRef = {
+      var line = readUntil(NEWLINE, max_command_length, "The maximum command length was exceeded")
+      if (line != null) {
+        var action = line.moveTail(-1)
+        if (trim) {
+          action = action.trim
+        }
+        if (action.length > 0) {
+          nextDecodeAction = read_headers(action.ascii)
+          return nextDecodeAction();
         }
-        if (max != -1 && read_end-read_start > max) {
-            throw new IOException(errorMessage)
-        }
-        read_end += 1
       }
       return null
-  }
-
-  def read_action:FrameReader = (buffer)=> {
-    val line = read_line(buffer, max_command_length, "The maximum command length was exceeded")
-    if( line !=null ) {
-      var action = line
-      if( trim ) {
-          action = action.trim()
-      }
-      if (action.length() > 0) {
-          next_action = read_headers(action.ascii)
-      }
     }
-    null
   }
 
-  def read_headers(action:AsciiBuffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
-    var line = read_line(buffer, max_header_length, "The maximum header length was exceeded")
-    while( line !=null ) {
-      if( line.trim().length > 0 ) {
-
-        if (max_headers != -1 && headers.size > max_headers) {
+  private def read_headers(command: AsciiBuffer): AbstractProtocolCodec.Action = new AbstractProtocolCodec.Action {
+    var contentLength:AsciiBuffer = _
+    val headers = new ListBuffer[(AsciiBuffer, AsciiBuffer)]()
+
+    def apply: AnyRef = {
+      var line = readUntil(NEWLINE, max_header_length, "The maximum header length was exceeded")
+      if (line != null) {
+        line = line.moveTail(-1)
+        if (line.length > 0) {
+          if (max_headers != -1 && headers.size > max_headers) {
             throw new IOException("The maximum number of headers was exceeded")
-        }
-
-        try {
-            val seperatorIndex = line.indexOf(COLON)
-            if( seperatorIndex<0 ) {
-                println("===")
-                println(new AsciiBuffer(buffer.array(), 0, read_buffer_size))
-                println("===")
-                throw new IOException("Header line missing seperator [" + ascii(line) + "]")
+          }
+          try {
+            var seperatorIndex: Int = line.indexOf(COLON)
+            if (seperatorIndex < 0) {
+              throw new IOException("Header line missing seperator [" + line.ascii + "]")
             }
-            var name = line.slice(0, seperatorIndex)
-            if( trim ) {
-                name = name.trim()
+            var name: Buffer = line.slice(0, seperatorIndex)
+            if (trim) {
+              name = name.trim
             }
-            var value = line.slice(seperatorIndex + 1, line.length())
-            if( trim ) {
-                value = value.trim()
+            var value: Buffer = line.slice(seperatorIndex + 1, line.length)
+            if (trim) {
+              value = value.trim
             }
-            headers.add((ascii(name), ascii(value)))
-        } catch {
-            case e:Exception=>
-              throw new IOException("Unable to parser header line [" + line + "]")
-        }
-
-        line = read_line(buffer, max_header_length, "The maximum header length was exceeded")
-      } else {
-        val contentLength = get(headers, CONTENT_LENGTH)
-        if (contentLength.isDefined) {
-          // Bless the client, he's telling us how much data to read in.
-          var length=0
-          try {
-              length = Integer.parseInt(contentLength.get.trim().toString())
+            var entry = (name.ascii, value.ascii)
+            if (entry._1 == CONTENT_LENGTH && contentLength==null) {
+              contentLength = entry._2
+            }
+            headers.add(entry)
           } catch {
-            case e:NumberFormatException=>
-              throw new IOException("Specified content-length is not a valid integer")
+            case e: Exception => {
+              throw new IOException("Unable to parser header line [" + line + "]")
+            }
           }
-
-          if (max_data_length != -1 && length > max_data_length) {
+        } else {
+          val h = headers.toList
+          if (contentLength != null) {
+            var length = try {
+              contentLength.toString.toInt
+            } catch {
+              case e: NumberFormatException =>
+                throw new IOException("Specified content-length is not a valid integer")
+            }
+            if (max_data_length != -1 && length > max_data_length) {
               throw new IOException("The maximum data length was exceeded")
-          }
-
-          // lets try to keep the content of big message outside of the JVM's garbage collection
-          // to keep the number of GCs down when moving big messages.
-          def is_message = action == SEND || action == MESSAGE
-          if( length > 1024 && direct_buffer_allocator!=null && is_message) {
-
-            read_direct = direct_buffer_allocator.alloc(length)
-
-            val dup = buffer.duplicate
-            dup.position(read_start)
-            dup.limit(buffer.position)
-
-            // copy in the body the was read so far...
-            read_direct_pos = read_direct.write(dup, 0)
-
-            // since it was copied.. reposition to re-use the copied area..
-            dup.compact
-            buffer.position(buffer.position - read_direct_pos)
-            read_end = read_start
-
-            next_action = read_binary_body_direct(action, headers, length)
+            }
+            nextDecodeAction = read_binary_body(command, h, length)
           } else {
-            next_action = read_binary_body(action, headers, length)
+            nextDecodeAction = read_text_body(command, h)
           }
-
-        } else {
-          next_action = read_text_body(action, headers)
+          return nextDecodeAction.apply()
         }
-        line = null
       }
-    }
-    null
-  }
-
-  def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
-    val i = headers.iterator
-    while( i.hasNext ) {
-      val entry = i.next
-      if( entry._1 == name ) {
-        return Some(entry._2)
-      }
-    }
-    None
-  }
-
-  def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
-    if( read_direct.remaining(read_direct_pos)==0 ) {
-      next_action = read_direct_terminator(action, headers, contentLength, read_direct)
-      read_direct = null
-      read_direct_pos = 0
-    }
-    null
-  }
-
-  def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:DirectBuffer):FrameReader = (buffer)=> {
-    if( read_frame_terminator(buffer, contentLength) ) {
-      next_action = read_action
-      new StompFrame(ascii(action), headers.toList, ZeroCopyContent(ma))
-    } else {
-      null
+      return null
     }
   }
 
-  def read_frame_terminator(buffer:ByteBuffer, contentLength:Int):Boolean = {
-      val read_limit = buffer.position
-      if( (read_limit-read_start) < 1 ) {
-        read_end = read_limit
-        false
-      } else {
-        if( buffer.array()(read_start)!= 0 ) {
-           throw new IOException("Expected null termintor after "+contentLength+" content bytes")
+  private def read_binary_body(command: AsciiBuffer, headers:HeaderMap, contentLength: Int): AbstractProtocolCodec.Action = {
+    return new AbstractProtocolCodec.Action {
+      def apply: AnyRef = {
+        var content = readBytes(contentLength + 1)
+        if (content != null) {
+          if (content.get(contentLength) != 0) {
+            throw new IOException("Expected null termintor after " + contentLength + " content bytes")
+          }
+          nextDecodeAction = read_action
+          content.moveTail(-1)
+          val body = if( content.length() == 0) NilContent else BufferContent(content)
+          return new StompFrame(command, headers, body)
+        }
+        else {
+          return null
         }
-        read_end = read_start+1
-        read_start = read_end
-        true
       }
-  }
-
-  def read_binary_body(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
-    val content:Buffer=read_content(buffer, contentLength)
-    if( content != null ) {
-      next_action = read_action
-      new StompFrame(ascii(action), headers.toList, BufferContent(content))
-    } else {
-      null
     }
   }
 
-  def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
-      val read_limit = buffer.position
-      if( (read_limit-read_start) < contentLength+1 ) {
-        read_end = read_limit
-        null
-      } else {
-        if( buffer.array()(read_start+contentLength)!= 0 ) {
-           throw new IOException("Expected null termintor after "+contentLength+" content bytes")
+  private def read_text_body(command: AsciiBuffer, headers:HeaderMap): AbstractProtocolCodec.Action = {
+    return new AbstractProtocolCodec.Action {
+      def apply: AnyRef = {
+        var content: Buffer = readUntil(0.asInstanceOf[Byte])
+        if (content != null) {
+          nextDecodeAction = read_action
+          content.moveTail(-1)
+          val body = if( content.length() == 0) NilContent else BufferContent(content)
+          return new StompFrame(command, headers, body)
         }
-        var rc = new Buffer(buffer.array, read_start, contentLength)
-        read_end = read_start+contentLength+1
-        read_start = read_end
-        rc
-      }
-  }
-
-  def read_to_null(buffer:ByteBuffer):Buffer = {
-      val read_limit = buffer.position
-      while( read_end < read_limit ) {
-        if( buffer.array()(read_end) ==0) {
-          var rc = new Buffer(buffer.array, read_start, read_end-read_start)
-          read_end += 1
-          read_start = read_end
-          return rc
+        else {
+          return null
         }
-        read_end += 1
       }
-      return null
-  }
-
-
-  def read_text_body(action:AsciiBuffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
-    val content:Buffer=read_to_null(buffer)
-    if( content != null ) {
-      next_action = read_action
-      new StompFrame(ascii(action), headers.toList, BufferContent(content))
-    } else {
-      null
     }
   }
 
-  def getWriteBufferSize = write_buffer_size
 
-  def getReadBufferSize = read_buffer_size
-
-}
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1333246&r1=1333245&r2=1333246&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed May  2 23:06:57 2012
@@ -950,7 +950,7 @@ class StompProtocolHandler extends Proto
       }
 
       connection_sink.offer(StompFrame(CONNECTED,connected_headers.toList))
-      codec.direct_buffer_allocator = this.host.direct_buffer_allocator
+//      codec.direct_buffer_allocator = this.host.direct_buffer_allocator
     }
 
     suspend_read("virtual host lookup")



Mime
View raw message