activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r897939 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport: nio/SelectorManager.java nio/SelectorWorker.java stomp/StompNIOTransport.java
Date Mon, 11 Jan 2010 16:54:50 GMT
Author: dejanb
Date: Mon Jan 11 16:54:49 2010
New Revision: 897939

URL: http://svn.apache.org/viewvc?rev=897939&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2440 - first shot at refactoring stomp+nio -
handles load much better than previous solution, but seems to still leak threads - future
improvements expected

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=897939&r1=897938&r2=897939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Mon Jan 11 16:54:49 2010
@@ -79,6 +79,11 @@
 
     public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
         freeWorkers.remove(worker);
+        try {
+            // no more connections on worker, close it
+            worker.close();
+        } catch (IOException e) {
+        }
     }
 
     public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=897939&r1=897938&r2=897939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
Mon Jan 11 16:54:49 2010
@@ -117,7 +117,7 @@
             }
         } catch (IOException e) {
 
-            // Don't accept any more slections
+            // Don't accept any more selections
             manager.onWorkerEmptyEvent(this);
 
             // Notify all the selections that the error occurred.
@@ -147,4 +147,7 @@
 	    selectorLock.readLock().unlock();
 	}
 	
+    public void close() throws IOException {
+        selector.close();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=897939&r1=897938&r2=897939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Mon Jan 11 16:54:49 2010
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
@@ -30,11 +32,13 @@
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.nio.NIOBufferedInputStream;
+import org.apache.activemq.transport.nio.NIOInputStream;
 import org.apache.activemq.transport.nio.NIOOutputStream;
 import org.apache.activemq.transport.nio.SelectorManager;
 import org.apache.activemq.transport.nio.SelectorSelection;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
@@ -48,6 +52,10 @@
 
     private SocketChannel channel;
     private SelectorSelection selection;
+    
+    private ByteBuffer inputBuffer;
+    ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
+    int previousByte = -1;
 
     public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -76,18 +84,54 @@
             }
         });
 
+        inputBuffer = ByteBuffer.allocate(8 * 1024);
         this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
     }
-
+    
     private void serviceRead() {
         try {
-            DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel,
8 * 1024));
-            while (true) {
-                Object command = wireFormat.unmarshal(in);
-                doConsume((Command)command);
-            }
-
+            
+           while (true) {
+               // read channel
+               int readSize = channel.read(inputBuffer);
+               // channel is closed, cleanup
+               if (readSize == -1) {
+                   onException(new EOFException());
+                   selection.close();
+                   break;
+               }
+               // nothing more to read, break
+               if (readSize == 0) {
+                   break;
+               }
+               
+               inputBuffer.flip();
+               
+               int b;
+               ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
+               
+               int i = 0;
+               while(i++ < readSize) {
+                   b = input.read();
+                   // skip repeating nulls
+                   if (previousByte == 0 && b == 0) {
+                       continue;
+                   }
+                   currentCommand.write(b);
+                   // end of command reached, unmarshal
+                   if (b == 0) {
+                       Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray()));
+                       doConsume((Command)command);
+                       currentCommand.reset();
+                   }
+                   previousByte = b;
+               }
+               // clear the buffer
+               inputBuffer.clear();
+               
+           }
         } catch (IOException e) {
+            selection.close();
             onException(e);
         } catch (Throwable e) {
             onException(IOExceptionSupport.create(e));



Mime
View raw message