Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D6C1A6914 for ; Sun, 17 Jul 2011 14:43:41 +0000 (UTC) Received: (qmail 91128 invoked by uid 500); 17 Jul 2011 14:43:41 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 91097 invoked by uid 500); 17 Jul 2011 14:43:41 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 91090 invoked by uid 99); 17 Jul 2011 14:43:41 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 17 Jul 2011 14:43:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 17 Jul 2011 14:43:38 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E07EE2388894 for ; Sun, 17 Jul 2011 14:43:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1147639 - in /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp: StompCodec.scala StompFrame.scala StompProtocolHandler.scala Date: Sun, 17 Jul 2011 14:43:16 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110717144316.E07EE2388894@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Sun Jul 17 14:43:16 2011 New Revision: 1147639 URL: http://svn.apache.org/viewvc?rev=1147639&view=rev Log: Optimized the stomp codec flush call a bit. Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1147639&r1=1147638&r2=1147639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Sun Jul 17 14:43:16 2011 @@ -31,6 +31,7 @@ import _root_.org.fusesource.hawtbuf._ import Buffer._ import org.apache.activemq.apollo.util._ import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord} +import org.apache.activemq.apollo.util.Log._ object StompCodec extends Log { @@ -252,43 +253,46 @@ class StompCodec extends ProtocolCodec { def flush():ProtocolCodec.BufferState = { + while(true) { + // if we have a pending write that is being sent over the socket... + if ( write_buffer.remaining() != 0 ) { + last_write_io_size = write_channel.write(write_buffer) + if ( last_write_io_size==0 ) + return ProtocolCodec.BufferState.NOT_EMPTY + else + write_counter += last_write_io_size + } else { + if ( write_direct!=null ) { + last_write_io_size = write_direct.read(write_direct_pos, write_channel) + if ( last_write_io_size==0 ) + return ProtocolCodec.BufferState.NOT_EMPTY + else { + write_direct_pos += last_write_io_size + write_counter += last_write_io_size + + if( write_direct.remaining(write_direct_pos) == 0 ) { + write_direct.release + write_direct = null + write_direct_pos = 0 + write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data) + } + } + } else { + if( next_write_buffer.size()==0 ) { + return ProtocolCodec.BufferState.EMPTY + } else { + // size of next buffer is based on how much was used in the previous buffer. + val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size) + write_buffer = next_write_buffer.toBuffer().toByteBuffer() + write_direct = next_write_direct - // if we have a pending write that is being sent over the socket... - if ( write_buffer.remaining() != 0 ) { - last_write_io_size = write_channel.write(write_buffer) - write_counter += last_write_io_size - } - if ( write_buffer.remaining() == 0 && write_direct!=null ) { - val count = write_direct.read(write_direct_pos, write_channel) - write_direct_pos += count - write_counter += count - - if( write_direct.remaining(write_direct_pos) == 0 ) { - write_direct.release - write_direct = null - write_direct_pos = 0 - - write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data) + next_write_buffer = new DataByteArrayOutputStream(prev_size) + next_write_direct = null + } + } } } - - // if it is now empty try to refill... - if ( is_empty && write_direct==null ) { - // size of next buffer is based on how much was used in the previous buffer. - val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size) - write_buffer = next_write_buffer.toBuffer().toByteBuffer() - write_direct = next_write_direct - - next_write_buffer = new DataByteArrayOutputStream(prev_size) - next_write_direct = null - } - - if ( is_empty ) { - ProtocolCodec.BufferState.EMPTY - } else { - ProtocolCodec.BufferState.NOT_EMPTY - } - + ProtocolCodec.BufferState.NOT_EMPTY } @@ -339,13 +343,15 @@ class StompCodec extends ProtocolCodec { while( command==null ) { // do we need to read in more data??? if( read_direct!=null && read_direct.remaining(read_direct_pos) > 0) { - val count = read_direct.write(read_channel, read_direct_pos) - if (count == -1) { + last_read_io_size = read_direct.write(read_channel, read_direct_pos) + + if (last_read_io_size == -1) { throw new EOFException("Peer disconnected") - } else if (count == 0) { + } else if (last_read_io_size == 0) { return null } - read_direct_pos += count + read_direct_pos += last_read_io_size + read_counter += last_read_io_size } else if (read_end == read_buffer.position() ) { // do we need a new data buffer to read data into?? Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1147639&r1=1147638&r2=1147639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Sun Jul 17 14:43:16 2011 @@ -179,6 +179,7 @@ object NilContent extends StompContent { def length = 0 def writeTo(os:OutputStream) = {} val utf8 = new UTF8Buffer("") + override def toString = "NilContent" } /** Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1147639&r1=1147638&r2=1147639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sun Jul 17 14:43:16 2011 @@ -247,7 +247,7 @@ class StompProtocolHandler extends Proto def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = { - // session acks ack all previously recieved messages.. + // session acks ack all previously received messages.. var found = false val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=> if( found ) { @@ -261,7 +261,7 @@ class StompProtocolHandler extends Proto } if( acked.isEmpty ) { -// println("ACK failed, invalid message id: %s".format(msgid)) + println("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(","))) } else { consumer_acks = not_acked acked.foreach{case (id, delivery)=> @@ -661,7 +661,7 @@ class StompProtocolHandler extends Proto case DISCONNECT => - val delay = send_receipt(frame.headers) + val delay = send_receipt(frame.headers)!=null on_transport_disconnected if( delay ) { queue.after(die_delay, TimeUnit.MILLISECONDS) { @@ -1218,15 +1218,16 @@ class StompProtocolHandler extends Proto } - def send_receipt(headers:HeaderMap):Boolean = { + def send_receipt(headers:HeaderMap) = { get(headers, RECEIPT_REQUESTED) match { case Some(receipt)=> + val frame = StompFrame(RECEIPT, List((RECEIPT_ID, receipt))) dispatchQueue <<| ^{ - connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))) + connection_sink.offer(frame) } - true + frame case None=> - false + null } }