activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1331565 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Date Fri, 27 Apr 2012 20:08:32 GMT
Author: chirino
Date: Fri Apr 27 20:08:31 2012
New Revision: 1331565

URL: http://svn.apache.org/viewvc?rev=1331565&view=rev
Log:
Use smarter buffer pooling in the StompCodec so that idle connections don't hog memory resources.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1331565&r1=1331564&r2=1331565&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Fri Apr 27 20:08:31 2012
@@ -19,19 +19,51 @@ package org.apache.activemq.apollo.stomp
 import _root_.org.apache.activemq.apollo.broker._
 
 import java.nio.ByteBuffer
-import collection.mutable.{ListBuffer, HashMap}
 import Stomp._
 
 import BufferConversions._
 import _root_.scala.collection.JavaConversions._
-import java.io.{EOFException, DataOutput, DataInput, IOException}
+import java.io.{EOFException, DataOutput, IOException}
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
 import org.fusesource.hawtdispatch.transport._
 import _root_.org.fusesource.hawtbuf._
 import Buffer._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
-import org.apache.activemq.apollo.util.Log._
+import java.lang.ThreadLocal
+import java.util.{Arrays, ArrayList}
+import collection.mutable.HashMap
+
+object BufferPool {
+
+  final val buffer_count_per_thread = 100
+  final val queues = new ThreadLocal[HashMap[Int, ArrayList[Array[Byte]]]]()
+
+  def queue(size:Int) = {
+    var sizes = queues.get()
+    if( sizes == null ) {
+      sizes = HashMap()
+      queues.set(sizes)
+    }
+    sizes.getOrElseUpdate(size, new ArrayList[Array[Byte]](buffer_count_per_thread))
+  }
+
+  def checkout(size:Int):Array[Byte] = {
+    val q = queue(size)
+    if( q.isEmpty ) {
+      new Array[Byte](size)
+    } else {
+      q.remove(q.size()-1)
+    }
+  }
+
+  def checkin(buffer:Array[Byte]) = {
+    val q = queue(buffer.length)
+    if( q.size() < buffer_count_per_thread ) {
+      q.add(buffer)
+    }
+  }
+}
 
 object StompCodec extends Log {
 
@@ -173,7 +205,6 @@ class StompCodec extends ProtocolCodec w
         }
 
     }
-    next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
     read_buffer = ByteBuffer.allocate(read_buffer_size)
   }
   
@@ -190,13 +221,13 @@ class StompCodec extends ProtocolCodec w
   var next_write_buffer : DataByteArrayOutputStream = _
   var next_write_direct:DirectBuffer = null
 
-  var write_buffer = ByteBuffer.allocate(0)
+  var write_buffer:ByteBuffer = null
   var write_direct:DirectBuffer = null
   var write_direct_pos = 0
   var last_write_io_size = 0
 
