activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961202 - in /activemq/sandbox/activemq-apollo-actor/activemq-stomp/src: main/scala/org/apache/activemq/apollo/stomp/ test/scala/org/apache/activemq/apollo/stomp/perf/
Date Wed, 07 Jul 2010 04:18:53 GMT
Author: chirino
Date: Wed Jul  7 04:18:52 2010
New Revision: 961202

URL: http://svn.apache.org/viewvc?rev=961202&view=rev
Log:
Starting to hook the memory pool API into the STOMP wire format.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961202&r1=961201&r2=961202&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 04:18:52 2010
@@ -20,8 +20,10 @@ import _root_.java.util.LinkedList
 import _root_.org.apache.activemq.filter.{Expression, Filterable}
 import _root_.org.fusesource.hawtbuf._
 import collection.mutable.ListBuffer
-import org.apache.activemq.apollo.broker.{Sizer, Destination, BufferConversions, Message}
 import java.lang.{String, Class}
+import java.io.DataOutput
+import org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.MemoryAllocation
 
 /**
  *
@@ -91,16 +93,33 @@ case class StompFrameMessage(frame:Stomp
   }
 
   def getBodyAs[T](toType : Class[T]) = {
-    (if( toType == classOf[String] ) {
-      frame.content.utf8
-    } else if (toType == classOf[Buffer]) {
-      frame.content
-    } else if (toType == classOf[AsciiBuffer]) {
-      frame.content.ascii
-    } else if (toType == classOf[UTF8Buffer]) {
-      frame.content.utf8
-    } else {
-      null
+    (frame.content match {
+      case x:BufferStompContent =>
+        if( toType == classOf[String] ) {
+          x.content.utf8
+        } else if (toType == classOf[Buffer]) {
+          x.content
+        } else if (toType == classOf[AsciiBuffer]) {
+          x.content.ascii
+        } else if (toType == classOf[UTF8Buffer]) {
+          x.content.utf8
+        } else {
+          null
+        }
+      case x:DirectStompContent =>
+        null
+      case NilStompContent =>
+        if( toType == classOf[String] ) {
+          ""
+        } else if (toType == classOf[Buffer]) {
+          new Buffer(0)
+        } else if (toType == classOf[AsciiBuffer]) {
+          new AsciiBuffer("")
+        } else if (toType == classOf[UTF8Buffer]) {
+          new UTF8Buffer("")
+        } else {
+          null
+        }
     }).asInstanceOf[T]
   }
 
@@ -142,12 +161,61 @@ object StompFrame extends Sizer[StompFra
   def size(value:StompFrame) = value.size   
 }
 
+trait StompContent {
+  def length:Int
+
+  def isEmpty = length == 0
+
+  def writeTo(os:DataOutput)
+
+  def utf8:UTF8Buffer
+
+}
+
+object NilStompContent extends StompContent {
+  def length = 0
+  def writeTo(os:DataOutput) = {}
+  val utf8 = new UTF8Buffer("")
+}
+
+case class BufferStompContent(content:Buffer) extends StompContent {
+  def length = content.length
+  def writeTo(os:DataOutput) = content.writeTo(os)
+  def utf8:UTF8Buffer = content.utf8
+}
+
+case class DirectStompContent(direct:MemoryAllocation) extends StompContent {
+  def length = direct.size-1
+
+  def writeTo(os:DataOutput) = {
+    val buff = new Array[Byte](1024*4)
+    val source = direct.buffer.duplicate
+    var remaining = direct.size-1
+    while( remaining> 0 ) {
+      val c = remaining.min(buff.length)
+      source.get(buff, 0, c)
+      os.write(buff, 0, c)
+      remaining -= c
+    }
+  }
+
+  def buffer:Buffer = {
+    val rc = new DataByteArrayOutputStream(direct.size-1)
+    writeTo(rc)
+    rc.toBuffer
+  }
+
+  def utf8:UTF8Buffer = {
+    buffer.utf8
+  }
+}
+
 /**
  * Represents all the data in a STOMP frame.
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:Buffer=NO_DATA,
updated_headers:HeaderMap=Nil) {
+case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:StompContent=NilStompContent,
updated_headers:HeaderMap=Nil) {
 
   def size_of_updated_headers = {
     size_of(updated_headers)
@@ -180,14 +248,22 @@ case class StompFrame(action:AsciiBuffer
     rc
   }
 
-  def size = {
-     if( (action.data eq content.data) && updated_headers==Nil ) {
-        (content.offset-action.offset)+content.length
-     } else {
-       action.length + 1 +
-       size_of_updated_headers +
-       size_of_original_headers + 1 + content.length
+  def are_headers_in_content_buffer = !headers.isEmpty && 
+          content.isInstanceOf[BufferStompContent] &&
+          ( headers.head._1.data eq content.asInstanceOf[BufferStompContent].content.data
)
+
+  def size:Int = {
+     content match {
+       case x:BufferStompContent =>
+         if( (action.data eq x.content.data) && updated_headers==Nil ) {
+            return (x.content.offset-action.offset)+x.content.length
+         }
+       case _ =>
      }
+
+     action.length + 1 +
+     size_of_updated_headers +
+     size_of_original_headers + 1 + content.length
   }
 
   def header(name:AsciiBuffer) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961202&r1=961201&r2=961202&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:18:52 2010
@@ -400,7 +400,7 @@ class StompProtocolHandler extends Proto
     if( !connection.stopped ) {
       info("Shutting connection down due to: "+msg)
       connection.transport.suspendRead
-      connection.transport.offer(StompFrame(Responses.ERROR, Nil, ascii(msg)))
+      connection.transport.offer(StompFrame(Responses.ERROR, Nil, BufferStompContent(ascii(msg)))
)
       ^ {
         connection.stop()
       } >>: queue

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961202&r1=961201&r2=961202&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 04:18:52 2010
@@ -31,7 +31,7 @@ import _root_.scala.collection.JavaConve
 import StompFrameConstants._
 import java.io.{EOFException, DataOutput, DataInput, IOException}
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
-import org.apache.activemq.apollo.MemoryPool
+import org.apache.activemq.apollo.{MemoryAllocation, MemoryPool}
 
 /**
  * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a>
protocol.
@@ -116,12 +116,11 @@ class StompWireFormat extends WireFormat
     }
 
     // we can optimize a little if the headers and content are in the same buffer..
-    if( !frame.headers.isEmpty && !frame.content.isEmpty &&
-            ( frame.headers.head._1.data eq frame.content.data ) ) {
+    if( frame.are_headers_in_content_buffer ) {
 
       val offset = frame.headers.head._1.offset;
       val buffer1 = frame.headers.head._1;
-      val buffer2 = frame.content;
+      val buffer2 = frame.content.asInstanceOf[BufferStompContent].content;
       val length = (buffer2.offset-buffer1.offset)+buffer2.length
       os.write( buffer1.data, offset, length)                                           
                                                
 
@@ -152,10 +151,13 @@ class StompWireFormat extends WireFormat
   var write_channel:WritableByteChannel = null
 
   var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
+  var next_write_direct:ByteBuffer = null
+
   var write_buffer = ByteBuffer.allocate(0)
+  var write_direct:ByteBuffer = null
 
-  def is_full = next_write_buffer.size() >= (write_buffer_size >> 2)
-  def is_empty = write_buffer.remaining() == 0
+  def is_full = next_write_direct!=null || next_write_buffer.size() >= (write_buffer_size
>> 2)
+  def is_empty = write_buffer.remaining() == 0 && write_direct==null
 
   def setWritableByteChannel(channel: WritableByteChannel) = {
     this.write_channel = channel
@@ -172,7 +174,7 @@ class StompWireFormat extends WireFormat
       WireFormat.BufferState.FULL
     } else {
       val was_empty = is_empty
-      marshal(command, next_write_buffer);
+      marshalX(command.asInstanceOf[StompFrame], next_write_buffer);
       if( was_empty ) {
         WireFormat.BufferState.WAS_EMPTY
       } else {
@@ -181,19 +183,70 @@ class StompWireFormat extends WireFormat
     }
   }
 
-  def flush():WireFormat.BufferState = {
-    
+  def marshalX(frame:StompFrame, os:DataOutput) = {
+    frame.action.writeTo(os)
+    os.write(NEWLINE)
+
+    // Write any updated headers first...
+    if( !frame.updated_headers.isEmpty ) {
+      for( (key, value) <- frame.updated_headers ) {
+        key.writeTo(os)
+        os.write(SEPERATOR)
+        value.writeTo(os)
+        os.write(NEWLINE)
+      }
+    }
+
+    // we can optimize a little if the headers and content are in the same buffer..
+    if( frame.are_headers_in_content_buffer ) {
+
+      val offset = frame.headers.head._1.offset;
+      val buffer1 = frame.headers.head._1;
+      val buffer2 = frame.content.asInstanceOf[BufferStompContent].content;
+      val length = (buffer2.offset-buffer1.offset)+buffer2.length
+      os.write( buffer1.data, offset, length)
+
+    } else {
+      for( (key, value) <- frame.headers ) {
+        key.writeTo(os)
+        os.write(SEPERATOR)
+        value.writeTo(os)
+        os.write(NEWLINE)
+      }
+      os.write(NEWLINE)
+
+      frame.content match {
+        case x:DirectStompContent=>
+          next_write_direct = x.direct.buffer.duplicate
+          next_write_direct.limit(next_write_direct.limit-1)
+        case x:BufferStompContent=>
+          x.content.writeTo(os)
+        case _=>
+      }
+
+    }
+    END_OF_FRAME_BUFFER.writeTo(os)
+  }
+
+
+  def flush():BufferState = {
+
     // if we have a pending write that is being sent over the socket...
-    if ( !is_empty ) {
-        write_counter += write_channel.write(write_buffer)
+    if ( write_buffer.remaining() != 0 ) {
+      write_counter += write_channel.write(write_buffer)
+    }
+    if ( write_buffer.remaining() == 0 && write_direct!=null ) {
+      write_counter += write_channel.write(write_direct)
     }
 
     // if it is now empty try to refill...
     if ( is_empty && next_write_buffer.size()!=0 ) {
         // size of next buffer is based on how much was used in the previous buffer.
-        val prev_size = Math.min(Math.max(write_buffer.position()+512, 512), write_buffer_size)
+        val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
         write_buffer = next_write_buffer.toBuffer().toByteBuffer()
+        write_direct = next_write_direct
         next_write_buffer = new DataByteArrayOutputStream(prev_size)
+        next_write_direct = null
     }
 
     if ( is_empty ) {
@@ -336,6 +389,7 @@ class StompWireFormat extends WireFormat
   }
 
   def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader
= (buffer)=> {
+    var rc:StompFrame = null
     val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
     if( line !=null ) {
       if( line.trim().length() > 0 ) {
@@ -379,14 +433,51 @@ class StompWireFormat extends WireFormat
           if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
               throw new IOException("The maximum data length was exceeded")
           }
-          next_action = read_binary_body(action, headers, length)
+
+          if( length > 1024 && memory_pool!=null ) {
+
+            val ma = memory_pool.alloc(length+1)
+
+            val read_limit = buffer.position
+            if( (read_limit-read_start) < length+1 ) {
+              // buffer did not contain the fully stomp body
+
+              ma.buffer.put( buffer.array, read_start, read_limit-read_start )
+
+              read_buffer = ma.buffer
+              read_end = read_limit-read_start
+              read_start = 0
+
+              next_action = read_binary_body_direct(action, headers, ma)
+
+            } else {
+              // The current buffer already read in all the data...
+
+              if( buffer.array()(read_start+length)!= 0 ) {
+                 throw new IOException("Expected null termintor after "+length+" content
bytes")
+              }
+
+              // copy the body out to the direct buffer
+              ma.buffer.put( buffer.array, read_start, read_limit-read_start )
+
+              // and reposition to reuse non-direct space.
+              buffer.position(read_start)
+              read_end = read_start
+
+              next_action = read_action
+              rc = new StompFrame(ascii(action), headers.toList, DirectStompContent(ma))
+            }
+
+          } else {
+            next_action = read_binary_body(action, headers, length)
+          }
 
         } else {
           next_action = read_text_body(action, headers)
         }
       }
     }
-    null
+    rc
   }
 
   def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
@@ -401,17 +492,45 @@ class StompWireFormat extends WireFormat
   }
 
 
+  def read_binary_body_direct(action:Buffer, headers:HeaderMapBuffer, ma:MemoryAllocation):FrameReader
= (buffer)=> {
+    if( read_content_direct(ma) ) {
+      next_action = read_action
+      new StompFrame(ascii(action), headers.toList, DirectStompContent(ma))
+    } else {
+      null
+    }
+  }
+
+  def read_content_direct(ma:MemoryAllocation) = {
+      val read_limit = ma.buffer.position
+      if( read_limit < ma.size ) {
+        read_end = read_limit
+        false
+      } else {
+        ma.buffer.position(ma.size-1)
+        if( ma.buffer.get != 0 ) {
+           throw new IOException("Expected null termintor after "+(ma.size-1)+" content bytes")
+        }
+        ma.buffer.rewind
+        ma.buffer.limit(ma.size-1)
+
+        read_buffer = ByteBuffer.allocate(read_buffer_size)
+        read_end = 0
+        read_start = 0
+        true
+      }
+  }
+
   def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader
= (buffer)=> {
     val content:Buffer=read_content(buffer, contentLength)
     if( content != null ) {
       next_action = read_action
-      new StompFrame(ascii(action), headers.toList, content)
+      new StompFrame(ascii(action), headers.toList, BufferStompContent(content))
     } else {
       null
     }
   }
 
-
   def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
       val read_limit = buffer.position
       if( (read_limit-read_start) < contentLength+1 ) {
@@ -419,7 +538,7 @@ class StompWireFormat extends WireFormat
         null
       } else {
         if( buffer.array()(read_start+contentLength)!= 0 ) {
-           throw new IOException("Exected null termintor after "+contentLength+" content
bytes")
+           throw new IOException("Expected null termintor after "+contentLength+" content
bytes")
         }
         var rc = new Buffer(buffer.array, read_start, contentLength)
         read_end = read_start+contentLength+1
@@ -447,7 +566,7 @@ class StompWireFormat extends WireFormat
     val content:Buffer=read_to_null(buffer)
     if( content != null ) {
       next_action = read_action
-      new StompFrame(ascii(action), headers.toList, content)
+      new StompFrame(ascii(action), headers.toList, BufferStompContent(content))
     } else {
       null
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961202&r1=961201&r2=961202&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 04:18:52 2010
@@ -166,7 +166,7 @@ class StompRemoteProducer extends Remote
     //    }
 
     var content = ascii(createPayload());
-    frame = StompFrame(Stomp.Commands.SEND, headers, content)
+    frame = StompFrame(Stomp.Commands.SEND, headers, BufferStompContent(content))
     drain()
   }
 



Mime
View raw message