activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [2/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ activ...
Date Tue, 21 Feb 2006 23:14:17 GMT

Propchange: incubator/activemq/trunk/activecluster/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/Cluster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/ClusterEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/ClusterException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/ClusterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/ClusterListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/DestinationMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/LocalNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/Node.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/Service.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/election/ElectionStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/election/impl/BullyElectionStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/group/BuddyGroupModel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/group/Group.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/group/GroupClusterListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/group/GroupModel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/group/MasterZoneFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/group/Membership.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/group/NodeFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/group/NodeMemberships.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/ActiveMQClusterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/DefaultCluster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/DefaultClusterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/DefaultDestinationMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/NodeImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/NodeState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/NonReplicatedLocalNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/ReplicatedLocalNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/StateConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/StateService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/StateServiceImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/java/org/apache/activecluster/impl/StateServiceStub.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/ChatDemo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/ClusterDemo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/ClusterFunctionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/ClusterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/ClusterTestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/StubMessageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/TestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/TestingClusterListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/group/BuddyGroupModelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/group/GroupModelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activecluster/src/test/org/apache/activecluster/group/GroupTestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-aio/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,262 +1,262 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.aio;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.EOSPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelListener;
-import org.apache.activeio.stream.sync.socket.SocketMetadata;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import com.ibm.io.async.AsyncSocketChannel;
-import com.ibm.io.async.IAbstractAsyncFuture;
-import com.ibm.io.async.IAsyncFuture;
-import com.ibm.io.async.ICompletionListener;
-
-/**
- * @version $Revision$
- */
-final public class AIOAsyncChannel implements AsyncChannel, ICompletionListener, SocketMetadata {
-
-    protected static final int DEFAULT_BUFFER_SIZE = ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE;
-
-    private final AsyncSocketChannel socketChannel;
-    private final Socket socket;
-
-    private AsyncChannelListener channelListener;
-    private ByteBuffer inputByteBuffer;
-    
-    private final AtomicBoolean running = new AtomicBoolean(false);
-    private CountDownLatch doneCountDownLatch;
-
-    protected AIOAsyncChannel(AsyncSocketChannel socketChannel) throws IOException {
-        this.socketChannel = socketChannel;
-        this.socket = socketChannel.socket();
-        this.socket.setSendBufferSize(DEFAULT_BUFFER_SIZE);
-        this.socket.setReceiveBufferSize(DEFAULT_BUFFER_SIZE);
-        this.socket.setSoTimeout(0);
-    }
-    
-    private ByteBuffer allocateBuffer() {
-        return ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);
-    }   
-
-    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
-        this.channelListener = channelListener;
-    }
-
-    public AsyncChannelListener getAsyncChannelListener() {
-        return channelListener;
-    }
-
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return null;
-    }
-
-    public void dispose() {
-        if( running.get() && channelListener!=null ) {
-            channelListener.onPacketError(new SocketException("Socket closed."));
-        }
-        try {
-            stop();
-        } catch (IOException e) {
-        }
-        try {
-            socketChannel.close();
-        } catch (IOException e) {
-        }
-    }
-
-    public void start() throws IOException {
-        if( running.compareAndSet(false, true) ) {
-            doneCountDownLatch = new CountDownLatch(1);
-            requestNextRead();
-        }
-    }
-
-    public void stop() throws IOException {
-        if( running.compareAndSet(true, false) ) {
-            try {
-                doneCountDownLatch.await(5,  TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                throw new InterruptedIOException();
-            }
-        }
-    }
-
-    public void write(Packet packet) throws IOException {
-        ByteBuffer data = ((ByteBufferPacket)packet).getByteBuffer();
-        while( data.hasRemaining() ) {
-	        IAsyncFuture future = socketChannel.write(data);
-	        try {
-	            future.getByteCount();
-	        } catch (InterruptedException e) {
-	            throw new InterruptedIOException();
-	        }
-        }
-    }
-
-    public void flush() throws IOException {
-    }
-
-    public void futureCompleted(IAbstractAsyncFuture abstractFuture, Object attribute) {
-        IAsyncFuture future = (IAsyncFuture)abstractFuture;
-        try {
-            
-            if( inputByteBuffer.position()>0 ) {
-	            ByteBuffer remaining = inputByteBuffer.slice();            
-	            Packet data = new ByteBufferPacket(((ByteBuffer)inputByteBuffer.flip()).slice());
-	            
-	            channelListener.onPacket(data);	            
-	            // Keep the remaining buffer around to fill with data.
-	            inputByteBuffer = remaining;
-	            requestNextRead();
-	            
-            } else {                
-                channelListener.onPacket(EOSPacket.EOS_PACKET);  
-            }
-            
-        } catch (IOException e) {
-            channelListener.onPacketError(e);
-        }
-    }
-
-    private void requestNextRead() throws InterruptedIOException {
-        
-        // Don't do next read if we have stopped running.
-        if( !running.get() ) {
-            doneCountDownLatch.countDown();
-            return;
-        }
-        
-        try {
-            
-            if( inputByteBuffer==null || !inputByteBuffer.hasRemaining() ) {
-                inputByteBuffer = allocateBuffer();
-            }
-
-            IAsyncFuture future = socketChannel.read(inputByteBuffer);
-            future.addCompletionListener(this, null, false);
-            
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException();
-        }
-
-    }
-
-    public InetAddress getInetAddress() {
-        return socket.getInetAddress();
-    }
-    public boolean getKeepAlive() throws SocketException {
-        return socket.getKeepAlive();
-    }
-    public InetAddress getLocalAddress() {
-        return socket.getLocalAddress();
-    }
-    public int getLocalPort() {
-        return socket.getLocalPort();
-    }
-    public SocketAddress getLocalSocketAddress() {
-        return socket.getLocalSocketAddress();
-    }
-    public boolean getOOBInline() throws SocketException {
-        return socket.getOOBInline();
-    }
-    public int getPort() {
-        return socket.getPort();
-    }
-    public int getReceiveBufferSize() throws SocketException {
-        return socket.getReceiveBufferSize();
-    }
-    public SocketAddress getRemoteSocketAddress() {
-        return socket.getRemoteSocketAddress();
-    }
-    public boolean getReuseAddress() throws SocketException {
-        return socket.getReuseAddress();
-    }
-    public int getSendBufferSize() throws SocketException {
-        return socket.getSendBufferSize();
-    }
-    public int getSoLinger() throws SocketException {
-        return socket.getSoLinger();
-    }
-    public int getSoTimeout() throws SocketException {
-        return socket.getSoTimeout();
-    }
-    public boolean getTcpNoDelay() throws SocketException {
-        return socket.getTcpNoDelay();
-    }
-    public int getTrafficClass() throws SocketException {
-        return socket.getTrafficClass();
-    }
-    public boolean isBound() {
-        return socket.isBound();
-    }
-    public boolean isClosed() {
-        return socket.isClosed();
-    }
-    public boolean isConnected() {
-        return socket.isConnected();
-    }
-    public void setKeepAlive(boolean on) throws SocketException {
-        socket.setKeepAlive(on);
-    }
-    public void setOOBInline(boolean on) throws SocketException {
-        socket.setOOBInline(on);
-    }
-    public void setReceiveBufferSize(int size) throws SocketException {
-        socket.setReceiveBufferSize(size);
-    }
-    public void setReuseAddress(boolean on) throws SocketException {
-        socket.setReuseAddress(on);
-    }
-    public void setSendBufferSize(int size) throws SocketException {
-        socket.setSendBufferSize(size);
-    }
-    public void setSoLinger(boolean on, int linger) throws SocketException {
-        socket.setSoLinger(on, linger);
-    }
-    public void setTcpNoDelay(boolean on) throws SocketException {
-        socket.setTcpNoDelay(on);
-    }
-    public void setTrafficClass(int tc) throws SocketException {
-        socket.setTrafficClass(tc);
-    }
-    public void setSoTimeout(int i) throws SocketException {
-        socket.setSoTimeout(i);
-    }
-    public String toString() {
-        return "AIO Connection: "+getLocalSocketAddress()+" -> "+getRemoteSocketAddress();
-    }
-
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.EOSPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+import org.apache.activeio.stream.sync.socket.SocketMetadata;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import com.ibm.io.async.AsyncSocketChannel;
+import com.ibm.io.async.IAbstractAsyncFuture;
+import com.ibm.io.async.IAsyncFuture;
+import com.ibm.io.async.ICompletionListener;
+
+/**
+ * @version $Revision$
+ */
+final public class AIOAsyncChannel implements AsyncChannel, ICompletionListener, SocketMetadata {
+
+    protected static final int DEFAULT_BUFFER_SIZE = ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE;
+
+    private final AsyncSocketChannel socketChannel;
+    private final Socket socket;
+
+    private AsyncChannelListener channelListener;
+    private ByteBuffer inputByteBuffer;
+    
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private CountDownLatch doneCountDownLatch;
+
+    protected AIOAsyncChannel(AsyncSocketChannel socketChannel) throws IOException {
+        this.socketChannel = socketChannel;
+        this.socket = socketChannel.socket();
+        this.socket.setSendBufferSize(DEFAULT_BUFFER_SIZE);
+        this.socket.setReceiveBufferSize(DEFAULT_BUFFER_SIZE);
+        this.socket.setSoTimeout(0);
+    }
+    
+    private ByteBuffer allocateBuffer() {
+        return ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);
+    }   
+
+    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
+        this.channelListener = channelListener;
+    }
+
+    public AsyncChannelListener getAsyncChannelListener() {
+        return channelListener;
+    }
+
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return null;
+    }
+
+    public void dispose() {
+        if( running.get() && channelListener!=null ) {
+            channelListener.onPacketError(new SocketException("Socket closed."));
+        }
+        try {
+            stop();
+        } catch (IOException e) {
+        }
+        try {
+            socketChannel.close();
+        } catch (IOException e) {
+        }
+    }
+
+    public void start() throws IOException {
+        if( running.compareAndSet(false, true) ) {
+            doneCountDownLatch = new CountDownLatch(1);
+            requestNextRead();
+        }
+    }
+
+    public void stop() throws IOException {
+        if( running.compareAndSet(true, false) ) {
+            try {
+                doneCountDownLatch.await(5,  TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+        }
+    }
+
+    public void write(Packet packet) throws IOException {
+        ByteBuffer data = ((ByteBufferPacket)packet).getByteBuffer();
+        while( data.hasRemaining() ) {
+	        IAsyncFuture future = socketChannel.write(data);
+	        try {
+	            future.getByteCount();
+	        } catch (InterruptedException e) {
+	            throw new InterruptedIOException();
+	        }
+        }
+    }
+
+    public void flush() throws IOException {
+    }
+
+    public void futureCompleted(IAbstractAsyncFuture abstractFuture, Object attribute) {
+        IAsyncFuture future = (IAsyncFuture)abstractFuture;
+        try {
+            
+            if( inputByteBuffer.position()>0 ) {
+	            ByteBuffer remaining = inputByteBuffer.slice();            
+	            Packet data = new ByteBufferPacket(((ByteBuffer)inputByteBuffer.flip()).slice());
+	            
+	            channelListener.onPacket(data);	            
+	            // Keep the remaining buffer around to fill with data.
+	            inputByteBuffer = remaining;
+	            requestNextRead();
+	            
+            } else {                
+                channelListener.onPacket(EOSPacket.EOS_PACKET);  
+            }
+            
+        } catch (IOException e) {
+            channelListener.onPacketError(e);
+        }
+    }
+
+    private void requestNextRead() throws InterruptedIOException {
+        
+        // Don't do next read if we have stopped running.
+        if( !running.get() ) {
+            doneCountDownLatch.countDown();
+            return;
+        }
+        
+        try {
+            
+            if( inputByteBuffer==null || !inputByteBuffer.hasRemaining() ) {
+                inputByteBuffer = allocateBuffer();
+            }
+
+            IAsyncFuture future = socketChannel.read(inputByteBuffer);
+            future.addCompletionListener(this, null, false);
+            
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
+
+    }
+
+    public InetAddress getInetAddress() {
+        return socket.getInetAddress();
+    }
+    public boolean getKeepAlive() throws SocketException {
+        return socket.getKeepAlive();
+    }
+    public InetAddress getLocalAddress() {
+        return socket.getLocalAddress();
+    }
+    public int getLocalPort() {
+        return socket.getLocalPort();
+    }
+    public SocketAddress getLocalSocketAddress() {
+        return socket.getLocalSocketAddress();
+    }
+    public boolean getOOBInline() throws SocketException {
+        return socket.getOOBInline();
+    }
+    public int getPort() {
+        return socket.getPort();
+    }
+    public int getReceiveBufferSize() throws SocketException {
+        return socket.getReceiveBufferSize();
+    }
+    public SocketAddress getRemoteSocketAddress() {
+        return socket.getRemoteSocketAddress();
+    }
+    public boolean getReuseAddress() throws SocketException {
+        return socket.getReuseAddress();
+    }
+    public int getSendBufferSize() throws SocketException {
+        return socket.getSendBufferSize();
+    }
+    public int getSoLinger() throws SocketException {
+        return socket.getSoLinger();
+    }
+    public int getSoTimeout() throws SocketException {
+        return socket.getSoTimeout();
+    }
+    public boolean getTcpNoDelay() throws SocketException {
+        return socket.getTcpNoDelay();
+    }
+    public int getTrafficClass() throws SocketException {
+        return socket.getTrafficClass();
+    }
+    public boolean isBound() {
+        return socket.isBound();
+    }
+    public boolean isClosed() {
+        return socket.isClosed();
+    }
+    public boolean isConnected() {
+        return socket.isConnected();
+    }
+    public void setKeepAlive(boolean on) throws SocketException {
+        socket.setKeepAlive(on);
+    }
+    public void setOOBInline(boolean on) throws SocketException {
+        socket.setOOBInline(on);
+    }
+    public void setReceiveBufferSize(int size) throws SocketException {
+        socket.setReceiveBufferSize(size);
+    }
+    public void setReuseAddress(boolean on) throws SocketException {
+        socket.setReuseAddress(on);
+    }
+    public void setSendBufferSize(int size) throws SocketException {
+        socket.setSendBufferSize(size);
+    }
+    public void setSoLinger(boolean on, int linger) throws SocketException {
+        socket.setSoLinger(on, linger);
+    }
+    public void setTcpNoDelay(boolean on) throws SocketException {
+        socket.setTcpNoDelay(on);
+    }
+    public void setTrafficClass(int tc) throws SocketException {
+        socket.setTrafficClass(tc);
+    }
+    public void setSoTimeout(int i) throws SocketException {
+        socket.setSoTimeout(i);
+    }
+    public String toString() {
+        return "AIO Connection: "+getLocalSocketAddress()+" -> "+getRemoteSocketAddress();
+    }
+
  }

Propchange: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java (original)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java Tue Feb 21 15:12:56 2006
@@ -1,115 +1,115 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.aio;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.activeio.adapter.SyncToAsyncChannelServer;
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelFactory;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
-import org.apache.activeio.util.URISupport;
-
-import com.ibm.io.async.AsyncServerSocketChannel;
-import com.ibm.io.async.AsyncSocketChannel;
-
-/**
- * A TcpAsyncChannelFactory creates {@see org.apache.activeio.net.TcpAsyncChannel}
- * and {@see org.apache.activeio.net.TcpAsyncChannelServer} objects.
- * 
- * @version $Revision$
- */
-public class AIOAsyncChannelFactory implements AsyncChannelFactory {
-
-    protected static final int DEFAULT_BACKLOG = 500;
-    private int backlog = DEFAULT_BACKLOG;
-        
-    /**
-     * Uses the {@param location}'s host and port to create a tcp connection to a remote host.
-     * 
-     * @see org.apache.activeio.AsyncChannelFactory#openAsyncChannel(java.net.URI)
-     */
-    public AsyncChannel openAsyncChannel(URI location) throws IOException {
-        AsyncSocketChannel channel = AsyncSocketChannel.open();
-        channel.connect(new InetSocketAddress(location.getHost(), location.getPort()));
-        return createAsyncChannel(channel);
-    }
-
-    /**
-     * @param channel
-     * @return
-     * @throws IOException
-     */
-    protected AsyncChannel createAsyncChannel(AsyncSocketChannel socketChannel) throws IOException {
-        AsyncChannel channel = new AIOAsyncChannel(socketChannel);
-        channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(true), false);
-        return channel;
-    }
-
-    /**
-     * Binds a server socket a the {@param location}'s port. 
-     * 
-     * @see org.apache.activeio.AsyncChannelFactory#bindAsyncChannel(java.net.URI)
-     */
-    public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException {
-        
-        String host = bindURI.getHost();
-        InetSocketAddress address;
-        if( host == null || host.length() == 0 || host.equals("localhost") || host.equals("0.0.0.0") || InetAddress.getLocalHost().getHostName().equals(host) ) {            
-            address = new InetSocketAddress(bindURI.getPort());
-        } else {
-            address = new InetSocketAddress(bindURI.getHost(), bindURI.getPort());
-        }
-        
-        AsyncServerSocketChannel serverSocketChannel = AsyncServerSocketChannel.open();
-        serverSocketChannel.socket().bind(address,backlog);
-        
-        URI connectURI = bindURI;
-        try {
-//            connectURI = URISupport.changeHost(connectURI, InetAddress.getLocalHost().getHostName());
-            connectURI = URISupport.changePort(connectURI, serverSocketChannel.socket().getLocalPort());
-        } catch (URISyntaxException e) {
-            throw (IOException)new IOException("Could not build connect URI: "+e).initCause(e);
-        }
-        
-        return SyncToAsyncChannelServer.adapt( 
-                new AIOSyncChannelServer(serverSocketChannel, bindURI, connectURI));
-    }
-    
-    /**
-     * @return Returns the backlog.
-     */
-    public int getBacklog() {
-        return backlog;
-    }
-
-    /**
-     * @param backlog
-     *            The backlog to set.
-     */
-    public void setBacklog(int backlog) {
-        this.backlog = backlog;
-    }
-
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.activeio.adapter.SyncToAsyncChannelServer;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
+import org.apache.activeio.util.URISupport;
+
+import com.ibm.io.async.AsyncServerSocketChannel;
+import com.ibm.io.async.AsyncSocketChannel;
+
+/**
+ * A TcpAsyncChannelFactory creates {@see org.apache.activeio.net.TcpAsyncChannel}
+ * and {@see org.apache.activeio.net.TcpAsyncChannelServer} objects.
+ * 
+ * @version $Revision$
+ */
+public class AIOAsyncChannelFactory implements AsyncChannelFactory {
+
+    protected static final int DEFAULT_BACKLOG = 500;
+    private int backlog = DEFAULT_BACKLOG;
+        
+    /**
+     * Uses the {@param location}'s host and port to create a tcp connection to a remote host.
+     * 
+     * @see org.apache.activeio.AsyncChannelFactory#openAsyncChannel(java.net.URI)
+     */
+    public AsyncChannel openAsyncChannel(URI location) throws IOException {
+        AsyncSocketChannel channel = AsyncSocketChannel.open();
+        channel.connect(new InetSocketAddress(location.getHost(), location.getPort()));
+        return createAsyncChannel(channel);
+    }
+
+    /**
+     * @param channel
+     * @return
+     * @throws IOException
+     */
+    protected AsyncChannel createAsyncChannel(AsyncSocketChannel socketChannel) throws IOException {
+        AsyncChannel channel = new AIOAsyncChannel(socketChannel);
+        channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(true), false);
+        return channel;
+    }
+
+    /**
+     * Binds a server socket a the {@param location}'s port. 
+     * 
+     * @see org.apache.activeio.AsyncChannelFactory#bindAsyncChannel(java.net.URI)
+     */
+    public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException {
+        
+        String host = bindURI.getHost();
+        InetSocketAddress address;
+        if( host == null || host.length() == 0 || host.equals("localhost") || host.equals("0.0.0.0") || InetAddress.getLocalHost().getHostName().equals(host) ) {            
+            address = new InetSocketAddress(bindURI.getPort());
+        } else {
+            address = new InetSocketAddress(bindURI.getHost(), bindURI.getPort());
+        }
+        
+        AsyncServerSocketChannel serverSocketChannel = AsyncServerSocketChannel.open();
+        serverSocketChannel.socket().bind(address,backlog);
+        
+        URI connectURI = bindURI;
+        try {
+//            connectURI = URISupport.changeHost(connectURI, InetAddress.getLocalHost().getHostName());
+            connectURI = URISupport.changePort(connectURI, serverSocketChannel.socket().getLocalPort());
+        } catch (URISyntaxException e) {
+            throw (IOException)new IOException("Could not build connect URI: "+e).initCause(e);
+        }
+        
+        return SyncToAsyncChannelServer.adapt( 
+                new AIOSyncChannelServer(serverSocketChannel, bindURI, connectURI));
+    }
+    
+    /**
+     * @return Returns the backlog.
+     */
+    public int getBacklog() {
+        return backlog;
+    }
+
+    /**
+     * @param backlog
+     *            The backlog to set.
+     */
+    public void setBacklog(int backlog) {
+        this.backlog = backlog;
+    }
+
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,107 +1,107 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.aio;
-
-import java.io.IOException;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-
-import org.apache.activeio.Channel;
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-
-import com.ibm.io.async.AsyncServerSocketChannel;
-
-/**
- * @version $Revision$
- */
-public class AIOSyncChannelServer implements SyncChannelServer {
-
-    private final AsyncServerSocketChannel serverSocket;
-    private final URI bindURI;
-    private final URI connectURI;
-    private int curentSoTimeout;
-
-    public AIOSyncChannelServer(AsyncServerSocketChannel serverSocket, URI bindURI, URI connectURI) throws IOException {
-        this.serverSocket=serverSocket;
-        this.bindURI=bindURI;
-        this.connectURI=connectURI;
-        this.curentSoTimeout = serverSocket.socket().getSoTimeout();
-    }
-    
-    public URI getBindURI() {
-        return bindURI;
-    }
-
-    public URI getConnectURI() {
-        return this.connectURI;
-    }
-
-    public void dispose() {
-        try {
-            serverSocket.close();
-        } catch (IOException e) {
-        }
-    }
-
-    synchronized public void start() throws IOException {
-    }
-
-    synchronized public void stop() {
-    }
-
-    public Channel accept(long timeout) throws IOException {
-        try {
-	        
-            if (timeout == SyncChannelServer.WAIT_FOREVER_TIMEOUT)
-	            setSoTimeout(0);
-	        else if (timeout == SyncChannelServer.NO_WAIT_TIMEOUT)
-	            setSoTimeout(1);
-	        else
-	            setSoTimeout((int) timeout);
-	
-            AsyncChannel channel = new AIOAsyncChannel(serverSocket.accept());
-            channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(true), false);
-            return channel;
-	        
-        } catch (SocketTimeoutException ignore) {
-        }
-        return null;	        
-    }
-
-    private void setSoTimeout(int i) throws SocketException {
-        if (curentSoTimeout != i) {
-            serverSocket.socket().setSoTimeout(i);
-            curentSoTimeout = i;
-        }
-    }
-    
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return null;
-    }
-    
-    public String toString() {
-        return "AIO Server: "+getConnectURI();
-    }
-    
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+
+import org.apache.activeio.Channel;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+import com.ibm.io.async.AsyncServerSocketChannel;
+
+/**
+ * @version $Revision$
+ */
+public class AIOSyncChannelServer implements SyncChannelServer {
+
+    private final AsyncServerSocketChannel serverSocket;
+    private final URI bindURI;
+    private final URI connectURI;
+    private int curentSoTimeout;
+
+    public AIOSyncChannelServer(AsyncServerSocketChannel serverSocket, URI bindURI, URI connectURI) throws IOException {
+        this.serverSocket=serverSocket;
+        this.bindURI=bindURI;
+        this.connectURI=connectURI;
+        this.curentSoTimeout = serverSocket.socket().getSoTimeout();
+    }
+    
+    public URI getBindURI() {
+        return bindURI;
+    }
+
+    public URI getConnectURI() {
+        return this.connectURI;
+    }
+
+    public void dispose() {
+        try {
+            serverSocket.close();
+        } catch (IOException e) {
+        }
+    }
+
+    synchronized public void start() throws IOException {
+    }
+
+    synchronized public void stop() {
+    }
+
+    public Channel accept(long timeout) throws IOException {
+        try {
+	        
+            if (timeout == SyncChannelServer.WAIT_FOREVER_TIMEOUT)
+	            setSoTimeout(0);
+	        else if (timeout == SyncChannelServer.NO_WAIT_TIMEOUT)
+	            setSoTimeout(1);
+	        else
+	            setSoTimeout((int) timeout);
+	
+            AsyncChannel channel = new AIOAsyncChannel(serverSocket.accept());
+            channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(true), false);
+            return channel;
+	        
+        } catch (SocketTimeoutException ignore) {
+        }
+        return null;	        
+    }
+
+    private void setSoTimeout(int i) throws SocketException {
+        if (curentSoTimeout != i) {
+            serverSocket.socket().setSoTimeout(i);
+            curentSoTimeout = i;
+        }
+    }
+    
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return null;
+    }
+    
+    public String toString() {
+        return "AIO Server: "+getConnectURI();
+    }
+    
 }

