activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1331491 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Date Fri, 27 Apr 2012 16:25:33 GMT
Author: chirino
Date: Fri Apr 27 16:25:32 2012
New Revision: 1331491

URL: http://svn.apache.org/viewvc?rev=1331491&view=rev
Log:
Don't initialize the read/write buffers until we know what size to make them.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1331491&r1=1331490&r2=1331491&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
Fri Apr 27 16:25:32 2012
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.broke
 
 import org.fusesource.hawtbuf.Buffer
 import org.apache.activemq.apollo.broker.store.MessageRecord
-import org.fusesource.hawtdispatch.transport.ProtocolCodec
 import java.nio.channels.{WritableByteChannel, ReadableByteChannel}
 import java.nio.ByteBuffer
 import java.io.IOException
@@ -28,6 +27,7 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.util.OptionSupport
 import org.apache.activemq.apollo.broker.{Message, ProtocolException}
 import org.apache.activemq.apollo.dto.{DetectDTO, AcceptingConnectorDTO}
+import transport.{Transport, TransportAware, ProtocolCodec}
 
 /**
  * <p>
@@ -89,7 +89,7 @@ class AnyProtocol(val func: ()=>Array[Pr
 
 case class ProtocolDetected(id:String, codec:ProtocolCodec)
 
-class AnyProtocolCodec(val protocols: Array[Protocol]) extends ProtocolCodec {
+class AnyProtocolCodec(val protocols: Array[Protocol]) extends ProtocolCodec with TransportAware
{
 
   if (protocols.isEmpty) {
     throw new IllegalArgumentException("No protocol configured for identification.")
@@ -99,6 +99,9 @@ class AnyProtocolCodec(val protocols: Ar
 
   def setReadableByteChannel(channel: ReadableByteChannel) = {this.channel = channel}
 
+  var transport:Transport = _
+  def setTransport(t: Transport) = transport = t
+
   def read: AnyRef = {
     if (channel == null) {
       throw new IllegalStateException
@@ -109,6 +112,7 @@ class AnyProtocolCodec(val protocols: Ar
     protocols.foreach {protocol =>
       if (protocol.matchesIdentification(buff)) {
         val protocolCodec = protocol.createProtocolCodec()
+        transport.setProtocolCodec(protocolCodec)
         protocolCodec.unread(buff.toByteArray)
         return ProtocolDetected(protocol.id, protocolCodec)
       }
@@ -176,7 +180,6 @@ class AnyProtocolHandler extends Protoco
 
      // replace the current handler with the new one.
     connection.protocol_handler = protocol_handler
-    connection.transport.setProtocolCodec(protocol.codec)
     connection.transport.suspendRead
 
     protocol_handler.set_connection(connection);

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1331491&r1=1331490&r2=1331491&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Fri Apr 27 16:25:32 2012
@@ -150,28 +150,6 @@ class StompCodec extends ProtocolCodec w
 
   def protocol() = "stomp"
 
-  
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Non blocking write imp
-  //
-  /////////////////////////////////////////////////////////////////////
-
-  var write_buffer_size = 1024*64;
-  var write_counter = 0L
-  var write_channel:WritableByteChannel = null
-
-  var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
-  var next_write_direct:DirectBuffer = null
-
-  var write_buffer = ByteBuffer.allocate(0)
-  var write_direct:DirectBuffer = null
-  var write_direct_pos = 0
-  var last_write_io_size = 0
-
-  def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >>
1)
-  def is_empty = write_buffer.remaining == 0 && write_direct==null
-
   def setTransport(transport:Transport) {
     transport match {
       case tcp:TcpTransport=>
@@ -195,7 +173,30 @@ class StompCodec extends ProtocolCodec w
         }
 
     }
+    next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
+    read_buffer = ByteBuffer.allocate(read_buffer_size)
   }
+  
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Non blocking write imp
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  var write_buffer_size = 1024*64;
+  var write_counter = 0L
+  var write_channel:WritableByteChannel = null
+
+  var next_write_buffer : DataByteArrayOutputStream = _
+  var next_write_direct:DirectBuffer = null
+
+  var write_buffer = ByteBuffer.allocate(0)
+  var write_direct:DirectBuffer = null
+  var write_direct_pos = 0
+  var last_write_io_size = 0
+
+  def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >>
1)
+  def is_empty = write_buffer.remaining == 0 && write_direct==null
 
   def setWritableByteChannel(channel: WritableByteChannel) = {
     this.write_channel = channel
@@ -327,7 +328,7 @@ class StompCodec extends ProtocolCodec w
   var read_buffer_size = 1024*64
   var read_channel:ReadableByteChannel = null
 
-  var read_buffer = ByteBuffer.allocate(read_buffer_size)
+  var read_buffer:ByteBuffer = _
   var read_end = 0
   var read_start = 0
 



Mime
View raw message