activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r813722 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Date Fri, 11 Sep 2009 08:47:12 GMT
Author: dejanb
Date: Fri Sep 11 08:47:11 2009
New Revision: 813722

URL: http://svn.apache.org/viewvc?rev=813722&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2386 - stomp+nio using selectors

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=813722&r1=813721&r2=813722&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Fri Sep 11 08:47:11 2009
@@ -22,14 +22,21 @@
 import java.net.Socket;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
 import javax.net.SocketFactory;
 
+import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.nio.NIOBufferedInputStream;
 import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
@@ -40,6 +47,7 @@
 public class StompNIOTransport extends TcpTransport {
 
     private SocketChannel channel;
+    private SelectorSelection selection;
 
     public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -53,8 +61,47 @@
         channel = socket.getChannel();
         channel.configureBlocking(false);
 
-        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
-        this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024));
+        // listen for events telling us when the socket is readable.
+        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener()
{
+            public void onSelect(SelectorSelection selection) {
+                serviceRead();
+            }
+
+            public void onError(SelectorSelection selection, Throwable error) {
+                if (error instanceof IOException) {
+                    onException((IOException)error);
+                } else {
+                    onException(IOExceptionSupport.create(error));
+                }
+            }
+        });
+
+        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
+    }
+
+    private void serviceRead() {
+        try {
+            DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel,
8 * 1024));
+            while (true) {
+                Object command = wireFormat.unmarshal(in);
+                doConsume((Command)command);
+            }
+
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+            onException(IOExceptionSupport.create(e));
+        }
     }
 
+    protected void doStart() throws Exception {
+        connect();
+        selection.setInterestOps(SelectionKey.OP_READ);
+        selection.enable();
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        selection.disable();
+        super.doStop(stopper);
+    }
 }



Mime
View raw message