-  def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >>
1)
-  def is_empty = write_buffer.remaining == 0 && write_direct==null
+  def full = next_write_direct!=null || (next_write_buffer!=null && next_write_buffer.size
>= (write_buffer_size >> 1))
+  def is_empty = write_buffer==null && write_direct==null
 
   def setWritableByteChannel(channel: WritableByteChannel) = {
     this.write_channel = channel
@@ -211,12 +242,27 @@ class StompCodec extends ProtocolCodec w
       ProtocolCodec.BufferState.FULL
     } else {
       val was_empty = is_empty
+      if(next_write_buffer==null) {
+        next_write_buffer = new DataByteArrayOutputStream(BufferPool.checkout(write_buffer_size))
{
+          // Checks the buffer back in if we re-size..
+          override def resize(newcount: Int)  = {
+            val oldbuf = buf
+            super.resize(newcount)
+            if( oldbuf.length == write_buffer_size) {
+              BufferPool.checkin(oldbuf)
+            }
+          }
+        }
+      }
       command match {
         case buffer:Buffer=>
           buffer.writeTo(next_write_buffer.asInstanceOf[DataOutput])        
         case frame:StompFrame=>
           encode(frame, next_write_buffer);
       }
+      if (next_write_buffer.size() >= (write_buffer_size*0.75) ) {
+          flush();
+      }
       if( was_empty ) {
         ProtocolCodec.BufferState.WAS_EMPTY
       } else {
@@ -275,12 +321,16 @@ class StompCodec extends ProtocolCodec w
   def flush():ProtocolCodec.BufferState = {
     while(true) {
       // if we have a pending write that is being sent over the socket...
-      if ( write_buffer.remaining() != 0 ) {
+      if ( write_buffer != null ) {
         last_write_io_size = write_channel.write(write_buffer)
-        if ( last_write_io_size==0 )
+        if ( last_write_io_size==0 ) {
           return ProtocolCodec.BufferState.NOT_EMPTY
-        else
+        } else {
           write_counter += last_write_io_size
+          if( write_buffer.remaining() == 0 ) {
+            write_buffer = null
+          }
+        }
       } else {
         if ( write_direct!=null ) {
           last_write_io_size = write_direct.read(write_direct_pos, write_channel)
@@ -298,15 +348,14 @@ class StompCodec extends ProtocolCodec w
             }
           }
         } else {
-          if( next_write_buffer.size()==0 ) {
+          if( next_write_buffer==null ) {
             return ProtocolCodec.BufferState.EMPTY
           } else {
             // size of next buffer is based on how much was used in the previous buffer.
-            val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
             write_buffer = next_write_buffer.toBuffer().toByteBuffer()
             write_direct = next_write_direct
 
-            next_write_buffer = new DataByteArrayOutputStream(prev_size)
+            next_write_buffer = null
             next_write_direct = null
           }
         }
@@ -346,6 +395,7 @@ class StompCodec extends ProtocolCodec w
 
   def unread(buffer: Array[Byte]) = {
     assert(read_counter == 0)
+    read_buffer = ByteBuffer.allocate(buffer.length)
     read_buffer.put(buffer)
     read_counter += buffer.length
   }
@@ -369,13 +419,13 @@ class StompCodec extends ProtocolCodec w
         }
         read_direct_pos += last_read_io_size
         read_counter += last_read_io_size
-      } else if (read_end == read_buffer.position() ) {
+      } else if (read_buffer==null || read_end == read_buffer.position() ) {
 
           // do we need a new data buffer to read data into??
-          if (read_buffer.remaining() == 0) {
+          if (read_buffer==null || read_buffer.remaining() == 0) {
 
               // How much data is still not consumed by the wireformat
-              var size = read_end - read_start
+              val size = read_end - read_start
 
               var new_capacity = if(read_start == 0) {
                 size+read_buffer_size
@@ -387,13 +437,15 @@ class StompCodec extends ProtocolCodec w
                 }
               }
 
-              var new_buffer = new Array[Byte](new_capacity)
-
-              if (size > 0) {
-                  System.arraycopy(read_buffer.array(), read_start, new_buffer, 0, size)
-              }
-
-              read_buffer = ByteBuffer.wrap(new_buffer)
+              read_buffer = ByteBuffer.wrap(if (size > 0) {
+                val rc = Arrays.copyOfRange(read_buffer.array(), read_start, read_start+new_capacity)
+                rc
+              } else {
+                if (new_capacity == read_buffer_size)
+                  BufferPool.checkout(read_buffer_size)
+                else
+                  new Array[Byte](new_capacity)
+              })
               read_buffer.position(size)
               read_start = 0
               read_end = size
@@ -405,6 +457,15 @@ class StompCodec extends ProtocolCodec w
           if (last_read_io_size == -1) {
               throw new EOFException("Peer disconnected")
           } else if (last_read_io_size == 0) {
+              if( read_start == read_end ) {
+                if( read_end==0 && read_buffer.array().length ==  read_buffer_size
){
+                  BufferPool.checkin(read_buffer.array())
+                } else {
+                  read_start = 0
+                  read_end = 0
+                }
+                read_buffer = null
+              }
               return null
           }
           read_counter += last_read_io_size



Mime
View raw message