Propchange: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java (original)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java Tue Feb 21 15:12:56 2006
@@ -1,47 +1,47 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.aio;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.activeio.Channel;
-import org.apache.activeio.ChannelServer;
-import org.apache.activeio.packet.async.aio.AIOAsyncChannelFactory;
-import org.apache.activeio.packet.sync.SyncChannelTestSupport;
-
-/**
- * @version $Revision$
- */
-public class AIOAsyncChannelTest extends SyncChannelTestSupport {
-
-    static boolean disabled = System.getProperty("disable.aio.tests", "false").equals("true");    
-    AIOAsyncChannelFactory factory =  new AIOAsyncChannelFactory();
-
-    protected Channel openChannel(URI connectURI) throws IOException {
-        return factory.openAsyncChannel(connectURI);
-    }
-
-    protected ChannelServer bindChannel() throws IOException, URISyntaxException {
-        return factory.bindAsyncChannel(new URI("tcp://localhost:0"));
-    }
-
-    protected boolean isDisabled() {
-        return disabled;
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.activeio.Channel;
+import org.apache.activeio.ChannelServer;
+import org.apache.activeio.packet.async.aio.AIOAsyncChannelFactory;
+import org.apache.activeio.packet.sync.SyncChannelTestSupport;
+
+/**
+ * @version $Revision$
+ */
+public class AIOAsyncChannelTest extends SyncChannelTestSupport {
+
+    static boolean disabled = System.getProperty("disable.aio.tests", "false").equals("true");    
+    AIOAsyncChannelFactory factory =  new AIOAsyncChannelFactory();
+
+    protected Channel openChannel(URI connectURI) throws IOException {
+        return factory.openAsyncChannel(connectURI);
+    }
+
+    protected ChannelServer bindChannel() throws IOException, URISyntaxException {
+        return factory.bindAsyncChannel(new URI("tcp://localhost:0"));
+    }
+
+    protected boolean isDisabled() {
+        return disabled;
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java (original)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java Tue Feb 21 15:12:56 2006
@@ -1,111 +1,111 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.aio;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import junit.framework.TestCase;
-
-import org.apache.activeio.AcceptListener;
-import org.apache.activeio.Channel;
-import org.apache.activeio.ChannelFactory;
-import org.apache.activeio.adapter.AsyncToSyncChannel;
-import org.apache.activeio.adapter.SyncToAsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-import org.apache.activeio.packet.async.aio.AIOAsyncChannel;
-import org.apache.activeio.packet.async.aio.AIOSyncChannelServer;
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-
-/**
- */
-public class ChannelFactoryTest extends TestCase {
-
-    static final Log log = LogFactory.getLog(ChannelFactoryTest.class);
-    static boolean aioDisabled = System.getProperty("disable.aio.tests", "false").equals("true");
-
-    ChannelFactory factory = new ChannelFactory();
-
-    private SyncChannelServer syncChannelServer;
-    private SyncChannel clientSynchChannel;
-    private SyncChannel serverSynchChannel;
-
-    private AsyncChannelServer asyncChannelServer;
-    private AsyncChannel clientAsyncChannel;
-    private AsyncChannel serverAsyncChannel;
-    
-    protected void setUp() throws Exception {
-        log.info("Running: "+getName());
-    }
-
-    public void testAIO() throws IOException, URISyntaxException, InterruptedException {
-
-        if( aioDisabled ) {
-            return;
-        }
-        
-        createSynchObjects("aio://localhost:0");
-        assertNotNull( syncChannelServer.getAdapter(AIOSyncChannelServer.class) );
-        assertNotNull( clientSynchChannel.getAdapter(AIOAsyncChannel.class) );
-        assertNotNull( serverSynchChannel.getAdapter(AIOAsyncChannel.class) );
-        
-        createAsynchObjects("aio://localhost:0");
-        assertNotNull( asyncChannelServer.getAdapter(AIOSyncChannelServer.class) );
-        assertNotNull( clientAsyncChannel.getAdapter(AIOAsyncChannel.class) );
-        assertNotNull( serverAsyncChannel.getAdapter(AIOAsyncChannel.class) );
-        
-    }    
-
-    private void createSynchObjects(String bindURI) throws IOException, URISyntaxException {
-        syncChannelServer = factory.bindSyncChannel(new URI(bindURI));
-        syncChannelServer.start();
-        clientSynchChannel = factory.openSyncChannel(syncChannelServer.getConnectURI());
-        serverSynchChannel = AsyncToSyncChannel.adapt( syncChannelServer.accept(1000*5) );
-        serverSynchChannel.dispose();        
-        clientSynchChannel.dispose();        
-        syncChannelServer.dispose();
-    }
-
-    private void createAsynchObjects(String bindURI) throws IOException, URISyntaxException, InterruptedException {
-        asyncChannelServer = factory.bindAsyncChannel(new URI(bindURI));
-        final CountDownLatch accepted = new CountDownLatch(1);
-        asyncChannelServer.setAcceptListener(new AcceptListener() {
-            public void onAccept(Channel channel) {
-                serverAsyncChannel = SyncToAsyncChannel.adapt(channel);
-                channel.dispose();
-                accepted.countDown();
-            }
-            public void onAcceptError(IOException error) {
-                error.printStackTrace();
-            }
-        });
-        asyncChannelServer.start();
-        clientAsyncChannel = factory.openAsyncChannel(asyncChannelServer.getConnectURI());
-        accepted.await(1000*10, TimeUnit.MILLISECONDS);
-        clientAsyncChannel.dispose();        
-        asyncChannelServer.dispose();
-    }
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import junit.framework.TestCase;
+
+import org.apache.activeio.AcceptListener;
+import org.apache.activeio.Channel;
+import org.apache.activeio.ChannelFactory;
+import org.apache.activeio.adapter.AsyncToSyncChannel;
+import org.apache.activeio.adapter.SyncToAsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.async.aio.AIOAsyncChannel;
+import org.apache.activeio.packet.async.aio.AIOSyncChannelServer;
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class ChannelFactoryTest extends TestCase {
+
+    static final Log log = LogFactory.getLog(ChannelFactoryTest.class);
+    static boolean aioDisabled = System.getProperty("disable.aio.tests", "false").equals("true");
+
+    ChannelFactory factory = new ChannelFactory();
+
+    private SyncChannelServer syncChannelServer;
+    private SyncChannel clientSynchChannel;
+    private SyncChannel serverSynchChannel;
+
+    private AsyncChannelServer asyncChannelServer;
+    private AsyncChannel clientAsyncChannel;
+    private AsyncChannel serverAsyncChannel;
+    
+    protected void setUp() throws Exception {
+        log.info("Running: "+getName());
+    }
+
+    public void testAIO() throws IOException, URISyntaxException, InterruptedException {
+
+        if( aioDisabled ) {
+            return;
+        }
+        
+        createSynchObjects("aio://localhost:0");
+        assertNotNull( syncChannelServer.getAdapter(AIOSyncChannelServer.class) );
+        assertNotNull( clientSynchChannel.getAdapter(AIOAsyncChannel.class) );
+        assertNotNull( serverSynchChannel.getAdapter(AIOAsyncChannel.class) );
+        
+        createAsynchObjects("aio://localhost:0");
+        assertNotNull( asyncChannelServer.getAdapter(AIOSyncChannelServer.class) );
+        assertNotNull( clientAsyncChannel.getAdapter(AIOAsyncChannel.class) );
+        assertNotNull( serverAsyncChannel.getAdapter(AIOAsyncChannel.class) );
+        
+    }    
+
+    private void createSynchObjects(String bindURI) throws IOException, URISyntaxException {
+        syncChannelServer = factory.bindSyncChannel(new URI(bindURI));
+        syncChannelServer.start();
+        clientSynchChannel = factory.openSyncChannel(syncChannelServer.getConnectURI());
+        serverSynchChannel = AsyncToSyncChannel.adapt( syncChannelServer.accept(1000*5) );
+        serverSynchChannel.dispose();        
+        clientSynchChannel.dispose();        
+        syncChannelServer.dispose();
+    }
+
+    private void createAsynchObjects(String bindURI) throws IOException, URISyntaxException, InterruptedException {
+        asyncChannelServer = factory.bindAsyncChannel(new URI(bindURI));
+        final CountDownLatch accepted = new CountDownLatch(1);
+        asyncChannelServer.setAcceptListener(new AcceptListener() {
+            public void onAccept(Channel channel) {
+                serverAsyncChannel = SyncToAsyncChannel.adapt(channel);
+                channel.dispose();
+                accepted.countDown();
+            }
+            public void onAcceptError(IOException error) {
+                error.printStackTrace();
+            }
+        });
+        asyncChannelServer.start();
+        clientAsyncChannel = factory.openAsyncChannel(asyncChannelServer.getConnectURI());
+        accepted.await(1000*10, TimeUnit.MILLISECONDS);
+        clientAsyncChannel.dispose();        
+        asyncChannelServer.dispose();
+    }
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/AcceptListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/AcceptListener.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/AcceptListener.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/AcceptListener.java Tue Feb 21 15:12:56 2006
@@ -1,44 +1,44 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio;
-
-import java.io.IOException;
-
-
-/**
- * An AcceptListener object is used to receive accepted {@see org.apache.activeio.Channel} connections.
- * 
- * @version $Revision$
- */
-public interface AcceptListener {
-    
-	/**
-	 * A {@see AsyncChannelServer} will call this method to when a new channel connection has been
-	 * accepted. 
-	 *   
-	 * @param channel
-	 */
-	void onAccept(Channel channel);
-
-	/**
-	 * A {@see AsyncChannelServer} will call this method when a async failure occurs when accepting
-	 * a connection. 
-     * 
-     * @param error the exception that describes the failure.
-     */
-    void onAcceptError(IOException error);
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio;
+
+import java.io.IOException;
+
+
+/**
+ * An AcceptListener object is used to receive accepted {@see org.apache.activeio.Channel} connections.
+ * 
+ * @version $Revision$
+ */
+public interface AcceptListener {
+    
+	/**
+	 * A {@see AsyncChannelServer} will call this method to when a new channel connection has been
+	 * accepted. 
+	 *   
+	 * @param channel
+	 */
+	void onAccept(Channel channel);
+
+	/**
+	 * A {@see AsyncChannelServer} will call this method when a async failure occurs when accepting
+	 * a connection. 
+     * 
+     * @param error the exception that describes the failure.
+     */
+    void onAcceptError(IOException error);
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/AcceptListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Adaptable.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Adaptable.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Adaptable.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Adaptable.java Tue Feb 21 15:12:56 2006
@@ -1,35 +1,35 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activeio;
-
-
-/**
- * Provides an Adaptable interface inspired by eclipse's IAdaptable class.  Highly used in ActiveIO since Channel and Packet
- * implementations may be layered and application code may want request the higher level layers/abstractions to adapt to give access
- * to the lower layer implementation details.
- * 
- * @version $Revision$
- */
-public interface Adaptable {
-    
-    /**
-     *  @Return object that is an instance of requested type and is associated this this object.  May return null if no 
-     *  object of that type is associated.
-     */
-    Object getAdapter(Class target);
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeio;
+
+
+/**
+ * Provides an Adaptable interface inspired by eclipse's IAdaptable class.  Highly used in ActiveIO since Channel and Packet
+ * implementations may be layered and application code may want request the higher level layers/abstractions to adapt to give access
+ * to the lower layer implementation details.
+ * 
+ * @version $Revision$
+ */
+public interface Adaptable {
+    
+    /**
+     *  @Return object that is an instance of requested type and is associated this this object.  May return null if no 
+     *  object of that type is associated.
+     */
+    Object getAdapter(Class target);
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Adaptable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Channel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Channel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Channel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Channel.java Tue Feb 21 15:12:56 2006
@@ -1,31 +1,31 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activeio;
-
-
-/**
- * A Channel provides a standard procedure for regulating data transmission between 
- * applications.
- * 
- * The activeio API encourages that layered wire protocols be created by wiring
- * together a chain of Channel objects.   
- * 
- * @version $Revision$
- */
-public interface Channel extends Service, Adaptable {    
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeio;
+
+
+/**
+ * A Channel provides a standard procedure for regulating data transmission between 
+ * applications.
+ * 
+ * The activeio API encourages that layered wire protocols be created by wiring
+ * together a chain of Channel objects.   
+ * 
+ * @version $Revision$
+ */
+public interface Channel extends Service, Adaptable {    
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/Channel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelFactory.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelFactory.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelFactory.java Tue Feb 21 15:12:56 2006
@@ -1,144 +1,144 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-
-import org.apache.activeio.adapter.AsyncToSyncChannelFactory;
-import org.apache.activeio.adapter.SyncToAsyncChannelFactory;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelFactory;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.packet.sync.SyncChannelFactory;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-import org.apache.activeio.util.FactoryFinder;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-
-/**
- * A {@see ChannelFactory}uses the requested URI's scheme to determine the
- * actual {@see org.apache.activeio.SynchChannelFactory}or
- * {@see org.apache.activeio.AsyncChannelFactory}implementation to use to create it's
- * {@see org.apache.activeio.Channel}s and {@see org.apache.activeio.ChannelServer}s.
- * 
- * Each URI scheme that {@see ChannelFactory}object handles will have a
- * properties file located at: "META-INF/services/org/apache/activeio/channel/{scheme}".
- * 
- */
-public class ChannelFactory implements SyncChannelFactory, AsyncChannelFactory {
-
-    private final HashMap syncChannelFactoryMap = new HashMap();
-    private final HashMap asyncChannelFactoryMap = new HashMap();
-
-    
-    static public final Executor DEFAULT_EXECUTOR = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue());
-    static {
-        ((ThreadPoolExecutor) DEFAULT_EXECUTOR).setThreadFactory(new ThreadFactory() {
-            public Thread newThread(Runnable run) {
-                Thread thread = new Thread(run);
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
-    }
-
-    private static FactoryFinder finder = new FactoryFinder("META-INF/services/org/apache/activeio/channel/");
-
-    public SyncChannel openSyncChannel(URI location) throws IOException {
-        SyncChannelFactory factory = getSynchChannelFactory(location.getScheme());
-        return factory.openSyncChannel(location);
-    }
-
-    public SyncChannelServer bindSyncChannel(URI location) throws IOException {
-        SyncChannelFactory factory = getSynchChannelFactory(location.getScheme());
-        return factory.bindSyncChannel(location);
-    }
-
-    public AsyncChannel openAsyncChannel(URI location) throws IOException {
-        AsyncChannelFactory factory = getAsyncChannelFactory(location.getScheme());
-        return factory.openAsyncChannel(location);
-    }
-
-    public AsyncChannelServer bindAsyncChannel(URI location) throws IOException {
-        AsyncChannelFactory factory = getAsyncChannelFactory(location.getScheme());
-        return factory.bindAsyncChannel(location);
-    }
-
-    private SyncChannelFactory getSynchChannelFactory(String protocol) throws IOException {
-        try {
-            SyncChannelFactory rc = (SyncChannelFactory) syncChannelFactoryMap.get(protocol);
-            if (rc == null) {
-                try {
-                    rc = (SyncChannelFactory) finder.newInstance(protocol, "SyncChannelFactory.");
-                } catch (Throwable original) {
-                    // try to recovery by using AsyncChannelFactory and adapt
-                    // it to be sync.
-                    try {
-                        AsyncChannelFactory f = (AsyncChannelFactory) finder.newInstance(protocol,
-                                "AsyncChannelFactory.");
-                        rc = AsyncToSyncChannelFactory.adapt(f);
-                    } catch (Throwable e) {
-                        // Recovery strategy failed.. throw original exception.
-                        throw original;
-                    }
-                }
-                syncChannelFactoryMap.put(protocol, rc);
-            }
-            return rc;
-        } catch (Throwable e) {
-            throw (IOException) new IOException("Could not load a SyncChannelFactory for protcol: " + protocol
-                    + ", reason: " + e).initCause(e);
-        }
-    }
-
-    private AsyncChannelFactory getAsyncChannelFactory(String protocol) throws IOException {
-        try {
-            AsyncChannelFactory rc = (AsyncChannelFactory) asyncChannelFactoryMap.get(protocol);
-            if (rc == null) {
-
-                try {
-                    rc = (AsyncChannelFactory) finder.newInstance(protocol, "AsyncChannelFactory.");
-                } catch (Throwable original) {
-                    // try to recovery by using SynchChannelFactory and adapt it
-                    // to be async.
-                    try {
-                        SyncChannelFactory f = (SyncChannelFactory) finder.newInstance(protocol,
-                                "SyncChannelFactory.");
-                        rc = SyncToAsyncChannelFactory.adapt(f);
-                    } catch (Throwable e) {
-                        // Recovery strategy failed.. throw original exception.
-                        throw original;
-                    }
-                }
-
-                asyncChannelFactoryMap.put(protocol, rc);
-            }
-            return rc;
-        } catch (Throwable e) {
-            throw (IOException) new IOException("Could not load a AsyncChannelFactory for protcol: " + protocol
-                    + ", reason: " + e).initCause(e);
-        }
-    }
-
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+
+import org.apache.activeio.adapter.AsyncToSyncChannelFactory;
+import org.apache.activeio.adapter.SyncToAsyncChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelFactory;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+import org.apache.activeio.util.FactoryFinder;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+/**
+ * A {@see ChannelFactory}uses the requested URI's scheme to determine the
+ * actual {@see org.apache.activeio.SynchChannelFactory}or
+ * {@see org.apache.activeio.AsyncChannelFactory}implementation to use to create it's
+ * {@see org.apache.activeio.Channel}s and {@see org.apache.activeio.ChannelServer}s.
+ * 
+ * Each URI scheme that {@see ChannelFactory}object handles will have a
+ * properties file located at: "META-INF/services/org/apache/activeio/channel/{scheme}".
+ * 
+ */
+public class ChannelFactory implements SyncChannelFactory, AsyncChannelFactory {
+
+    private final HashMap syncChannelFactoryMap = new HashMap();
+    private final HashMap asyncChannelFactoryMap = new HashMap();
+
+    
+    static public final Executor DEFAULT_EXECUTOR = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue());
+    static {
+        ((ThreadPoolExecutor) DEFAULT_EXECUTOR).setThreadFactory(new ThreadFactory() {
+            public Thread newThread(Runnable run) {
+                Thread thread = new Thread(run);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
+
+    private static FactoryFinder finder = new FactoryFinder("META-INF/services/org/apache/activeio/channel/");
+
+    public SyncChannel openSyncChannel(URI location) throws IOException {
+        SyncChannelFactory factory = getSynchChannelFactory(location.getScheme());
+        return factory.openSyncChannel(location);
+    }
+
+    public SyncChannelServer bindSyncChannel(URI location) throws IOException {
+        SyncChannelFactory factory = getSynchChannelFactory(location.getScheme());
+        return factory.bindSyncChannel(location);
+    }
+
+    public AsyncChannel openAsyncChannel(URI location) throws IOException {
+        AsyncChannelFactory factory = getAsyncChannelFactory(location.getScheme());
+        return factory.openAsyncChannel(location);
+    }
+
+    public AsyncChannelServer bindAsyncChannel(URI location) throws IOException {
+        AsyncChannelFactory factory = getAsyncChannelFactory(location.getScheme());
+        return factory.bindAsyncChannel(location);
+    }
+
+    private SyncChannelFactory getSynchChannelFactory(String protocol) throws IOException {
+        try {
+            SyncChannelFactory rc = (SyncChannelFactory) syncChannelFactoryMap.get(protocol);
+            if (rc == null) {
+                try {
+                    rc = (SyncChannelFactory) finder.newInstance(protocol, "SyncChannelFactory.");
+                } catch (Throwable original) {
+                    // try to recovery by using AsyncChannelFactory and adapt
+                    // it to be sync.
+                    try {
+                        AsyncChannelFactory f = (AsyncChannelFactory) finder.newInstance(protocol,
+                                "AsyncChannelFactory.");
+                        rc = AsyncToSyncChannelFactory.adapt(f);
+                    } catch (Throwable e) {
+                        // Recovery strategy failed.. throw original exception.
+                        throw original;
+                    }
+                }
+                syncChannelFactoryMap.put(protocol, rc);
+            }
+            return rc;
+        } catch (Throwable e) {
+            throw (IOException) new IOException("Could not load a SyncChannelFactory for protcol: " + protocol
+                    + ", reason: " + e).initCause(e);
+        }
+    }
+
+    private AsyncChannelFactory getAsyncChannelFactory(String protocol) throws IOException {
+        try {
+            AsyncChannelFactory rc = (AsyncChannelFactory) asyncChannelFactoryMap.get(protocol);
+            if (rc == null) {
+
+                try {
+                    rc = (AsyncChannelFactory) finder.newInstance(protocol, "AsyncChannelFactory.");
+                } catch (Throwable original) {
+                    // try to recovery by using SynchChannelFactory and adapt it
+                    // to be async.
+                    try {
+                        SyncChannelFactory f = (SyncChannelFactory) finder.newInstance(protocol,
+                                "SyncChannelFactory.");
+                        rc = SyncToAsyncChannelFactory.adapt(f);
+                    } catch (Throwable e) {
+                        // Recovery strategy failed.. throw original exception.
+                        throw original;
+                    }
+                }
+
+                asyncChannelFactoryMap.put(protocol, rc);
+            }
+            return rc;
+        } catch (Throwable e) {
+            throw (IOException) new IOException("Could not load a AsyncChannelFactory for protcol: " + protocol
+                    + ", reason: " + e).initCause(e);
+        }
+    }
+
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,51 +1,51 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio;
-
-import java.net.URI;
-
-/**
- * A ChannelServer is used to accept incoming requests to establish new Channel sessions.
- * 
- * Like a normal {@see org.apache.activeio.Channel}, A ChannelServer comes in two falvors, either:
- * {@see org.apache.activeio.AsyncChannelServer} or 
- * {@see org.apache.activeio.SynchChannelServer}.
- * 
- * @version $Revision$
- */
-public interface ChannelServer extends Service, Adaptable {
-
-    /**
-     * The URI that was used when the channel was bound.  This could be different
-     * than what is used by a client to connect to the ChannelServer.  For example,
-     * the bind URI might be tcp://localhost:0 which means the channel should bind to 
-     * an anonymous port.
-     * 
-     * @return The URI that was used when the channel was bound
-     */
-    public URI getBindURI();
-    
-    /**
-     * Once bound, the channel may be able to construct a URI that is more sutible for when 
-     * a client needs to connect to the server.  For examle the port of the URI may be 
-     * updated to reflect the actual local port that the channel server is listening on.
-     * 
-     * @return a URI that a client can use to connect to the server or null if the channel cannot construct the URI.
-     */
-    public URI getConnectURI();
-    
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio;
+
+import java.net.URI;
+
+/**
+ * A ChannelServer is used to accept incoming requests to establish new Channel sessions.
+ * 
+ * Like a normal {@see org.apache.activeio.Channel}, A ChannelServer comes in two falvors, either:
+ * {@see org.apache.activeio.AsyncChannelServer} or 
+ * {@see org.apache.activeio.SynchChannelServer}.
+ * 
+ * @version $Revision$
+ */
+public interface ChannelServer extends Service, Adaptable {
+
+    /**
+     * The URI that was used when the channel was bound.  This could be different
+     * than what is used by a client to connect to the ChannelServer.  For example,
+     * the bind URI might be tcp://localhost:0 which means the channel should bind to 
+     * an anonymous port.
+     * 
+     * @return The URI that was used when the channel was bound
+     */
+    public URI getBindURI();
+    
+    /**
+     * Once bound, the channel may be able to construct a URI that is more sutible for when 
+     * a client needs to connect to the server.  For examle the port of the URI may be 
+     * updated to reflect the actual local port that the channel server is listening on.
+     * 
+     * @return a URI that a client can use to connect to the server or null if the channel cannot construct the URI.
+     */
+    public URI getConnectURI();
+    
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/ChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message