Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 202A79A71 for ; Fri, 17 Feb 2012 14:43:11 +0000 (UTC) Received: (qmail 56890 invoked by uid 500); 17 Feb 2012 14:43:11 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 56821 invoked by uid 500); 17 Feb 2012 14:43:11 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56811 invoked by uid 99); 17 Feb 2012 14:43:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Feb 2012 14:43:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Feb 2012 14:43:07 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DF6FC2388900 for ; Fri, 17 Feb 2012 14:42:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1245580 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Date: Fri, 17 Feb 2012 14:42:46 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120217144246.DF6FC2388900@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Feb 17 14:42:46 2012 New Revision: 1245580 URL: http://svn.apache.org/viewvc?rev=1245580&view=rev Log: Revert "Fix for APLO-160 Apollo becoming unresponsive when stressed with 48k connections." This change had a negative performance impact, instead we should look into using the strategy outlined in APLO-163 to deal with large connection counts. Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1245580&r1=1245579&r2=1245580&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Feb 17 14:42:46 2012 @@ -19,17 +19,19 @@ package org.apache.activemq.apollo.stomp import _root_.org.apache.activemq.apollo.broker._ import java.nio.ByteBuffer +import collection.mutable.{ListBuffer, HashMap} import Stomp._ import BufferConversions._ import _root_.scala.collection.JavaConversions._ -import java.io.{EOFException, DataOutput, IOException} +import java.io.{EOFException, DataOutput, DataInput, IOException} import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel} import org.fusesource.hawtdispatch.transport._ import _root_.org.fusesource.hawtbuf._ import Buffer._ import org.apache.activemq.apollo.util._ import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord} +import org.apache.activemq.apollo.util.Log._ object StompCodec extends Log { @@ -160,8 +162,7 @@ class StompCodec extends ProtocolCodec { var write_counter = 0L var write_channel:WritableByteChannel = null - var next_write_buffer_size = write_buffer_size - var next_write_buffer:DataByteArrayOutputStream = null + var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size) var next_write_direct:DirectBuffer = null var write_buffer = ByteBuffer.allocate(0) @@ -169,7 +170,7 @@ class StompCodec extends ProtocolCodec { var write_direct_pos = 0 var last_write_io_size = 0 - def full = next_write_direct!=null || ( next_write_buffer!=null && next_write_buffer.size >= (write_buffer_size >> 1)) + def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >> 1) def is_empty = write_buffer.remaining == 0 && write_direct==null def setWritableByteChannel(channel: WritableByteChannel) = { @@ -187,9 +188,6 @@ class StompCodec extends ProtocolCodec { if ( full) { ProtocolCodec.BufferState.FULL } else { - if(next_write_buffer==null) { - next_write_buffer = new DataByteArrayOutputStream(next_write_buffer_size) - } val was_empty = is_empty command match { case buffer:Buffer=> @@ -278,14 +276,15 @@ class StompCodec extends ProtocolCodec { } } } else { - if( next_write_buffer == null || next_write_buffer.size()==0 ) { + if( next_write_buffer.size()==0 ) { return ProtocolCodec.BufferState.EMPTY } else { // size of next buffer is based on how much was used in the previous buffer. - next_write_buffer_size = (write_buffer.position()+512).max(512).min(write_buffer_size) + val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size) write_buffer = next_write_buffer.toBuffer().toByteBuffer() write_direct = next_write_direct - next_write_buffer = null + + next_write_buffer = new DataByteArrayOutputStream(prev_size) next_write_direct = null } } @@ -307,7 +306,7 @@ class StompCodec extends ProtocolCodec { var read_buffer_size = 1024*64 var read_channel:ReadableByteChannel = null - var read_buffer:ByteBuffer = null + var read_buffer = ByteBuffer.allocate(read_buffer_size) var read_end = 0 var read_start = 0 @@ -328,8 +327,7 @@ class StompCodec extends ProtocolCodec { def unread(buffer: Array[Byte]) = { assert(read_counter == 0) - read_buffer = ByteBuffer.wrap(buffer) - read_buffer.position(read_buffer.capacity()) + read_buffer.put(buffer) read_counter += buffer.length } @@ -338,9 +336,7 @@ class StompCodec extends ProtocolCodec { def getLastReadSize = last_read_io_size override def read():Object = { - if( read_buffer == null ) { - read_buffer = ByteBuffer.allocate(read_buffer_size) - } + var command:Object = null while( command==null ) { // do we need to read in more data??? @@ -390,11 +386,6 @@ class StompCodec extends ProtocolCodec { if (last_read_io_size == -1) { throw new EOFException("Peer disconnected") } else if (last_read_io_size == 0) { - if( read_start == read_buffer.position() ) { - read_start = 0 - read_end = 0 - read_buffer = null - } return null } read_counter += last_read_io_size