Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 73197 invoked from network); 11 Sep 2009 08:47:38 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Sep 2009 08:47:38 -0000 Received: (qmail 10749 invoked by uid 500); 11 Sep 2009 08:47:38 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 10714 invoked by uid 500); 11 Sep 2009 08:47:38 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 10702 invoked by uid 99); 11 Sep 2009 08:47:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Sep 2009 08:47:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Sep 2009 08:47:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6FE6123888AD; Fri, 11 Sep 2009 08:47:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r813722 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Date: Fri, 11 Sep 2009 08:47:12 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090911084712.6FE6123888AD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dejanb Date: Fri Sep 11 08:47:11 2009 New Revision: 813722 URL: http://svn.apache.org/viewvc?rev=813722&view=rev Log: https://issues.apache.org/activemq/browse/AMQ-2386 - stomp+nio using selectors Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=813722&r1=813721&r2=813722&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Fri Sep 11 08:47:11 2009 @@ -22,14 +22,21 @@ import java.net.Socket; import java.net.URI; import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import javax.net.SocketFactory; +import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.nio.NIOBufferedInputStream; import org.apache.activemq.transport.nio.NIOOutputStream; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; /** @@ -40,6 +47,7 @@ public class StompNIOTransport extends TcpTransport { private SocketChannel channel; + private SelectorSelection selection; public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -53,8 +61,47 @@ channel = socket.getChannel(); channel.configureBlocking(false); - this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024)); - this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024)); + // listen for events telling us when the socket is readable. + selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { + public void onSelect(SelectorSelection selection) { + serviceRead(); + } + + public void onError(SelectorSelection selection, Throwable error) { + if (error instanceof IOException) { + onException((IOException)error); + } else { + onException(IOExceptionSupport.create(error)); + } + } + }); + + this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024)); + } + + private void serviceRead() { + try { + DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024)); + while (true) { + Object command = wireFormat.unmarshal(in); + doConsume((Command)command); + } + + } catch (IOException e) { + onException(e); + } catch (Throwable e) { + onException(IOExceptionSupport.create(e)); + } } + protected void doStart() throws Exception { + connect(); + selection.setInterestOps(SelectionKey.OP_READ); + selection.enable(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + selection.disable(); + super.doStop(stopper); + } }