activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r493241 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/nio/ main/resources/META-INF/services/org/apache/activemq/transport/ test/java/org/apache/activemq/transport/nio/
Date Fri, 05 Jan 2007 23:23:15 GMT
Author: chirino
Date: Fri Jan  5 15:23:14 2007
New Revision: 493241

URL: http://svn.apache.org/viewvc?view=rev&rev=493241
Log:
Adding an initial cut of an NIO based Transport.  This could be really handy to help brokers
scale up in situations where they need to accept a large number of connections.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
    incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+/**
+ * An {@link InputStream} that reads the bytes remaining in a {@link ByteBuffer}.
+ * Reading or skipping advances the buffer's position; end of stream is
+ * reported once the buffer has no bytes remaining.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class NIOInputStream extends InputStream {
+
+    // Not used by this class; kept because they are protected members that
+    // existing subclasses may reference.
+    protected int count;
+    protected int position;
+    private final ByteBuffer in;
+
+    /**
+     * @param in the buffer to read from; it is used directly, not copied
+     */
+    public NIOInputStream(ByteBuffer in) {
+        this.in = in;
+    }
+
+    /**
+     * Reads the next byte from the buffer.
+     *
+     * @return the byte as a value in the range 0-255, or -1 if the buffer is exhausted
+     */
+    @Override
+    public int read() throws IOException {
+        // Test for remaining data instead of catching BufferUnderflowException.
+        if (!in.hasRemaining()) {
+            return -1;
+        }
+        return in.get() & 0xff;
+    }
+
+    /**
+     * Copies up to {@code len} bytes from the buffer into {@code b}.
+     *
+     * @param b   destination array
+     * @param off offset in {@code b} of the first byte written
+     * @param len maximum number of bytes to copy
+     * @return the number of bytes copied, 0 if {@code len} is 0 and the buffer
+     *         is exhausted, or -1 if the buffer is exhausted
+     */
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+        if (in.hasRemaining()) {
+            int rc = Math.min(len, in.remaining());
+            in.get(b, off, rc);
+            return rc;
+        }
+        return len == 0 ? 0 : -1;
+    }
+
+    /**
+     * Skips over up to {@code n} bytes of the buffer.
+     *
+     * @param n the number of bytes to skip; values less than or equal to zero skip nothing
+     * @return the number of bytes actually skipped
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        if (n <= 0) {
+            return 0;
+        }
+        // Clamp while still a long: narrowing n to int first would turn large
+        // values into negative or tiny ones and move the position incorrectly.
+        int rc = (int) Math.min(n, (long) in.remaining());
+        in.position(in.position() + rc);
+        return rc;
+    }
+
+    /**
+     * @return the number of bytes remaining in the buffer
+     */
+    @Override
+    public int available() throws IOException {
+        return in.remaining();
+    }
+
+    /**
+     * @return always false; mark/reset is not supported
+     */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Does nothing; the stream holds no resources besides the caller's buffer.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,182 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A buffered {@link OutputStream} that writes to a {@link WritableByteChannel}.
+ * Bytes are collected in an internal array and handed to the channel when the
+ * array fills up or {@link #flush()} is called. The channel may be
+ * non-blocking: partial writes are retried until all data has been accepted.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+
+public class NIOOutputStream extends OutputStream {
+
+    private final static int BUFFER_SIZE = 8192;
+
+    private final WritableByteChannel out;
+    private final byte[] buffer;
+    // View over 'buffer' so its contents can be passed to the channel.
+    private final ByteBuffer byteBuffer;
+
+    // Number of valid bytes currently held in 'buffer'.
+    private int count;
+    private boolean closed;
+
+    /**
+     * Creates a stream with the default buffer size.
+     *
+     * @param out the channel to write to
+     */
+    public NIOOutputStream(WritableByteChannel out) {
+        this(out, BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * channel with the specified buffer size.
+     *
+     * @param out  the underlying channel.
+     * @param size the buffer size.
+     * @throws IllegalArgumentException if size <= 0.
+     */
+    public NIOOutputStream(WritableByteChannel out, int size) {
+        this.out = out;
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buffer = new byte[size];
+        byteBuffer = ByteBuffer.wrap(buffer);
+    }
+
+    /**
+     * Writes a single byte, flushing first if the buffer is full.
+     *
+     * @param b - byte to write
+     * @throws IOException if the stream is closed or the flush fails
+     */
+    @Override
+    public void write(int b) throws IOException {
+        checkClosed();
+        if (availableBufferToWrite() < 1) {
+            flush();
+        }
+        buffer[count++] = (byte) b;
+    }
+
+    /**
+     * Writes part of a byte array. The buffer is flushed first when the data
+     * does not fit in the free space; data larger than the whole buffer is
+     * written straight to the channel.
+     *
+     * @param b   the byte buffer
+     * @param off the offset into the buffer
+     * @param len the length of data to write
+     * @throws IOException if the stream is closed or the channel write fails
+     */
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+        checkClosed();
+        if (availableBufferToWrite() < len) {
+            flush();
+        }
+        if (buffer.length >= len) {
+            System.arraycopy(b, off, buffer, count, len);
+            count += len;
+        } else {
+            write(ByteBuffer.wrap(b, off, len));
+        }
+    }
+
+    /**
+     * Writes any buffered bytes to the channel and empties the buffer.
+     *
+     * @throws IOException if the channel write fails or the thread is interrupted
+     */
+    @Override
+    public void flush() throws IOException {
+        if (count > 0 && out != null) {
+            byteBuffer.position(0);
+            byteBuffer.limit(count);
+            write(byteBuffer);
+            count = 0;
+        }
+    }
+
+    /**
+     * Marks this stream as closed so that later writes fail. Buffered data is
+     * not flushed and the underlying channel is not closed.
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        closed = true;
+    }
+
+    /**
+     * Checks that the stream has not been closed
+     *
+     * @throws IOException (an {@link EOFException}) if the stream is closed
+     */
+    protected void checkClosed() throws IOException {
+        if (closed) {
+            throw new EOFException("Cannot write to the stream any more it has already been closed");
+        }
+    }
+
+    /**
+     * @return the amount free space in the buffer
+     */
+    private int availableBufferToWrite() {
+        return buffer.length - count;
+    }
+
+    /**
+     * Writes all remaining bytes of {@code data} to the channel, retrying
+     * until the channel has accepted everything.
+     *
+     * @param data the bytes to write
+     * @throws InterruptedIOException if the thread is interrupted while
+     *         backing off; the thread's interrupt status is left set
+     * @throws IOException if the channel write fails
+     */
+    protected void write(ByteBuffer data) throws IOException {
+        int remaining = data.remaining();
+        // Start different from 'remaining' so the first pass never sleeps.
+        int lastRemaining = remaining - 1;
+        long delay = 1;
+        while (remaining > 0) {
+
+            // We may need to do a little bit of sleeping to avoid a busy loop.
+            // Slow down if no data was written out..
+            if (remaining == lastRemaining) {
+                try {
+                    // Use exponential backoff to increase sleep time, capped at one second.
+                    Thread.sleep(delay);
+                    delay *= 2;
+                    if (delay > 1000) {
+                        delay = 1000;
+                    }
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status so callers further up can see it.
+                    Thread.currentThread().interrupt();
+                    throw new InterruptedIOException();
+                }
+            } else {
+                delay = 1;
+            }
+            lastRemaining = remaining;
+
+            // Since the write is non-blocking, all the data may not have been written.
+            out.write(data);
+            remaining = data.remaining();
+        }
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+
+/**
+ * An implementation of the {@link Transport} interface that extends
+ * {@link TcpTransport} and reads inbound data from a non-blocking
+ * {@link SocketChannel}.  Reads are triggered by callbacks from the
+ * {@link SelectorManager}.  Each inbound frame is read in two steps: first a
+ * 4 byte length prefix, then the number of bytes that prefix announces; the
+ * complete frame (prefix included) is then handed to the wire format.
+ * Outbound data is written to the same channel through an
+ * {@link NIOOutputStream}.
+ * 
+ * @version $Revision$
+ */
+public class NIOTransport extends TcpTransport {
+
+	//private static final Log log = LogFactory.getLog(NIOTransport.class);
+	private SocketChannel channel; // the socket's channel, set to non-blocking in initializeStreams()
+	private SelectorSelection selection; // registration of the channel with the SelectorManager
+	private ByteBuffer inputBuffer; // fixed 8k buffer: holds the length prefix and any frame that fits
+	private ByteBuffer currentBuffer; // buffer being filled: inputBuffer, or one allocated for a larger frame
+	private int nextFrameSize; // size of the frame being read (prefix included), or -1 while reading the prefix
+
+	public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
URI localLocation) throws UnknownHostException, IOException {
+		super(wireFormat, socketFactory, remoteLocation, localLocation);
+	}
+
+	public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+		super(wireFormat, socket);
+	}
+
+	protected void initializeStreams() throws IOException { // sets up the channel, selector registration and buffers
+		channel = socket.getChannel();		
+		channel.configureBlocking(false);
+		
+		// listen for events telling us when the socket is readable.
+		selection = SelectorManager.getInstance().register(channel,
+				new SelectorManager.Listener() {
+					public void onSelect(SelectorSelection selection) {
+						serviceRead();
+					}
+					public void onError(SelectorSelection selection, Throwable error) {
+						if( error instanceof IOException ) {
+							onException((IOException) error);							
+						} else {
+							onException(IOExceptionSupport.create(error));							
+						}
+					}
+				});
+		
+		// Set up the input buffer to read a length prefix first; output goes via the channel.
+//        inputBuffer = ByteBuffer.allocateDirect(8*1024);
+        inputBuffer = ByteBuffer.allocate(8*1024);
+        currentBuffer = inputBuffer;
+        nextFrameSize=-1; // no length prefix read yet
+        currentBuffer.limit(4); // only read the 4 byte length prefix for now
+        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024));
+        
+	}
+	
+    private void serviceRead() { // reads whatever the channel has available and dispatches each completed frame
+        try {
+            while( true ) {
+            	
+	
+	            int readSize = channel.read(currentBuffer);
+	            if( readSize == -1 ) { // end of stream: report it and cancel the selector registration
+					onException(new EOFException());
+	                selection.close();
+	                break;
+	            }
+	            if( readSize==0 ) { // nothing more available right now; wait for the next select event
+	                break;
+	            }
+	            
+            	if( currentBuffer.hasRemaining() ) // prefix or frame still incomplete, keep reading
+            		continue;
+
+	            // Are we trying to figure out the size of the next frame?
+	            if( nextFrameSize==-1 ) {
+	            	assert inputBuffer == currentBuffer;
+
+	            	// If the frame is too big to fit in the fixed size input buffer,
+	            	// Then allocate a separate byte buffer of the right size for it.
+	            	inputBuffer.flip();
+	            	nextFrameSize = inputBuffer.getInt()+4; // +4 so the size covers the length prefix too
+	            	if( nextFrameSize > inputBuffer.capacity() ) { // NOTE(review): size comes from the peer and is not bounded or checked for negatives here
+	            		currentBuffer = ByteBuffer.allocate(nextFrameSize);
+	            		currentBuffer.putInt(nextFrameSize); // NOTE(review): stores prefix+4 rather than the received prefix - confirm unmarshal ignores the value
+	            	} else {
+	            		inputBuffer.limit(nextFrameSize);	            		
+	            	}
+	            	
+            	} else {
+            		currentBuffer.flip(); // frame complete: rewind so unmarshal sees it from the prefix onwards
+    				
+            		Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+            		doConsume((Command) command);
+            		
+            		nextFrameSize=-1; // go back to reading a length prefix into the fixed buffer
+    				inputBuffer.clear();
+    				inputBuffer.limit(4);
+    				currentBuffer = inputBuffer;
+            	}
+	            
+            }
+            
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+        	onException(IOExceptionSupport.create(e));
+        }
+    }
+
+
+	protected void doStart() throws Exception { // connects, then enables read-event notification
+        connect();
+        selection.setInterestOps(SelectionKey.OP_READ);
+        selection.enable();
+    }
+
+	protected void doStop(ServiceStopper stopper) throws Exception { // stops read events before the superclass shuts down
+		selection.disable();
+		super.doStop(stopper);		
+	}
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+
+public class NIOTransportFactory extends TcpTransportFactory {
+		
+	protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory
serverSocketFactory) throws IOException, URISyntaxException {
+		return new TcpTransportServer(this, location, serverSocketFactory) {
+			protected Transport createTransport(Socket socket, WireFormat format) throws IOException
{
+				return new NIOTransport(format,socket);
+			}			
+		};
+	}
+	
+	protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI
location, URI localLocation) throws UnknownHostException, IOException {
+		return new NIOTransport(wf, socketFactory, location, localLocation);
+	}
+
+	
+    protected ServerSocketFactory createServerSocketFactory() {
+        return new ServerSocketFactory() {
+			public ServerSocket createServerSocket(int port) throws IOException {
+		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+		        serverSocketChannel.socket().bind(new InetSocketAddress(port));
+				return serverSocketChannel.socket();
+			}
+			public ServerSocket createServerSocket(int port, int backlog) throws IOException {
+		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+		        serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
+				return serverSocketChannel.socket();
+			}
+			public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws
IOException {
+		        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+		        serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
+				return serverSocketChannel.socket();
+			}
+        };
+    }
+    
+    protected SocketFactory createSocketFactory() {
+        return new SocketFactory() {
+
+        	public Socket createSocket() throws IOException {
+		        SocketChannel channel = SocketChannel.open();
+        		return channel.socket();
+        	}
+        	
+			public Socket createSocket(String host, int port) throws IOException, UnknownHostException
{
+		        SocketChannel channel = SocketChannel.open();
+		        channel.connect(new InetSocketAddress(host, port));
+				return channel.socket();
+			}
+
+			public Socket createSocket(InetAddress address, int port) throws IOException {
+		        SocketChannel channel = SocketChannel.open();
+		        channel.connect(new InetSocketAddress(address, port));
+				return channel.socket();
+			}
+
+			public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort)
throws IOException, UnknownHostException {
+		        SocketChannel channel = SocketChannel.open();
+		        channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
+		        channel.connect(new InetSocketAddress(address, port));
+				return channel.socket();
+			}
+
+			public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int
localPort) throws IOException {
+		        SocketChannel channel = SocketChannel.open();
+		        channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
+		        channel.connect(new InetSocketAddress(address, port));
+				return channel.socket();
+			}
+        };
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,109 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Hands out {@link SelectorWorker}s for socket channels that need to be
+ * watched by a selector, and holds the executors and per-worker channel
+ * limit that those workers use.
+ * 
+ * Workers that can still accept channels are kept in a free list; workers
+ * report back through the onWorker...Event callbacks to be added to or
+ * removed from that list.
+ * 
+ * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
+ */
+public final class SelectorManager {
+
+    public static final SelectorManager singleton = new SelectorManager();
+
+    /**
+     * @return the shared manager instance
+     */
+    static SelectorManager getInstance() {
+        return singleton;
+    }
+
+    /**
+     * Callback interface notified about selection events and errors of a
+     * registered channel.
+     */
+    public interface Listener {
+        public void onSelect(SelectorSelection selector);
+
+        public void onError(SelectorSelection selection, Throwable error);
+    }
+
+    // Threads of the default executor all carry the same fixed name.
+    private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
+        public Thread newThread(Runnable task) {
+            return new Thread(task, "NIO Transport Thread");
+        }
+    });
+    // Defaults to the same executor as the selector threads.
+    private Executor channelExecutor = selectorExecutor;
+    private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
+    private int maxChannelsPerWorker = 64;
+
+    /**
+     * Registers a channel with the first free worker, creating a new worker
+     * (and putting it at the head of the free list) when none is available.
+     *
+     * @param socketChannel the channel to watch
+     * @param listener      receives the channel's selection events and errors
+     * @return the selection handle for the registration
+     * @throws IOException if the worker or the registration cannot be created
+     */
+    public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) throws IOException {
+        final SelectorWorker target;
+        if (freeWorkers.isEmpty()) {
+            target = new SelectorWorker(this);
+            freeWorkers.addFirst(target);
+        } else {
+            target = freeWorkers.getFirst();
+        }
+        return new SelectorSelection(target, socketChannel, listener);
+    }
+
+    /** Drops a worker from the free list. */
+    synchronized void onWorkerFullEvent(SelectorWorker worker) {
+        freeWorkers.remove(worker);
+    }
+
+    /** Drops a worker from the free list. */
+    public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
+        freeWorkers.remove(worker);
+    }
+
+    /** Appends a worker to the end of the free list. */
+    public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
+        freeWorkers.add(worker);
+    }
+
+    public Executor getChannelExecutor() {
+        return this.channelExecutor;
+    }
+
+    public void setChannelExecutor(Executor channelExecutor) {
+        this.channelExecutor = channelExecutor;
+    }
+
+    public int getMaxChannelsPerWorker() {
+        return this.maxChannelsPerWorker;
+    }
+
+    public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
+        this.maxChannelsPerWorker = maxChannelsPerWorker;
+    }
+
+    public Executor getSelectorExecutor() {
+        return this.selectorExecutor;
+    }
+
+    public void setSelectorExecutor(Executor selectorExecutor) {
+        this.selectorExecutor = selectorExecutor;
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activemq.transport.nio.SelectorManager.Listener;
+
+/**
+ * Handle for one socket channel registered with a {@link SelectorWorker}'s
+ * selector.  It stores the interest set to apply when enabled and forwards
+ * select and error notifications to its listener.
+ * 
+ * @author chirino
+ */
+public final class SelectorSelection {
+
+    private final SelectorWorker worker;
+    private final SelectionKey key;
+    private final Listener listener;
+    // Interest ops applied to the key by enable().
+    private int interest;
+
+    /**
+     * Registers the channel with the worker's selector, initially with no
+     * interest ops and with this object as the key attachment, then bumps
+     * the worker's use counter.
+     *
+     * @throws ClosedChannelException if the channel is already closed
+     */
+    public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
+        this.worker = worker;
+        this.listener = listener;
+        final int noInterest = 0;
+        this.key = socketChannel.register(this.worker.selector, noInterest, this);
+        this.worker.incrementUseCounter();
+    }
+
+    /** Remembers the interest ops; they take effect on the next enable(). */
+    public void setInterestOps(int ops) {
+        this.interest = ops;
+    }
+
+    /** Applies the remembered interest ops and wakes the selector. */
+    public void enable() {
+        this.key.interestOps(this.interest);
+        this.worker.selector.wakeup();
+    }
+
+    /** Clears the key's interest ops. */
+    public void disable() {
+        this.key.interestOps(0);
+    }
+
+    /** Releases the worker slot, cancels the key and wakes the selector. */
+    public void close() {
+        this.worker.decrementUseCounter();
+        this.key.cancel();
+        this.worker.selector.wakeup();
+    }
+
+    /** Forwards a selection event to the listener. */
+    public void onSelect() {
+        this.listener.onSelect(this);
+    }
+
+    /** Forwards an error to the listener. */
+    public void onError(Throwable e) {
+        this.listener.onError(this, e);
+    }
+
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Runnable that owns one Selector, polls it while at least one selection is
+ * counted against it, and hands every ready key to the manager's channel
+ * executor for processing.
+ */
+public class SelectorWorker implements Runnable {
+
+	// Source of the per-worker id that is used in the worker thread's name.
+	private final static AtomicInteger NEXT_ID = new AtomicInteger();
+
+	final SelectorManager manager;
+	final Selector selector;
+	final int id = NEXT_ID.getAndIncrement(); 
+	// Number of selections currently counted against this worker.
+	final AtomicInteger useCounter = new AtomicInteger();
+	// Limit read once from the manager at construction time.
+	final private int maxChannelsPerWorker;
+
+
+	/**
+	 * Opens a new selector for this worker and reads the per-worker channel
+	 * limit from the manager.
+	 *
+	 * @param manager supplies the executors, the channel limit and receives the worker events
+	 * @throws IOException if the selector cannot be opened
+	 */
+	public SelectorWorker(SelectorManager manager) throws IOException {
+		this.manager = manager;
+		selector = Selector.open();
+		maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+	}
+	
+	/**
+	 * Counts one more selection against this worker. Going from 0 to 1 submits
+	 * this worker to the manager's selector executor; otherwise, reaching
+	 * maxChannelsPerWorker raises the worker-full event.
+	 */
+	void incrementUseCounter() {
+		// 'use' is the value before the increment.
+		int use = useCounter.getAndIncrement();
+		if( use == 0 ) {
+			manager.getSelectorExecutor().execute(this);
+		} else if( use+1 == maxChannelsPerWorker ) {
+			// NOTE(review): because this is an else-if, a limit of 1 never raises
+			// the full event (the first increment takes the branch above) - confirm intended.
+			manager.onWorkerFullEvent(this);
+		}
+	}
+
+	/**
+	 * Removes one selection from this worker's count. Dropping from 1 to 0
+	 * raises the worker-empty event; otherwise, dropping from
+	 * maxChannelsPerWorker raises the worker-not-full event.
+	 */
+	void decrementUseCounter() {
+		// 'use' is the value before the decrement.
+		int use = useCounter.getAndDecrement();
+		if (use == 1) {
+			manager.onWorkerEmptyEvent(this);
+		} else if (use == maxChannelsPerWorker ) {
+			manager.onWorkerNotFullEvent(this);
+		}
+	}
+
+	/**
+	 * @return true while the use counter is non-zero
+	 */
+	boolean isRunning() {
+		return useCounter.get()!=0;
+	}
+
+	/**
+	 * Select loop. Renames the current thread for the duration of the loop,
+	 * polls the selector while {@link #isRunning()} holds and dispatches each
+	 * ready key to the manager's channel executor. An IOException ends the
+	 * loop and is reported to every selection registered with the selector.
+	 */
+	public void run() {
+
+		String origName = Thread.currentThread().getName();
+		try {
+			Thread.currentThread().setName("Selector Worker: " + id);
+			while (isRunning()) {
+
+				// The 10 ms timeout bounds how long the loop goes without re-checking isRunning().
+				int count = selector.select(10);
+				if (count == 0)
+					continue;
+				
+				if (!isRunning())
+					return;
+
+				// Get a java.util.Set containing the SelectionKey objects
+				// for all channels that are ready for I/O.
+				Set keys = selector.selectedKeys();
+
+				for (Iterator i = keys.iterator(); i.hasNext();) {
+					final SelectionKey key = (SelectionKey) i.next();
+					// Take the key out of the selected set so it is not handled again on the next pass.
+					i.remove();
+
+					final SelectorSelection s = (SelectorSelection) key.attachment();
+					try {
+						// Clear the key's interest set while the event is being handled;
+						// the task below re-enables it once onSelect() returns.
+						s.disable();
+						
+						// Kick off another thread to find newly selected keys while we process the 
+						// currently selected keys                
+						manager.getChannelExecutor().execute(new Runnable() {
+							public void run() {
+								try {
+									s.onSelect();
+									s.enable();
+								} catch (Throwable e) {
+									s.onError(e);
+								}
+							}
+						});
+						
+					} catch ( Throwable e ) {
+						// Failures while disabling the key or submitting the task are reported to the selection.
+						s.onError(e);
+					}
+					
+				}
+
+			}
+		} catch (IOException e) {
+			
+			// Don't accept any more selections
+			manager.onWorkerEmptyEvent(this);
+
+			// Notify all the selections that the error occurred.
+			Set keys = selector.keys();
+			for (Iterator i = keys.iterator(); i.hasNext();) {
+				SelectionKey key = (SelectionKey) i.next();
+				SelectorSelection s = (SelectorSelection) key.attachment();
+				s.onError(e);
+			}
+			
+		} finally {
+			// Restore the name the executor thread had before this worker ran on it.
+			Thread.currentThread().setName(origName);
+		}
+	}
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio
Fri Jan  5 15:23:14 2007
@@ -0,0 +1 @@
+class=org.apache.activemq.transport.nio.NIOTransportFactory

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import org.apache.activemq.JmsDurableTopicSendReceiveTest;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * Runs the inherited JmsDurableTopicSendReceiveTest against a broker that this
+ * test starts itself and that is reached through an "nio" transport URL.
+ */
+public class NIOJmsDurableTopicSendReceiveTest extends JmsDurableTopicSendReceiveTest {
+    protected BrokerService broker;
+
+    /**
+     * Starts the broker (unless one is already set) before the superclass
+     * set-up runs.
+     */
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+            broker.start();
+        }
+        super.setUp();
+    }
+
+    /**
+     * Runs the superclass tear-down and then stops the broker.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // Stop the broker even when the superclass tear-down throws, so a
+            // failed test does not leave the embedded broker running.
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    /**
+     * @return a connection factory pointing at {@link #getBrokerURL()}
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL());
+        return connectionFactory;
+    }
+
+    /**
+     * @return the URL used both for the broker connector and for the clients
+     */
+    protected String getBrokerURL() {
+        return "nio://localhost:61616";
+    }
+
+    /**
+     * @return a non-persistent broker with a connector on {@link #getBrokerURL()}; not yet started
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.addConnector(getBrokerURL());
+        return answer;
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
+
+/**
+ * Runs the inherited JmsTopicSendReceiveWithTwoConnectionsTest against a
+ * broker that this test starts itself and that is reached through an "nio"
+ * transport URL.
+ *
+ * @version $Revision$
+ */
+public class NIOJmsSendAndReceiveTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
+    protected BrokerService broker;
+
+    /**
+     * Starts the broker (unless one is already set) before the superclass
+     * set-up runs.
+     */
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+            broker.start();
+        }
+        super.setUp();
+    }
+
+    /**
+     * Runs the superclass tear-down and then stops the broker.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // Stop the broker even when the superclass tear-down throws, so a
+            // failed test does not leave the embedded broker running.
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    /**
+     * @return a connection factory pointing at {@link #getBrokerURL()}
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL());
+        return connectionFactory;
+    }
+
+    /**
+     * @return the URL used both for the broker connector and for the clients
+     */
+    protected String getBrokerURL() {
+        return "nio://localhost:61616";
+    }
+
+    /**
+     * @return a non-persistent broker with a connector on {@link #getBrokerURL()}; not yet started
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.addConnector(getBrokerURL());
+        return answer;
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import org.apache.activemq.broker.BrokerService;
+
+import javax.jms.DeliveryMode;
+
+/**
+ * Variant of NIOJmsSendAndReceiveTest that uses persistent delivery and a
+ * persistent broker. The broker itself is held in the field inherited from
+ * NIOJmsSendAndReceiveTest; this class deliberately declares no field of its
+ * own, because a second "broker" field here would hide the inherited one and
+ * stay null.
+ */
+public class NIOPersistentSendAndReceiveTest extends NIOJmsSendAndReceiveTest {
+
+    /**
+     * Sets the inherited test options before the superclass set-up (which
+     * starts the broker) runs.
+     */
+    protected void setUp() throws Exception {
+        this.topic = false;
+        this.deliveryMode = DeliveryMode.PERSISTENT;
+        super.setUp();
+    }
+
+    /**
+     * @return a persistent broker with a connector on the inherited broker URL; not yet started
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(true);
+        answer.addConnector(getBrokerURL());
+        return answer;
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java?view=auto&rev=493241
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java
Fri Jan  5 15:23:14 2007
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import org.apache.activemq.transport.TransportBrokerTestSupport;
+
+import junit.framework.Test;
+import junit.textui.TestRunner;
+
+/**
+ * Concrete TransportBrokerTestSupport that supplies an "nio" bind location.
+ */
+public class NIOTransportBrokerTest extends TransportBrokerTestSupport {
+
+    /**
+     * @return the bind location used by this test
+     */
+    protected String getBindLocation() {
+        final String location = "nio://localhost:61616";
+        return location;
+    }
+
+    /**
+     * @return the suite built from this test class
+     */
+    public static Test suite() {
+        final Test tests = suite(NIOTransportBrokerTest.class);
+        return tests;
+    }
+
+    /**
+     * Runs the suite with the JUnit text runner.
+     */
+    public static void main(String[] args) {
+        final Test tests = suite();
+        TestRunner.run(tests);
+    }
+
+}



Mime
View raw message