activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [10/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ acti...
Date Tue, 21 Feb 2006 23:14:17 GMT
Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelSelectorManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelSelectorManager.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelSelectorManager.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelSelectorManager.java Tue Feb 21 15:12:56 2006
@@ -1,254 +1,254 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.nio;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import org.apache.activeio.ChannelFactory;
-
-import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Set;
-
-/**
- * The SelectorManager will manage one Selector and the thread that checks the
- * selector.
- * 
- * We may need to consider running more than one thread to check the selector if
- * servicing the selector takes too long.
- * 
- * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
- */
-final public class NIOAsyncChannelSelectorManager {
-
-    static private Executor selectorExecutor = ChannelFactory.DEFAULT_EXECUTOR;
-    static private Executor channelExecutor = ChannelFactory.DEFAULT_EXECUTOR;
-    
-    static private LinkedList freeManagers = new LinkedList();
-    static private LinkedList fullManagers = new LinkedList();
-    private static final int MAX_CHANNELS_PER_SELECTOR  = 50;
-    
-    static {
-       String os = System.getProperty("os.name");
-       if( os.startsWith("Linux") ) {
-           channelExecutor = new ScheduledThreadPoolExecutor(1);
-       }
-    } 
-
-    public static interface SelectorManagerListener {
-        public void onSelect(SocketChannelAsyncChannelSelection selector);
-    }
-
-    final public class SocketChannelAsyncChannelSelection {
-        
-        private final SelectionKey key;
-        private final SelectorManagerListener listener;
-        private boolean closed;
-        private int interest;
-
-        private SocketChannelAsyncChannelSelection(SocketChannel socketChannel, SelectorManagerListener listener)
-                throws ClosedChannelException {
-            this.listener = listener;
-            this.key = socketChannel.register(selector, 0, this);
-            incrementUseCounter();
-        }
-
-        public void setInterestOps(int ops) {
-            	if( closed ) 
-            		return;
-            	interest = ops;
-             enable();
-        }
-        
-        public void enable() {
-            if( closed ) 
-                return;
-            key.interestOps(interest);
-            selector.wakeup();
-        }
-
-        public void disable() {
-            if( closed ) 
-                return;
-            key.interestOps(0);
-        }
-
-        public void close() {
-        	if( closed ) 
-        		return;
-        	
-            key.cancel();
-            decrementUseCounter();
-            selector.wakeup();
-            closed=true;
-        }
-        
-        public void onSelect() {
-            if( !key.isValid() )
-                return;
-            listener.onSelect(this);
-        }
-
-        public boolean isWritable() {
-            return key.isWritable();
-        }
-
-        public boolean isReadable() {
-            return key.isReadable();
-        }
-    }
-
-    public synchronized static SocketChannelAsyncChannelSelection register(
-            SocketChannel socketChannel, SelectorManagerListener listener)
-            throws IOException {
-
-        NIOAsyncChannelSelectorManager manager = null;
-        synchronized (freeManagers) {
-            if (freeManagers.size() > 0)
-                manager = (NIOAsyncChannelSelectorManager) freeManagers.getFirst();
-            if (manager == null) {
-                manager = new NIOAsyncChannelSelectorManager();
-                freeManagers.addFirst(manager);
-            }
-
-            // That manager may have filled up.
-            SocketChannelAsyncChannelSelection selection = manager.new SocketChannelAsyncChannelSelection(
-                    socketChannel, listener);
-            if (manager.useCounter >= MAX_CHANNELS_PER_SELECTOR) {
-                freeManagers.removeFirst();
-                fullManagers.addLast(manager);
-            }
-            return selection;
-        }
-    }
-
-    public synchronized static void setSelectorExecutor(Executor executor) {
-        NIOAsyncChannelSelectorManager.selectorExecutor = executor;
-    }
-    
-    public synchronized static void setChannelExecutor(Executor executor) {
-        NIOAsyncChannelSelectorManager.channelExecutor = executor;
-    }
-
-    private class SelectorWorker implements Runnable {
-                
-        public void run() {            
-            
-            String origName = Thread.currentThread().getName();
-            try {
-               Thread.currentThread().setName("Selector Worker: "+getId());
-               while ( isRunning() ) {
-
-                   int count = selector.select(10);
-                   if (count == 0)
-                       continue;                
-                    if( !isRunning() )
-                        return;
-
-                    // Get a java.util.Set containing the SelectionKey objects
-                    // for all channels that are ready for I/O.
-                    Set keys = selector.selectedKeys();
-    
-                    for (Iterator i = keys.iterator(); i.hasNext();) {                        
-                        final SelectionKey key = (SelectionKey) i.next();
-                        i.remove();
-
-                        if( !key.isValid() ) 
-                            continue;
-                        
-                        final SocketChannelAsyncChannelSelection s = (SocketChannelAsyncChannelSelection) key.attachment();
-                        s.disable();
-                        
-                        // Kick off another thread to find newly selected keys while we process the 
-                        // currently selected keys                
-                        channelExecutor.execute(new Runnable() {
-                            public void run() {
-                                try {
-                                    s.onSelect();
-                                    s.enable();
-                                } catch ( Throwable e ) {
-                                    System.err.println("ActiveIO unexpected error: ");
-                                    e.printStackTrace(System.err);
-                                }
-                            }
-                        });
-                    }
-                    
-               }
-            } catch (Throwable e) {
-                System.err.println("Unexpected exception: " + e);
-                e.printStackTrace();
-            } finally {
-                Thread.currentThread().setName(origName);
-            }
-        }
-    }
-
-    /**
-     * The selector used to wait for non-blocking events.
-     */
-    private Selector selector;
-
-    /**
-     * How many SelectionKeys does the selector have active.
-     */
-    private int useCounter;
-    private int id = getNextId();
-    private static int nextId;
-
-    private NIOAsyncChannelSelectorManager() throws IOException {
-        selector = Selector.open();
-    }
-    
-    synchronized private static int getNextId() {
-        return nextId++;
-    }
-
-    private int getId() {
-        return id ;
-    }
-
-    synchronized private void incrementUseCounter() {
-        useCounter++;
-        if (useCounter == 1) {
-            selectorExecutor.execute(new SelectorWorker());
-        }
-    }
-
-    synchronized private void decrementUseCounter() {
-        useCounter--;
-	 	synchronized(freeManagers) {	   	 		 
- 	 		 if( useCounter == 0 ) {
-  	 		 	freeManagers.remove(this);
-  	 		 }    	 		 
- 	 		 else if( useCounter < MAX_CHANNELS_PER_SELECTOR ) {
-  	 		 	fullManagers.remove(this);
-  	 		 	freeManagers.addLast(this);
-  	 		 }     	 		 
-	    }
-    }
-
-    synchronized private boolean isRunning() {
-        return useCounter > 0;
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.activeio.ChannelFactory;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Set;
+
+/**
+ * The SelectorManager will manage one Selector and the thread that checks the
+ * selector.
+ * 
+ * We may need to consider running more than one thread to check the selector if
+ * servicing the selector takes too long.
+ * 
+ * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
+ */
+final public class NIOAsyncChannelSelectorManager {
+
+    static private Executor selectorExecutor = ChannelFactory.DEFAULT_EXECUTOR;
+    static private Executor channelExecutor = ChannelFactory.DEFAULT_EXECUTOR;
+    
+    static private LinkedList freeManagers = new LinkedList();
+    static private LinkedList fullManagers = new LinkedList();
+    private static final int MAX_CHANNELS_PER_SELECTOR  = 50;
+    
+    static {
+       String os = System.getProperty("os.name");
+       if( os.startsWith("Linux") ) {
+           channelExecutor = new ScheduledThreadPoolExecutor(1);
+       }
+    } 
+
+    public static interface SelectorManagerListener {
+        public void onSelect(SocketChannelAsyncChannelSelection selector);
+    }
+
+    final public class SocketChannelAsyncChannelSelection {
+        
+        private final SelectionKey key;
+        private final SelectorManagerListener listener;
+        private boolean closed;
+        private int interest;
+
+        private SocketChannelAsyncChannelSelection(SocketChannel socketChannel, SelectorManagerListener listener)
+                throws ClosedChannelException {
+            this.listener = listener;
+            this.key = socketChannel.register(selector, 0, this);
+            incrementUseCounter();
+        }
+
+        public void setInterestOps(int ops) {
+            	if( closed ) 
+            		return;
+            	interest = ops;
+             enable();
+        }
+        
+        public void enable() {
+            if( closed ) 
+                return;
+            key.interestOps(interest);
+            selector.wakeup();
+        }
+
+        public void disable() {
+            if( closed ) 
+                return;
+            key.interestOps(0);
+        }
+
+        public void close() {
+        	if( closed ) 
+        		return;
+        	
+            key.cancel();
+            decrementUseCounter();
+            selector.wakeup();
+            closed=true;
+        }
+        
+        public void onSelect() {
+            if( !key.isValid() )
+                return;
+            listener.onSelect(this);
+        }
+
+        public boolean isWritable() {
+            return key.isWritable();
+        }
+
+        public boolean isReadable() {
+            return key.isReadable();
+        }
+    }
+
+    public synchronized static SocketChannelAsyncChannelSelection register(
+            SocketChannel socketChannel, SelectorManagerListener listener)
+            throws IOException {
+
+        NIOAsyncChannelSelectorManager manager = null;
+        synchronized (freeManagers) {
+            if (freeManagers.size() > 0)
+                manager = (NIOAsyncChannelSelectorManager) freeManagers.getFirst();
+            if (manager == null) {
+                manager = new NIOAsyncChannelSelectorManager();
+                freeManagers.addFirst(manager);
+            }
+
+            // That manager may have filled up.
+            SocketChannelAsyncChannelSelection selection = manager.new SocketChannelAsyncChannelSelection(
+                    socketChannel, listener);
+            if (manager.useCounter >= MAX_CHANNELS_PER_SELECTOR) {
+                freeManagers.removeFirst();
+                fullManagers.addLast(manager);
+            }
+            return selection;
+        }
+    }
+
+    public synchronized static void setSelectorExecutor(Executor executor) {
+        NIOAsyncChannelSelectorManager.selectorExecutor = executor;
+    }
+    
+    public synchronized static void setChannelExecutor(Executor executor) {
+        NIOAsyncChannelSelectorManager.channelExecutor = executor;
+    }
+
+    private class SelectorWorker implements Runnable {
+                
+        public void run() {            
+            
+            String origName = Thread.currentThread().getName();
+            try {
+               Thread.currentThread().setName("Selector Worker: "+getId());
+               while ( isRunning() ) {
+
+                   int count = selector.select(10);
+                   if (count == 0)
+                       continue;                
+                    if( !isRunning() )
+                        return;
+
+                    // Get a java.util.Set containing the SelectionKey objects
+                    // for all channels that are ready for I/O.
+                    Set keys = selector.selectedKeys();
+    
+                    for (Iterator i = keys.iterator(); i.hasNext();) {                        
+                        final SelectionKey key = (SelectionKey) i.next();
+                        i.remove();
+
+                        if( !key.isValid() ) 
+                            continue;
+                        
+                        final SocketChannelAsyncChannelSelection s = (SocketChannelAsyncChannelSelection) key.attachment();
+                        s.disable();
+                        
+                        // Kick off another thread to find newly selected keys while we process the 
+                        // currently selected keys                
+                        channelExecutor.execute(new Runnable() {
+                            public void run() {
+                                try {
+                                    s.onSelect();
+                                    s.enable();
+                                } catch ( Throwable e ) {
+                                    System.err.println("ActiveIO unexpected error: ");
+                                    e.printStackTrace(System.err);
+                                }
+                            }
+                        });
+                    }
+                    
+               }
+            } catch (Throwable e) {
+                System.err.println("Unexpected exception: " + e);
+                e.printStackTrace();
+            } finally {
+                Thread.currentThread().setName(origName);
+            }
+        }
+    }
+
+    /**
+     * The selector used to wait for non-blocking events.
+     */
+    private Selector selector;
+
+    /**
+     * How many SelectionKeys does the selector have active.
+     */
+    private int useCounter;
+    private int id = getNextId();
+    private static int nextId;
+
+    private NIOAsyncChannelSelectorManager() throws IOException {
+        selector = Selector.open();
+    }
+    
+    synchronized private static int getNextId() {
+        return nextId++;
+    }
+
+    private int getId() {
+        return id ;
+    }
+
+    synchronized private void incrementUseCounter() {
+        useCounter++;
+        if (useCounter == 1) {
+            selectorExecutor.execute(new SelectorWorker());
+        }
+    }
+
+    synchronized private void decrementUseCounter() {
+        useCounter--;
+	 	synchronized(freeManagers) {	   	 		 
+ 	 		 if( useCounter == 0 ) {
+  	 		 	freeManagers.remove(this);
+  	 		 }    	 		 
+ 	 		 else if( useCounter < MAX_CHANNELS_PER_SELECTOR ) {
+  	 		 	fullManagers.remove(this);
+  	 		 	freeManagers.addLast(this);
+  	 		 }     	 		 
+	    }
+    }
+
+    synchronized private boolean isRunning() {
+        return useCounter > 0;
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelSelectorManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,55 +1,55 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.nio;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.channels.ServerSocketChannel;
-
-import org.apache.activeio.Channel;
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
-import org.apache.activeio.packet.sync.socket.SocketSyncChannelServer;
-import org.apache.activeio.stream.sync.socket.SocketStreamChannel;
-
-/**
- * A SynchChannelServer that creates
- * {@see org.apache.activeio.net.TcpSynchChannel}objects from accepted
- * tcp socket connections.
- * 
- * @version $Revision$
- */
-public class NIOAsyncChannelServer extends SocketSyncChannelServer {
-
-    private final boolean createWriteBufferedChannels;
-	private final boolean useDirectBuffers;
-
-    public NIOAsyncChannelServer(ServerSocketChannel socketChannel, URI bindURI, URI connectURI, boolean createWriteBufferedChannels, boolean useDirectBuffers) {
-        super(socketChannel.socket(), bindURI, connectURI);
-        this.createWriteBufferedChannels = createWriteBufferedChannels;
-		this.useDirectBuffers = useDirectBuffers;
-    }
-    
-    protected Channel createChannel(SocketStreamChannel c) throws IOException {
-        AsyncChannel channel = new NIOAsyncChannel(c.getSocket().getChannel(), useDirectBuffers);
-        if( createWriteBufferedChannels ) {
-            channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(useDirectBuffers), false);
-        }
-        return channel;
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.nio;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.channels.ServerSocketChannel;
+
+import org.apache.activeio.Channel;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
+import org.apache.activeio.packet.sync.socket.SocketSyncChannelServer;
+import org.apache.activeio.stream.sync.socket.SocketStreamChannel;
+
+/**
+ * A SynchChannelServer that creates
+ * {@see org.apache.activeio.net.TcpSynchChannel}objects from accepted
+ * tcp socket connections.
+ * 
+ * @version $Revision$
+ */
+public class NIOAsyncChannelServer extends SocketSyncChannelServer {
+
+    private final boolean createWriteBufferedChannels;
+	private final boolean useDirectBuffers;
+
+    public NIOAsyncChannelServer(ServerSocketChannel socketChannel, URI bindURI, URI connectURI, boolean createWriteBufferedChannels, boolean useDirectBuffers) {
+        super(socketChannel.socket(), bindURI, connectURI);
+        this.createWriteBufferedChannels = createWriteBufferedChannels;
+		this.useDirectBuffers = useDirectBuffers;
+    }
+    
+    protected Channel createChannel(SocketStreamChannel c) throws IOException {
+        AsyncChannel channel = new NIOAsyncChannel(c.getSocket().getChannel(), useDirectBuffers);
+        if( createWriteBufferedChannels ) {
+            channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(useDirectBuffers), false);
+        }
+        return channel;
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelFactory.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelFactory.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelFactory.java Tue Feb 21 15:12:56 2006
@@ -1,252 +1,252 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.vmpipe;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.URI;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelFactory;
-import org.apache.activeio.packet.async.AsyncChannelListener;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-
-/**
- * 
- * @version $Revision$
- */
-final public class VMPipeAsyncChannelFactory implements AsyncChannelFactory {
-    
-    //
-    // We do all this crazy stuff of looking the server map using System
-    // properties
-    // because this class could be loaded multiple times in different
-    // classloaders.
-    //
-    private static final String SERVER_MAP_LOCATION = VMPipeAsyncChannelFactory.class.getName() + ".SERVER_MAP";
-
-    private static final Map SERVER_MAP;
-    static {
-        Map m = null;
-        m = (Map) System.getProperties().get(SERVER_MAP_LOCATION);
-        if (m == null) {
-            m = Collections.synchronizedMap(new HashMap());
-            System.getProperties().put(SERVER_MAP_LOCATION, m);
-        }
-        SERVER_MAP = m;
-    }
-
-    private final static ClassLoader MY_CLASSLOADER = Packet.class.getClassLoader();
-    
-    
-    /**
-     * Used to marshal calls to a PipeChannel in a different classloader.
-     */
-    static public class ClassloaderAsyncChannelAdapter implements AsyncChannel {
-
-        private final ClassLoader cl;
-        private final Object channel;
-        private final Method writeMethod;
-        private final Method setListenerMethod;
-        private final Class listenerClazz;
-        private final Class packetClazz;
-        private final Object listenerProxy;
-        private final Method duplicateMethod;
-        private final Method startMethod;
-        private final Method stopMethod;
-        private final Method disposeMethod;
-
-        private AsyncChannelListener channelListener;
-
-        public class ListenerProxyHandler implements InvocationHandler {
-            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-                switch (method.getName().length()) {
-                case 8: // onPacket
-                    Object packet = duplicateMethod.invoke(args[0], new Object[]{MY_CLASSLOADER});  
-                    channelListener.onPacket((Packet) packet);
-                    break;
-                case 13: // onPacketError
-                    channelListener.onPacketError((IOException) args[0]);
-                    break;
-                default:
-                    channelListener.onPacketError(new IOException("Unknown proxy method invocation: "+method.getName()));
-                }
-                return null;
-            }
-        }
-
-        public ClassloaderAsyncChannelAdapter(Object channel) throws SecurityException, NoSuchMethodException,
-                ClassNotFoundException {
-            this.channel = channel;
-            Class clazz = channel.getClass();
-            cl = clazz.getClassLoader();
-
-            listenerClazz = cl.loadClass(AsyncChannelListener.class.getName());
-            packetClazz = cl.loadClass(Packet.class.getName());
-            writeMethod = clazz.getMethod("write", new Class[] { packetClazz });
-            startMethod = clazz.getMethod("start", new Class[] { });
-            stopMethod = clazz.getMethod("stop", new Class[] {});
-            disposeMethod = clazz.getMethod("dispose", new Class[] { });
-
-            setListenerMethod = clazz.getMethod("setAsyncChannelListener", new Class[] { listenerClazz });
-            duplicateMethod = packetClazz.getMethod("duplicate", new Class[] { ClassLoader.class });
-
-            ListenerProxyHandler handler = new ListenerProxyHandler();
-            listenerProxy = Proxy.newProxyInstance(cl, new Class[] { listenerClazz }, handler);
-        }
-
-        public void write(Packet packet) throws IOException {
-            callIOExceptionMethod(writeMethod, new Object[] { packet.duplicate(cl) });
-        }
-
-        public void setAsyncChannelListener(AsyncChannelListener channelListener) {
-            this.channelListener = channelListener;
-            callMethod(setListenerMethod, new Object[] { channelListener == null ? null : listenerProxy });
-        }
-
-        public AsyncChannelListener getAsyncChannelListener() {
-            return channelListener;
-        }
-
-        public void dispose() {
-            callMethod(disposeMethod, new Object[] { });
-        }
-
-        public void start() throws IOException {
-            callIOExceptionMethod(startMethod, new Object[] {});
-        }
-
-        public void stop() throws IOException {
-            callIOExceptionMethod(stopMethod, new Object[] {});
-        }
-        
-        private void callMethod(Method method, Object[] args) {
-            try {
-                method.invoke(channel, args);
-            } catch (InvocationTargetException e) {
-                if (e.getTargetException() instanceof RuntimeException) {
-                    throw (RuntimeException) e.getTargetException();
-                }
-                throw new RuntimeException(e.getTargetException());
-            } catch (Throwable e) {
-                throw new RuntimeException("Reflexive invocation failed: " + e, e);
-            }            
-        }
-        
-        private void callIOExceptionMethod(Method method, Object[] args) throws IOException {
-            try {
-                method.invoke(channel, args);
-            } catch (InvocationTargetException e) {
-                if (e.getTargetException() instanceof IOException) {
-                    throw (IOException) e.getTargetException();
-                }
-                if (e.getTargetException() instanceof RuntimeException) {
-                    throw (RuntimeException) e.getTargetException();
-                }
-                throw new RuntimeException(e.getTargetException());
-            } catch (Throwable e) {
-                throw (IOException) new IOException("Reflexive invocation failed: " + e).initCause(e);
-            }            
-        }
-
-        //
-        // The following methods do not need to delegate since they
-        // are implemented as noops in the PipeChannel
-        //
-        public Object getAdapter(Class target) {
-            if (target.isAssignableFrom(getClass())) {
-                return this;
-            }
-            return null;
-        }
-
-        public void flush() throws IOException {
-        }
-
-    }
-
-    private boolean forceRefelection;
-
-    public AsyncChannel openAsyncChannel(URI location) throws IOException {
-
-        Object server = lookupServer(location);
-        if (!forceRefelection && server.getClass() == VMPipeAsyncChannelServer.class) {
-            return ((VMPipeAsyncChannelServer) server).connect();
-        }
-
-        // Asume server is in a different classloader.
-        // Use reflection to connect.
-        try {
-            Method method = server.getClass().getMethod("connect", new Class[] {});
-            Object channel = method.invoke(server, new Object[] {});
-            return new ClassloaderAsyncChannelAdapter(channel);
-        } catch (Throwable e) {
-            throw (IOException) new IOException("Connection could not be established: " + e).initCause(e);
-        }
-    }
-
-    public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException {
-        VMPipeAsyncChannelServer server = new VMPipeAsyncChannelServer(bindURI);
-        bindServer(bindURI, server);
-        return server;
-    }
-
-    private static Map getServerMap() {
-        return SERVER_MAP;
-    }
-
-    static public String getServerKeyForURI(URI location) {
-        return location.getHost();
-    }
-
-    public static void bindServer(URI bindURI, VMPipeAsyncChannelServer server) throws IOException {
-        String key = getServerKeyForURI(bindURI);
-        if (getServerMap().get(key) != null)
-            throw new IOException("Server is allready bound at: " + bindURI);
-        getServerMap().put(key, server);
-    }
-
-    public static Object lookupServer(URI location) throws IOException {
-        String key = getServerKeyForURI(location);
-        Object server = getServerMap().get(key);
-        if (server == null) {
-            throw new IOException("Connection refused.");
-        }
-        return server;
-    }
-
-    public static void unbindServer(URI bindURI) {
-        String key = getServerKeyForURI(bindURI);
-        getServerMap().remove(key);
-    }
-
-    public boolean isForceRefelection() {
-        return forceRefelection;
-    }
-    
-    public void setForceRefelection(boolean forceRefelection) {
-        this.forceRefelection = forceRefelection;
-    }
-    
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.vmpipe;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+
+/**
+ * 
+ * @version $Revision$
+ */
+final public class VMPipeAsyncChannelFactory implements AsyncChannelFactory {
+    
+    //
+    // We do all this crazy stuff of looking the server map using System
+    // properties
+    // because this class could be loaded multiple times in different
+    // classloaders.
+    //
+    private static final String SERVER_MAP_LOCATION = VMPipeAsyncChannelFactory.class.getName() + ".SERVER_MAP";
+
+    private static final Map SERVER_MAP;
+    static {
+        Map m = null;
+        m = (Map) System.getProperties().get(SERVER_MAP_LOCATION);
+        if (m == null) {
+            m = Collections.synchronizedMap(new HashMap());
+            System.getProperties().put(SERVER_MAP_LOCATION, m);
+        }
+        SERVER_MAP = m;
+    }
+
+    private final static ClassLoader MY_CLASSLOADER = Packet.class.getClassLoader();
+    
+    
+    /**
+     * Used to marshal calls to a PipeChannel in a different classloader.
+     */
+    static public class ClassloaderAsyncChannelAdapter implements AsyncChannel {
+
+        private final ClassLoader cl;
+        private final Object channel;
+        private final Method writeMethod;
+        private final Method setListenerMethod;
+        private final Class listenerClazz;
+        private final Class packetClazz;
+        private final Object listenerProxy;
+        private final Method duplicateMethod;
+        private final Method startMethod;
+        private final Method stopMethod;
+        private final Method disposeMethod;
+
+        private AsyncChannelListener channelListener;
+
+        public class ListenerProxyHandler implements InvocationHandler {
+            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                switch (method.getName().length()) {
+                case 8: // onPacket
+                    Object packet = duplicateMethod.invoke(args[0], new Object[]{MY_CLASSLOADER});  
+                    channelListener.onPacket((Packet) packet);
+                    break;
+                case 13: // onPacketError
+                    channelListener.onPacketError((IOException) args[0]);
+                    break;
+                default:
+                    channelListener.onPacketError(new IOException("Unknown proxy method invocation: "+method.getName()));
+                }
+                return null;
+            }
+        }
+
+        public ClassloaderAsyncChannelAdapter(Object channel) throws SecurityException, NoSuchMethodException,
+                ClassNotFoundException {
+            this.channel = channel;
+            Class clazz = channel.getClass();
+            cl = clazz.getClassLoader();
+
+            listenerClazz = cl.loadClass(AsyncChannelListener.class.getName());
+            packetClazz = cl.loadClass(Packet.class.getName());
+            writeMethod = clazz.getMethod("write", new Class[] { packetClazz });
+            startMethod = clazz.getMethod("start", new Class[] { });
+            stopMethod = clazz.getMethod("stop", new Class[] {});
+            disposeMethod = clazz.getMethod("dispose", new Class[] { });
+
+            setListenerMethod = clazz.getMethod("setAsyncChannelListener", new Class[] { listenerClazz });
+            duplicateMethod = packetClazz.getMethod("duplicate", new Class[] { ClassLoader.class });
+
+            ListenerProxyHandler handler = new ListenerProxyHandler();
+            listenerProxy = Proxy.newProxyInstance(cl, new Class[] { listenerClazz }, handler);
+        }
+
+        public void write(Packet packet) throws IOException {
+            callIOExceptionMethod(writeMethod, new Object[] { packet.duplicate(cl) });
+        }
+
+        public void setAsyncChannelListener(AsyncChannelListener channelListener) {
+            this.channelListener = channelListener;
+            callMethod(setListenerMethod, new Object[] { channelListener == null ? null : listenerProxy });
+        }
+
+        public AsyncChannelListener getAsyncChannelListener() {
+            return channelListener;
+        }
+
+        public void dispose() {
+            callMethod(disposeMethod, new Object[] { });
+        }
+
+        public void start() throws IOException {
+            callIOExceptionMethod(startMethod, new Object[] {});
+        }
+
+        public void stop() throws IOException {
+            callIOExceptionMethod(stopMethod, new Object[] {});
+        }
+        
+        private void callMethod(Method method, Object[] args) {
+            try {
+                method.invoke(channel, args);
+            } catch (InvocationTargetException e) {
+                if (e.getTargetException() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getTargetException();
+                }
+                throw new RuntimeException(e.getTargetException());
+            } catch (Throwable e) {
+                throw new RuntimeException("Reflexive invocation failed: " + e, e);
+            }            
+        }
+        
+        private void callIOExceptionMethod(Method method, Object[] args) throws IOException {
+            try {
+                method.invoke(channel, args);
+            } catch (InvocationTargetException e) {
+                if (e.getTargetException() instanceof IOException) {
+                    throw (IOException) e.getTargetException();
+                }
+                if (e.getTargetException() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getTargetException();
+                }
+                throw new RuntimeException(e.getTargetException());
+            } catch (Throwable e) {
+                throw (IOException) new IOException("Reflexive invocation failed: " + e).initCause(e);
+            }            
+        }
+
+        //
+        // The following methods do not need to delegate since they
+        // are implemented as noops in the PipeChannel
+        //
+        public Object getAdapter(Class target) {
+            if (target.isAssignableFrom(getClass())) {
+                return this;
+            }
+            return null;
+        }
+
+        public void flush() throws IOException {
+        }
+
+    }
+
+    private boolean forceRefelection;
+
+    public AsyncChannel openAsyncChannel(URI location) throws IOException {
+
+        Object server = lookupServer(location);
+        if (!forceRefelection && server.getClass() == VMPipeAsyncChannelServer.class) {
+            return ((VMPipeAsyncChannelServer) server).connect();
+        }
+
+        // Asume server is in a different classloader.
+        // Use reflection to connect.
+        try {
+            Method method = server.getClass().getMethod("connect", new Class[] {});
+            Object channel = method.invoke(server, new Object[] {});
+            return new ClassloaderAsyncChannelAdapter(channel);
+        } catch (Throwable e) {
+            throw (IOException) new IOException("Connection could not be established: " + e).initCause(e);
+        }
+    }
+
+    public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException {
+        VMPipeAsyncChannelServer server = new VMPipeAsyncChannelServer(bindURI);
+        bindServer(bindURI, server);
+        return server;
+    }
+
+    private static Map getServerMap() {
+        return SERVER_MAP;
+    }
+
+    static public String getServerKeyForURI(URI location) {
+        return location.getHost();
+    }
+
+    public static void bindServer(URI bindURI, VMPipeAsyncChannelServer server) throws IOException {
+        String key = getServerKeyForURI(bindURI);
+        if (getServerMap().get(key) != null)
+            throw new IOException("Server is allready bound at: " + bindURI);
+        getServerMap().put(key, server);
+    }
+
+    public static Object lookupServer(URI location) throws IOException {
+        String key = getServerKeyForURI(location);
+        Object server = getServerMap().get(key);
+        if (server == null) {
+            throw new IOException("Connection refused.");
+        }
+        return server;
+    }
+
+    public static void unbindServer(URI bindURI) {
+        String key = getServerKeyForURI(bindURI);
+        getServerMap().remove(key);
+    }
+
+    public boolean isForceRefelection() {
+        return forceRefelection;
+    }
+    
+    public void setForceRefelection(boolean forceRefelection) {
+        this.forceRefelection = forceRefelection;
+    }
+    
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelPipe.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelPipe.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelPipe.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelPipe.java Tue Feb 21 15:12:56 2006
@@ -1,163 +1,163 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.vmpipe;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-
-import org.apache.activeio.packet.EOSPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelListener;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-
-/**
- * Used to connect the bottom ends of two Async channel stacks.
- * 
- */
-final public class VMPipeAsyncChannelPipe {
-
-    final PipeChannel leftChannel = new PipeChannel();
-    final PipeChannel rightChannel = new PipeChannel();
-
-    final public static class PipeChannel implements AsyncChannel {        
-        
-        private PipeChannel sibiling;        
-        private AsyncChannelListener channelListener;
-        private final Semaphore runMutext = new Semaphore(0);
-        private boolean disposed;
-        private boolean running;
-        
-        public PipeChannel() {
-        }
-        
-        public void setAsyncChannelListener(AsyncChannelListener channelListener) {
-            this.channelListener = channelListener;
-        }
-        public AsyncChannelListener getAsyncChannelListener() {
-            return channelListener;
-        }
-        
-        public void write(Packet packet) throws IOException {
-            if( disposed )
-                throw new IOException("Conneciton closed.");
-            sibiling.onPacket(packet, WAIT_FOREVER_TIMEOUT);
-        }
-
-        private void onPacket(Packet packet, long timeout) throws IOException {
-            try {
-                if( timeout == NO_WAIT_TIMEOUT ) {
-                    if( !runMutext.tryAcquire(0, TimeUnit.MILLISECONDS) )
-                        return;
-                } else if( timeout == WAIT_FOREVER_TIMEOUT ) {
-                    runMutext.acquire();
-                } else {
-                    if( !runMutext.tryAcquire(timeout, TimeUnit.MILLISECONDS) ) 
-                        return;
-                }
-            } catch (InterruptedException e) {
-                throw new InterruptedIOException();
-            }
-            try {
-                if( disposed ) {
-                    throw new IOException("Peer connection closed.");
-                }            
-                channelListener.onPacket(packet);
-            } finally {
-                runMutext.release();
-            }
-        }
-
-        public void flush() throws IOException {
-        }
-        
-        public void start() throws IOException {
-            if(running)
-                return;
-            if( channelListener==null )
-                throw new IOException("channelListener has not been set.");
-            running=true;
-            runMutext.release();
-        }
-        
-        public void stop() throws IOException {
-            if(!running)
-                return;            
-            try {
-                runMutext.tryAcquire(5, TimeUnit.SECONDS); 
-                running=false;
-            } catch (InterruptedException e) {
-                throw new InterruptedIOException();
-            }
-        }
-        
-        public void dispose() {
-            if( disposed )
-                return;
-            
-            if( running && channelListener!=null ) {
-                this.channelListener.onPacketError(new IOException("Pipe closed."));
-                running=false;
-            }
-            disposed = true;
-            runMutext.release();
-            
-            try {
-                // Inform the peer of the End Of Stream if he's listening.
-                sibiling.onPacket(EOSPacket.EOS_PACKET, NO_WAIT_TIMEOUT);
-            } catch (IOException e) {
-            }
-        }
-        
-        public PipeChannel getSibiling() {
-            return sibiling;
-        }
-        public void setSibiling(PipeChannel sibiling) {
-            this.sibiling = sibiling;
-        }
-
-        public Object getAdapter(Class target) {
-            if( target.isAssignableFrom(getClass()) ) {
-                return this;
-            }
-            return null;
-        }
-        
-        public String getId() {
-            return "0x"+Integer.toHexString(System.identityHashCode(this));
-        }
-        
-        public String toString() {
-            return "Pipe Channel from "+getId()+" to "+sibiling.getId();
-        }        
-    }
-    
-    public VMPipeAsyncChannelPipe() {
-        leftChannel.setSibiling(rightChannel);
-        rightChannel.setSibiling(leftChannel);
-    }
-    
-    public AsyncChannel getLeftAsyncChannel() {
-        return leftChannel;
-    }
-
-    public AsyncChannel getRightAsyncChannel() {
-        return rightChannel;
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.vmpipe;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.activeio.packet.EOSPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+/**
+ * Used to connect the bottom ends of two Async channel stacks.
+ * 
+ */
+final public class VMPipeAsyncChannelPipe {
+
+    final PipeChannel leftChannel = new PipeChannel();
+    final PipeChannel rightChannel = new PipeChannel();
+
+    final public static class PipeChannel implements AsyncChannel {        
+        
+        private PipeChannel sibiling;        
+        private AsyncChannelListener channelListener;
+        private final Semaphore runMutext = new Semaphore(0);
+        private boolean disposed;
+        private boolean running;
+        
+        public PipeChannel() {
+        }
+        
+        public void setAsyncChannelListener(AsyncChannelListener channelListener) {
+            this.channelListener = channelListener;
+        }
+        public AsyncChannelListener getAsyncChannelListener() {
+            return channelListener;
+        }
+        
+        public void write(Packet packet) throws IOException {
+            if( disposed )
+                throw new IOException("Conneciton closed.");
+            sibiling.onPacket(packet, WAIT_FOREVER_TIMEOUT);
+        }
+
+        private void onPacket(Packet packet, long timeout) throws IOException {
+            try {
+                if( timeout == NO_WAIT_TIMEOUT ) {
+                    if( !runMutext.tryAcquire(0, TimeUnit.MILLISECONDS) )
+                        return;
+                } else if( timeout == WAIT_FOREVER_TIMEOUT ) {
+                    runMutext.acquire();
+                } else {
+                    if( !runMutext.tryAcquire(timeout, TimeUnit.MILLISECONDS) ) 
+                        return;
+                }
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+            try {
+                if( disposed ) {
+                    throw new IOException("Peer connection closed.");
+                }            
+                channelListener.onPacket(packet);
+            } finally {
+                runMutext.release();
+            }
+        }
+
+        public void flush() throws IOException {
+        }
+        
+        public void start() throws IOException {
+            if(running)
+                return;
+            if( channelListener==null )
+                throw new IOException("channelListener has not been set.");
+            running=true;
+            runMutext.release();
+        }
+        
+        public void stop() throws IOException {
+            if(!running)
+                return;            
+            try {
+                runMutext.tryAcquire(5, TimeUnit.SECONDS); 
+                running=false;
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+        }
+        
+        public void dispose() {
+            if( disposed )
+                return;
+            
+            if( running && channelListener!=null ) {
+                this.channelListener.onPacketError(new IOException("Pipe closed."));
+                running=false;
+            }
+            disposed = true;
+            runMutext.release();
+            
+            try {
+                // Inform the peer of the End Of Stream if he's listening.
+                sibiling.onPacket(EOSPacket.EOS_PACKET, NO_WAIT_TIMEOUT);
+            } catch (IOException e) {
+            }
+        }
+        
+        public PipeChannel getSibiling() {
+            return sibiling;
+        }
+        public void setSibiling(PipeChannel sibiling) {
+            this.sibiling = sibiling;
+        }
+
+        public Object getAdapter(Class target) {
+            if( target.isAssignableFrom(getClass()) ) {
+                return this;
+            }
+            return null;
+        }
+        
+        public String getId() {
+            return "0x"+Integer.toHexString(System.identityHashCode(this));
+        }
+        
+        public String toString() {
+            return "Pipe Channel from "+getId()+" to "+sibiling.getId();
+        }        
+    }
+    
+    public VMPipeAsyncChannelPipe() {
+        leftChannel.setSibiling(rightChannel);
+        rightChannel.setSibiling(leftChannel);
+    }
+    
+    public AsyncChannel getLeftAsyncChannel() {
+        return leftChannel;
+    }
+
+    public AsyncChannel getRightAsyncChannel() {
+        return rightChannel;
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelPipe.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,85 +1,85 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.vmpipe;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.activeio.AcceptListener;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-
-/**
- * @version $Revision$
- */
-final public class VMPipeAsyncChannelServer implements AsyncChannelServer {
-
-    private final URI bindURI;
-    private final URI connectURI;
-    private AcceptListener acceptListener;
-    private boolean disposed;
-    
-    public VMPipeAsyncChannelServer(URI bindURI) {
-        this.bindURI = this.connectURI = bindURI;
-    }
-
-    public URI getBindURI() {
-        return bindURI;
-    }
-
-    public URI getConnectURI() {
-        return this.connectURI;
-    }
-
-    public void dispose() {
-        if( disposed )
-            return;
-        
-        VMPipeAsyncChannelFactory.unbindServer(bindURI);
-        disposed=true;
-    }
-
-    public void start() throws IOException {
-        if( acceptListener==null )
-            throw new IOException("acceptListener has not been set.");
-    }
-
-    public void stop() {
-    }
-
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return null;
-    }
-    
-    public String toString() {
-        return "VM Pipe Server: "+getConnectURI();
-    }
-
-    public void setAcceptListener(AcceptListener acceptListener) {
-        this.acceptListener = acceptListener;
-    }
-
-    public AsyncChannel connect() {
-        VMPipeAsyncChannelPipe pipe = new VMPipeAsyncChannelPipe();
-        acceptListener.onAccept(pipe.getRightAsyncChannel());
-        return pipe.getLeftAsyncChannel();
-    }
-    
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.vmpipe;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activeio.AcceptListener;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+
+/**
+ * @version $Revision$
+ */
+final public class VMPipeAsyncChannelServer implements AsyncChannelServer {
+
+    private final URI bindURI;
+    private final URI connectURI;
+    private AcceptListener acceptListener;
+    private boolean disposed;
+    
+    public VMPipeAsyncChannelServer(URI bindURI) {
+        this.bindURI = this.connectURI = bindURI;
+    }
+
+    public URI getBindURI() {
+        return bindURI;
+    }
+
+    public URI getConnectURI() {
+        return this.connectURI;
+    }
+
+    public void dispose() {
+        if( disposed )
+            return;
+        
+        VMPipeAsyncChannelFactory.unbindServer(bindURI);
+        disposed=true;
+    }
+
+    public void start() throws IOException {
+        if( acceptListener==null )
+            throw new IOException("acceptListener has not been set.");
+    }
+
+    public void stop() {
+    }
+
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return null;
+    }
+    
+    public String toString() {
+        return "VM Pipe Server: "+getConnectURI();
+    }
+
+    public void setAcceptListener(AcceptListener acceptListener) {
+        this.acceptListener = acceptListener;
+    }
+
+    public AsyncChannel connect() {
+        VMPipeAsyncChannelPipe pipe = new VMPipeAsyncChannelPipe();
+        acceptListener.onAccept(pipe.getRightAsyncChannel());
+        return pipe.getLeftAsyncChannel();
+    }
+    
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/vmpipe/VMPipeAsyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,98 +1,98 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.Packet;
-
-
-/**
- * A SynchChannelFilter can be used as a filter another {@see org.apache.activeio.SynchChannel}
- * Most {@see org.apache.activeio.SynchChannel} that are not directly accessing the network will 
- * extends the SynchChannelFilter since they act as a filter between the client and the network.
- *    
- * @version $Revision$
- */
-public class FilterSyncChannel implements SyncChannel {
-
-    private final SyncChannel next;
-
-    public FilterSyncChannel(SyncChannel next) {
-        this.next = next;
-    }
-
-    /**
-     * @see org.apache.activeio.Channel#write(org.apache.activeio.packet.Packet)
-     */
-    public void write(Packet packet) throws IOException {
-        next.write(packet);
-    }
-
-    /**
-     * @see org.apache.activeio.Channel#flush()
-     */
-    public void flush() throws IOException {
-        next.flush();
-    }
-
-    /**
-     * @see org.apache.activeio.Disposable#dispose()
-     */
-    public void dispose() {
-        next.dispose();
-    }
-
-    /**
-     * @see org.apache.activeio.Service#start()
-     */
-    public void start() throws IOException {
-        next.start();
-    }
-
-    /**
-     * @see org.apache.activeio.Service#stop()
-     */
-    public void stop() throws IOException {
-        next.stop();
-    }
-
-    /**
-     * @return Returns the next.
-     */
-    public SyncChannel getNext() {
-        return next;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.sync.SyncChannel#read(long)
-     */
-    public Packet read(long timeout) throws IOException {
-        return next.read(timeout);
-    }
-    
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return next.getAdapter(target);
-    }    
-    
-    public String toString() {
-        return next.toString();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.Packet;
+
+
+/**
+ * A SynchChannelFilter can be used as a filter another {@see org.apache.activeio.SynchChannel}
+ * Most {@see org.apache.activeio.SynchChannel} that are not directly accessing the network will 
+ * extends the SynchChannelFilter since they act as a filter between the client and the network.
+ *    
+ * @version $Revision$
+ */
+public class FilterSyncChannel implements SyncChannel {
+
+    private final SyncChannel next;
+
+    public FilterSyncChannel(SyncChannel next) {
+        this.next = next;
+    }
+
+    /**
+     * @see org.apache.activeio.Channel#write(org.apache.activeio.packet.Packet)
+     */
+    public void write(Packet packet) throws IOException {
+        next.write(packet);
+    }
+
+    /**
+     * @see org.apache.activeio.Channel#flush()
+     */
+    public void flush() throws IOException {
+        next.flush();
+    }
+
+    /**
+     * @see org.apache.activeio.Disposable#dispose()
+     */
+    public void dispose() {
+        next.dispose();
+    }
+
+    /**
+     * @see org.apache.activeio.Service#start()
+     */
+    public void start() throws IOException {
+        next.start();
+    }
+
+    /**
+     * @see org.apache.activeio.Service#stop()
+     */
+    public void stop() throws IOException {
+        next.stop();
+    }
+
+    /**
+     * @return Returns the next.
+     */
+    public SyncChannel getNext() {
+        return next;
+    }
+
+    /**
+     * @see org.apache.activeio.packet.sync.SyncChannel#read(long)
+     */
+    public Packet read(long timeout) throws IOException {
+        return next.read(timeout);
+    }
+    
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return next.getAdapter(target);
+    }    
+    
+    public String toString() {
+        return next.toString();
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,99 +1,99 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.activeio.Channel;
-
-
-/**
- * A SynchChannelFilter can be used as a filter another {@see org.apache.activeio.SynchChannel}
- * Most {@see org.apache.activeio.SynchChannel} that are not directly accessing the network will 
- * extends the SynchChannelFilter since they act as a filter between the client and the network.
- *    
- * @version $Revision$
- */
-public class FilterSyncChannelServer implements SyncChannelServer {
-
-    private final SyncChannelServer next;
-
-    public FilterSyncChannelServer(SyncChannelServer next) {
-        this.next = next;
-    }
-
-    /**
-     * @see org.apache.activeio.Disposable#dispose()
-     */
-    public void dispose() {
-        next.dispose();
-    }
-
-    /**
-     * @see org.apache.activeio.Service#start()
-     */
-    public void start() throws IOException {
-        next.start();
-    }
-
-    /**
-     * @see org.apache.activeio.Service#stop()
-     */
-    public void stop() throws IOException {
-        next.stop();
-    }
-
-    /**
-     * @return Returns the next.
-     */
-    public SyncChannelServer getNext() {
-        return next;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.sync.SyncChannelServer#accept(long)
-     */
-    public Channel accept(long timeout) throws IOException {
-        return next.accept(timeout);
-    }
-
-    /**
-     * @see org.apache.activeio.ChannelServer#getBindURI()
-     */
-    public URI getBindURI() {
-        return next.getBindURI();
-    }
-
-    /**
-     * @see org.apache.activeio.ChannelServer#getConnectURI()
-     */
-    public URI getConnectURI() {
-        return next.getConnectURI();
-    }
-    
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return next.getAdapter(target);
-    }    
-
-    public String toString() {
-        return next.toString();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activeio.Channel;
+
+
+/**
+ * A SynchChannelFilter can be used as a filter another {@see org.apache.activeio.SynchChannel}
+ * Most {@see org.apache.activeio.SynchChannel} that are not directly accessing the network will 
+ * extends the SynchChannelFilter since they act as a filter between the client and the network.
+ *    
+ * @version $Revision$
+ */
+public class FilterSyncChannelServer implements SyncChannelServer {
+
+    private final SyncChannelServer next;
+
+    public FilterSyncChannelServer(SyncChannelServer next) {
+        this.next = next;
+    }
+
+    /**
+     * @see org.apache.activeio.Disposable#dispose()
+     */
+    public void dispose() {
+        next.dispose();
+    }
+
+    /**
+     * @see org.apache.activeio.Service#start()
+     */
+    public void start() throws IOException {
+        next.start();
+    }
+
+    /**
+     * @see org.apache.activeio.Service#stop()
+     */
+    public void stop() throws IOException {
+        next.stop();
+    }
+
+    /**
+     * @return Returns the next.
+     */
+    public SyncChannelServer getNext() {
+        return next;
+    }
+
+    /**
+     * @see org.apache.activeio.packet.sync.SyncChannelServer#accept(long)
+     */
+    public Channel accept(long timeout) throws IOException {
+        return next.accept(timeout);
+    }
+
+    /**
+     * @see org.apache.activeio.ChannelServer#getBindURI()
+     */
+    public URI getBindURI() {
+        return next.getBindURI();
+    }
+
+    /**
+     * @see org.apache.activeio.ChannelServer#getConnectURI()
+     */
+    public URI getConnectURI() {
+        return next.getConnectURI();
+    }
+    
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return next.getAdapter(target);
+    }    
+
+    public String toString() {
+        return next.toString();
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/FilterSyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,59 +1,59 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activeio.packet.sync;
-
-import java.io.IOException;
-
-import org.apache.activeio.Channel;
-import org.apache.activeio.packet.Packet;
-
-
-/**
- * SyncChannel objects allow threaded to synchronously block on the <code>receiveUpPacket</code>
- * method to get 'up' {@see org.apache.activeio.Packet} objects when they arrive.
- * 
- * @version $Revision$
- */
-public interface SyncChannel extends Channel {
-    
-    /**
-     * Used to synchronously receive a packet of information going 'up' the channel.
-     * This method blocks until a packet is received or the operation experiences timeout.
-     * 
-     * @param timeout
-     * @return the packet received or null if the timeout occurred.
-     * @throws IOException
-     */
-    Packet read(long timeout) throws IOException;
-    
-    /**
-     * Sends a packet down the channel towards the media.
-     * 
-     * @param packet
-     * @throws IOException
-     */
-    void write(Packet packet) throws IOException;
-
-    /**
-     * Some channels may buffer data which may be sent down if flush() is called.
-     * 
-     * @throws IOException
-     */
-    void flush() throws IOException;    
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeio.packet.sync;
+
+import java.io.IOException;
+
+import org.apache.activeio.Channel;
+import org.apache.activeio.packet.Packet;
+
+
+/**
+ * SyncChannel objects allow threaded to synchronously block on the <code>receiveUpPacket</code>
+ * method to get 'up' {@see org.apache.activeio.Packet} objects when they arrive.
+ * 
+ * @version $Revision$
+ */
+public interface SyncChannel extends Channel {
+    
+    /**
+     * Used to synchronously receive a packet of information going 'up' the channel.
+     * This method blocks until a packet is received or the operation experiences timeout.
+     * 
+     * @param timeout
+     * @return the packet received or null if the timeout occurred.
+     * @throws IOException
+     */
+    Packet read(long timeout) throws IOException;
+    
+    /**
+     * Sends a packet down the channel towards the media.
+     * 
+     * @param packet
+     * @throws IOException
+     */
+    void write(Packet packet) throws IOException;
+
+    /**
+     * Some channels may buffer data which may be sent down if flush() is called.
+     * 
+     * @throws IOException
+     */
+    void flush() throws IOException;    
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelFactory.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelFactory.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelFactory.java Tue Feb 21 15:12:56 2006
@@ -1,46 +1,46 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * SynchChannelFactory objects can create {@see org.apache.activeio.SynchChannel}
- * and {@see org.apache.activeio.SynchChannelServer} objects. 
- * 
- * @version $Revision$
- */
-public interface SyncChannelFactory {
-
-	/**
-     * Opens a connection to server.
-     * 
-     * @param location 
-     * @return
-     */
-	public SyncChannel openSyncChannel(URI location) throws IOException;
-	
-	/**
-     * Binds a server at the URI location.
-     * 
-     * @param location
-     * @return
-     */
-	public SyncChannelServer bindSyncChannel(URI location) throws IOException;
-	
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * SynchChannelFactory objects can create {@see org.apache.activeio.SynchChannel}
+ * and {@see org.apache.activeio.SynchChannelServer} objects. 
+ * 
+ * @version $Revision$
+ */
+public interface SyncChannelFactory {
+
+	/**
+     * Opens a connection to server.
+     * 
+     * @param location 
+     * @return
+     */
+	public SyncChannel openSyncChannel(URI location) throws IOException;
+	
+	/**
+     * Binds a server at the URI location.
+     * 
+     * @param location
+     * @return
+     */
+	public SyncChannelServer bindSyncChannel(URI location) throws IOException;
+	
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,39 +1,39 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync;
-
-import java.io.IOException;
-
-import org.apache.activeio.Channel;
-import org.apache.activeio.ChannelServer;
-
-
-
-/**
- * A SynchChannelServer object provides an <code>accept</code> method to synchronously 
- * accept and create {@see org.apache.activeio.Channel} objects.
- * 
- * @version $Revision$
- */
-public interface SyncChannelServer extends ChannelServer {
-
-    static final public long NO_WAIT_TIMEOUT=0;
-	static final public long WAIT_FOREVER_TIMEOUT=-1;	
-	
-	public Channel accept(long timeout) throws IOException;
-	
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync;
+
+import java.io.IOException;
+
+import org.apache.activeio.Channel;
+import org.apache.activeio.ChannelServer;
+
+
+
+/**
+ * A SynchChannelServer object provides an <code>accept</code> method to synchronously 
+ * accept and create {@see org.apache.activeio.Channel} objects.
+ * 
+ * @version $Revision$
+ */
+public interface SyncChannelServer extends ChannelServer {
+
+    static final public long NO_WAIT_TIMEOUT=0;
+	static final public long WAIT_FOREVER_TIMEOUT=-1;	
+	
+	public Channel accept(long timeout) throws IOException;
+	
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/SyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message