activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [9/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ activ...
Date Tue, 21 Feb 2006 23:14:17 GMT
Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelListener.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelListener.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelListener.java Tue Feb 21 15:12:56 2006
@@ -1,50 +1,50 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.Packet;
-
-
-/**
- * A ChannelConsumer object is used to receive 'up' {@see org.apache.activeio.Packet} objects.
- * 
- * TODO: describe the threading model so that the implementor of this interface can know if
- * the methods in this interface can block for a long time or not.  I'm thinking that it would
- * be best if these methods are not allowed to block for a long time to encourage SEDA style 
- * processing.
- * 
- * @version $Revision$
- */
-public interface AsyncChannelListener {
-	
-	/**
-	 * A {@see AsyncChannel} will call this method to deliver an 'up' packet to a consumer. 
-	 *   
-	 * @param packet
-	 */
-    void onPacket(Packet packet);
-    
-    /**
-	 * A {@see AsyncChannel} will call this method when a async failure occurs in the channel. 
-     * 
-     * @param error the exception that describes the failure.
-     */
-    void onPacketError(IOException error);
-    
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.Packet;
+
+
+/**
+ * A ChannelConsumer object is used to receive 'up' {@see org.apache.activeio.Packet} objects.
+ * 
+ * TODO: describe the threading model so that the implementor of this interface can know if
+ * the methods in this interface can block for a long time or not.  I'm thinking that it would
+ * be best if these methods are not allowed to block for a long time to encourage SEDA style 
+ * processing.
+ * 
+ * @version $Revision$
+ */
+public interface AsyncChannelListener {
+	
+	/**
+	 * A {@see AsyncChannel} will call this method to deliver an 'up' packet to a consumer. 
+	 *   
+	 * @param packet
+	 */
+    void onPacket(Packet packet);
+    
+    /**
+	 * A {@see AsyncChannel} will call this method when a async failure occurs in the channel. 
+     * 
+     * @param error the exception that describes the failure.
+     */
+    void onPacketError(IOException error);
+    
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,39 +1,39 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async;
-
-import org.apache.activeio.AcceptListener;
-import org.apache.activeio.ChannelServer;
-
-
-/**
- * AsyncChannelServer objects asynchronously accept and create {@see org.apache.activeio.Channel} objects
- * and then delivers those objects to a {@see org.apache.activeio.AcceptConsumer}.
- * 
- * @version $Revision$
- */
-public interface AsyncChannelServer extends ChannelServer {
-	
-	/**
-	 * Registers an AcceptListener which is notified of accepted channels.
-	 *  
-	 * @param acceptListener
-	 */
-    void setAcceptListener(AcceptListener acceptListener);
-
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async;
+
+import org.apache.activeio.AcceptListener;
+import org.apache.activeio.ChannelServer;
+
+
+/**
+ * AsyncChannelServer objects asynchronously accept and create {@see org.apache.activeio.Channel} objects
+ * and then delivers those objects to a {@see org.apache.activeio.AcceptConsumer}.
+ * 
+ * @version $Revision$
+ */
+public interface AsyncChannelServer extends ChannelServer {
+	
+	/**
+	 * Registers an AcceptListener which is notified of accepted channels.
+	 *  
+	 * @param acceptListener
+	 */
+    void setAcceptListener(AcceptListener acceptListener);
+
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/AsyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,123 +1,123 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.Packet;
-
-
-/**
- * A AsyncChannelFilter can be used as a filter between a {@see org.apache.activeio.AsyncChannel}
- * and it's {@see org.apache.activeio.ChannelConsumer}.  Most {@see org.apache.activeio.AsyncChannel}
- * that are not directly accessing the network will extends the AsyncChannelFilter since they act as a
- * filter between the client and the network.  O 
- * 
- * @version $Revision$
- */
-public class FilterAsyncChannel implements AsyncChannel, AsyncChannelListener {
-
-    final protected AsyncChannel next;
-    protected AsyncChannelListener channelListener;
-
-    public FilterAsyncChannel(AsyncChannel next) {
-        this.next = next;
-    }
-
-    /**
-     */
-    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
-        this.channelListener = channelListener;
-        if (channelListener == null)
-            next.setAsyncChannelListener(null);
-        else
-            next.setAsyncChannelListener(this);
-    }
-
-    public void write(Packet packet) throws IOException {
-        next.write(packet);
-    }
-
-    public void flush() throws IOException {
-        next.flush();
-    }
-
-    /**
-     * @see org.apache.activeio.Disposable#dispose()
-     */
-    public void dispose() {
-        next.dispose();
-    }
-
-    /**
-     * @see org.apache.activeio.Service#start()
-     * @throws IOException if the next channel has not been set.
-     */
-    public void start() throws IOException {
-        if( next == null )
-            throw new IOException("The next channel has not been set.");
-        if( channelListener ==null )
-            throw new IOException("The UpPacketListener has not been set.");
-        next.start();
-    }
-
-    /**
-     * @see org.apache.activeio.Service#stop()
-     */
-    public void stop() throws IOException {
-        next.stop();
-    }
-
-    /**
-     * @see org.apache.activeio.packet.async.AsyncChannelListener#onPacket(org.apache.activeio.packet.Packet)
-     */
-    public void onPacket(Packet packet) {
-        channelListener.onPacket(packet);
-    }
-
-    /**
-     * @see org.apache.activeio.packet.async.AsyncChannelListener#onPacketError(org.apache.activeio.ChannelException)
-     */
-    public void onPacketError(IOException error) {
-        channelListener.onPacketError(error);
-    }
-
-    /**
-     * @return Returns the next.
-     */
-    public AsyncChannel getNext() {
-        return next;
-    }
-
-    /**
-     * @return Returns the packetListener.
-     */
-    public AsyncChannelListener getAsyncChannelListener() {
-        return channelListener;
-    }
-
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return next.getAdapter(target);
-    }  
-    
-    public String toString() {
-        return next.toString();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.Packet;
+
+
+/**
+ * A AsyncChannelFilter can be used as a filter between a {@see org.apache.activeio.AsyncChannel}
+ * and it's {@see org.apache.activeio.ChannelConsumer}.  Most {@see org.apache.activeio.AsyncChannel}
+ * that are not directly accessing the network will extends the AsyncChannelFilter since they act as a
+ * filter between the client and the network.  O 
+ * 
+ * @version $Revision$
+ */
+public class FilterAsyncChannel implements AsyncChannel, AsyncChannelListener {
+
+    final protected AsyncChannel next;
+    protected AsyncChannelListener channelListener;
+
+    public FilterAsyncChannel(AsyncChannel next) {
+        this.next = next;
+    }
+
+    /**
+     */
+    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
+        this.channelListener = channelListener;
+        if (channelListener == null)
+            next.setAsyncChannelListener(null);
+        else
+            next.setAsyncChannelListener(this);
+    }
+
+    public void write(Packet packet) throws IOException {
+        next.write(packet);
+    }
+
+    public void flush() throws IOException {
+        next.flush();
+    }
+
+    /**
+     * @see org.apache.activeio.Disposable#dispose()
+     */
+    public void dispose() {
+        next.dispose();
+    }
+
+    /**
+     * @see org.apache.activeio.Service#start()
+     * @throws IOException if the next channel has not been set.
+     */
+    public void start() throws IOException {
+        if( next == null )
+            throw new IOException("The next channel has not been set.");
+        if( channelListener ==null )
+            throw new IOException("The UpPacketListener has not been set.");
+        next.start();
+    }
+
+    /**
+     * @see org.apache.activeio.Service#stop()
+     */
+    public void stop() throws IOException {
+        next.stop();
+    }
+
+    /**
+     * @see org.apache.activeio.packet.async.AsyncChannelListener#onPacket(org.apache.activeio.packet.Packet)
+     */
+    public void onPacket(Packet packet) {
+        channelListener.onPacket(packet);
+    }
+
+    /**
+     * @see org.apache.activeio.packet.async.AsyncChannelListener#onPacketError(org.apache.activeio.ChannelException)
+     */
+    public void onPacketError(IOException error) {
+        channelListener.onPacketError(error);
+    }
+
+    /**
+     * @return Returns the next.
+     */
+    public AsyncChannel getNext() {
+        return next;
+    }
+
+    /**
+     * @return Returns the packetListener.
+     */
+    public AsyncChannelListener getAsyncChannelListener() {
+        return channelListener;
+    }
+
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return next.getAdapter(target);
+    }  
+    
+    public String toString() {
+        return next.toString();
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,104 +1,104 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.activeio.AcceptListener;
-import org.apache.activeio.Channel;
-
-
-/**
- * A AsyncChannelFilter can be used as a filter between a {@see org.apache.activeio.AsyncChannel}
- * and it's {@see org.apache.activeio.ChannelConsumer}.  Most {@see org.apache.activeio.AsyncChannel}
- * that are not directly accessing the network will extends the AsyncChannelFilter since they act as a
- * filter between the client and the network.  O 
- * 
- * @version $Revision$
- */
-public class FilterAsyncChannelServer implements AsyncChannelServer, AcceptListener {
-
-    final protected AsyncChannelServer next;
-    protected AcceptListener acceptListener;
-
-    public FilterAsyncChannelServer(AsyncChannelServer next) {
-        this.next = next;
-        if( next == null )
-            throw new IllegalArgumentException("The next AsyncChannelServer cannot be null.");
-    }
-
-    public void setAcceptListener(AcceptListener acceptListener) {
-        this.acceptListener = acceptListener;
-        if (acceptListener == null)
-            next.setAcceptListener(null);
-        else
-            next.setAcceptListener(this);
-        
-    }
-    
-    /**
-     * @see org.apache.activeio.Disposable#dispose()
-     */
-    public void dispose() {
-        next.dispose();
-    }
-
-    /**
-     * @see org.apache.activeio.Service#start()
-     * @throws IOException if the next channel has not been set.
-     */
-    public void start() throws IOException {
-        if( acceptListener ==null )
-            throw new IOException("The AcceptListener has not been set.");
-        next.start();
-    }
-
-    /**
-     * @see org.apache.activeio.Service#stop()
-     */
-    public void stop() throws IOException {
-        next.stop();
-    }
-
-    public void onAccept(Channel channel) {
-        acceptListener.onAccept(channel);
-    }
-
-    public void onAcceptError(IOException error) {
-        acceptListener.onAcceptError(error);
-    }
-
-    public URI getBindURI() {
-        return next.getBindURI();
-    }
-
-    public URI getConnectURI() {
-        return next.getConnectURI();
-    }
-    
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return next.getAdapter(target);
-    }    
-    
-    public String toString() {
-        return next.toString();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activeio.AcceptListener;
+import org.apache.activeio.Channel;
+
+
+/**
+ * A AsyncChannelFilter can be used as a filter between a {@see org.apache.activeio.AsyncChannel}
+ * and it's {@see org.apache.activeio.ChannelConsumer}.  Most {@see org.apache.activeio.AsyncChannel}
+ * that are not directly accessing the network will extends the AsyncChannelFilter since they act as a
+ * filter between the client and the network.  O 
+ * 
+ * @version $Revision$
+ */
+public class FilterAsyncChannelServer implements AsyncChannelServer, AcceptListener {
+
+    final protected AsyncChannelServer next;
+    protected AcceptListener acceptListener;
+
+    public FilterAsyncChannelServer(AsyncChannelServer next) {
+        this.next = next;
+        if( next == null )
+            throw new IllegalArgumentException("The next AsyncChannelServer cannot be null.");
+    }
+
+    public void setAcceptListener(AcceptListener acceptListener) {
+        this.acceptListener = acceptListener;
+        if (acceptListener == null)
+            next.setAcceptListener(null);
+        else
+            next.setAcceptListener(this);
+        
+    }
+    
+    /**
+     * @see org.apache.activeio.Disposable#dispose()
+     */
+    public void dispose() {
+        next.dispose();
+    }
+
+    /**
+     * @see org.apache.activeio.Service#start()
+     * @throws IOException if the next channel has not been set.
+     */
+    public void start() throws IOException {
+        if( acceptListener ==null )
+            throw new IOException("The AcceptListener has not been set.");
+        next.start();
+    }
+
+    /**
+     * @see org.apache.activeio.Service#stop()
+     */
+    public void stop() throws IOException {
+        next.stop();
+    }
+
+    public void onAccept(Channel channel) {
+        acceptListener.onAccept(channel);
+    }
+
+    public void onAcceptError(IOException error) {
+        acceptListener.onAcceptError(error);
+    }
+
+    public URI getBindURI() {
+        return next.getBindURI();
+    }
+
+    public URI getConnectURI() {
+        return next.getConnectURI();
+    }
+    
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return next.getAdapter(target);
+    }    
+    
+    public String toString() {
+        return next.toString();
+    }
  }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/FilterAsyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/AsyncWriteAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/CounterAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/CounterAsyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/CounterAsyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/CounterAsyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,73 +1,73 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.filter;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.FilterAsyncChannel;
-
-
-/**
- * A CounterAsyncChannel is a simple {@see org.apache.activeio.AsyncChannelFilter} 
- * that counts the number bytes that been sent down and up through the channel.
- * 
- * The {@see org.apache.activeio.counter.CounterAttribueEnum.COUNTER_INBOUND_COUNT}
- * and {@see org.apache.activeio.counter.CounterAttribueEnum.COUNTER_OUTBOUND_COUNT}
- * attributes can be used to find query the channel to get the current inbound and outbound
- * byte counts.
- * 
- * @version $Revision$
- */
-final public class CounterAsyncChannel extends FilterAsyncChannel {
-
-    long inBoundCounter = 0;
-
-    long outBoundCounter = 0;
-
-    /**
-     * @param next
-     */
-    public CounterAsyncChannel(AsyncChannel next) {
-        super(next);
-    }
-
-    /**
-     * @see org.apache.activeio.packet.async.FilterAsyncChannel#onPacket(org.apache.activeio.packet.Packet)
-     */
-    public void onPacket(Packet packet) {
-        inBoundCounter += packet.remaining();
-        super.onPacket(packet);
-    }
-
-    /**
-     * @see org.apache.activeio.packet.async.FilterAsyncChannel#write(org.apache.activeio.packet.Packet)
-     */
-    public void write(Packet packet) throws IOException {
-        outBoundCounter += packet.position();
-        super.write(packet);
-    }
-
-    public long getInBoundCounter() {
-        return inBoundCounter;
-    }
-    
-    public long getOutBoundCounter() {
-        return outBoundCounter;
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.filter;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.FilterAsyncChannel;
+
+
+/**
+ * A CounterAsyncChannel is a simple {@see org.apache.activeio.AsyncChannelFilter} 
+ * that counts the number bytes that been sent down and up through the channel.
+ * 
+ * The {@see org.apache.activeio.counter.CounterAttribueEnum.COUNTER_INBOUND_COUNT}
+ * and {@see org.apache.activeio.counter.CounterAttribueEnum.COUNTER_OUTBOUND_COUNT}
+ * attributes can be used to find query the channel to get the current inbound and outbound
+ * byte counts.
+ * 
+ * @version $Revision$
+ */
+final public class CounterAsyncChannel extends FilterAsyncChannel {
+
+    long inBoundCounter = 0;
+
+    long outBoundCounter = 0;
+
+    /**
+     * @param next
+     */
+    public CounterAsyncChannel(AsyncChannel next) {
+        super(next);
+    }
+
+    /**
+     * @see org.apache.activeio.packet.async.FilterAsyncChannel#onPacket(org.apache.activeio.packet.Packet)
+     */
+    public void onPacket(Packet packet) {
+        inBoundCounter += packet.remaining();
+        super.onPacket(packet);
+    }
+
+    /**
+     * @see org.apache.activeio.packet.async.FilterAsyncChannel#write(org.apache.activeio.packet.Packet)
+     */
+    public void write(Packet packet) throws IOException {
+        outBoundCounter += packet.position();
+        super.write(packet);
+    }
+
+    public long getInBoundCounter() {
+        return inBoundCounter;
+    }
+    
+    public long getOutBoundCounter() {
+        return outBoundCounter;
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/CounterAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/PacketAggregatingAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/PacketAggregatingAsyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/PacketAggregatingAsyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/PacketAggregatingAsyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,65 +1,65 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.filter;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.FilterAsyncChannel;
-import org.apache.activeio.util.PacketAggregator;
-
-/**
- * This PacketAggregatingAsyncChannel can be used when the client is sending a
- * 'record' style packet down the channel stack and needs receiving end to
- * receive the same 'record' packets.
- * 
- * This is very useful since in general, a channel does not grantee that a
- * Packet that is sent down will not be fragmented or combined with other Packet
- * objects.
- * 
- * This {@see org.apache.activeio.AsyncChannel} adds a 4 byte header
- * to each packet that is sent down.
- * 
- * @version $Revision$
- */
-final public class PacketAggregatingAsyncChannel extends FilterAsyncChannel {
-
-    private final PacketAggregator aggregator = new PacketAggregator() {
-        protected void packetAssembled(Packet packet) {
-            getAsyncChannelListener().onPacket(packet);
-        }
-    };
-    
-    public PacketAggregatingAsyncChannel(AsyncChannel next) {
-        super(next);
-    }
-
-    public void onPacket(Packet packet) {
-        try {
-            aggregator.addRawPacket(packet);
-        } catch (IOException e) {
-            getAsyncChannelListener().onPacketError(e);
-        }
-    }    
-    
-    public void write(Packet packet) throws IOException {
-        getNext().write(aggregator.getHeader(packet));
-        getNext().write(packet);
-    }
-
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.filter;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.FilterAsyncChannel;
+import org.apache.activeio.util.PacketAggregator;
+
+/**
+ * This PacketAggregatingAsyncChannel can be used when the client is sending a
+ * 'record' style packet down the channel stack and needs receiving end to
+ * receive the same 'record' packets.
+ * 
+ * This is very useful since in general, a channel does not grantee that a
+ * Packet that is sent down will not be fragmented or combined with other Packet
+ * objects.
+ * 
+ * This {@see org.apache.activeio.AsyncChannel} adds a 4 byte header
+ * to each packet that is sent down.
+ * 
+ * @version $Revision$
+ */
+final public class PacketAggregatingAsyncChannel extends FilterAsyncChannel {
+
+    private final PacketAggregator aggregator = new PacketAggregator() {
+        protected void packetAssembled(Packet packet) {
+            getAsyncChannelListener().onPacket(packet);
+        }
+    };
+    
+    public PacketAggregatingAsyncChannel(AsyncChannel next) {
+        super(next);
+    }
+
+    public void onPacket(Packet packet) {
+        try {
+            aggregator.addRawPacket(packet);
+        } catch (IOException e) {
+            getAsyncChannelListener().onPacketError(e);
+        }
+    }    
+    
+    public void write(Packet packet) throws IOException {
+        getNext().write(aggregator.getHeader(packet));
+        getNext().write(packet);
+    }
+
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/PacketAggregatingAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/SynchornizedAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/SynchornizedAsyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/SynchornizedAsyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/SynchornizedAsyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,84 +1,84 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.filter;
-
-import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
-import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.FilterAsyncChannel;
-
-import java.io.IOException;
-
-/**
- * Used to synchronize concurrent access to an ASynchChannel.  
- * 
- * Uses a {@see edu.emory.mathcs.backport.java.util.concurrent.Sync} object
- * for write operations.  All other operations such as {@see #stop(long)}
- * and {@see #stop} just do a normal java synchronization against the SynchornizedSynchChannel
- * object instance.  It is assumed that the Async message delivery is not 
- * concurrent and therefore does not require synchronization.
- * 
- */
-public class SynchornizedAsyncChannel extends FilterAsyncChannel {
-
-    private final Lock writeLock;
-
-    public SynchornizedAsyncChannel(AsyncChannel next) {
-        this(next, new ReentrantLock());
-    }
-    
-    public SynchornizedAsyncChannel(AsyncChannel next, Lock writeLock) {
-        super(next);
-        this.writeLock = writeLock;
-    }    
-    
-    public void write(Packet packet) throws IOException {
-        writeLock.lock();
-        try {
-            getNext().write(packet);            
-        } finally {
-            writeLock.unlock();
-        }
-    }
-    
-    public void flush() throws IOException {
-        writeLock.lock();
-        try {
-            getNext().flush();            
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    synchronized public Object getAdapter(Class target) {
-        return super.getAdapter(target);
-    }
-
-    synchronized public void start() throws IOException {
-        super.start();
-    }
-
-    synchronized public void stop() throws IOException {
-        super.stop();
-    }
-    
-    public Lock getWriteLock() {
-        return writeLock;
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.filter;
+
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.FilterAsyncChannel;
+
+import java.io.IOException;
+
+/**
+ * Used to synchronize concurrent access to an ASynchChannel.  
+ * 
+ * Uses a {@see edu.emory.mathcs.backport.java.util.concurrent.Sync} object
+ * for write operations.  All other operations such as {@see #stop(long)}
+ * and {@see #stop} just do a normal java synchronization against the SynchornizedSynchChannel
+ * object instance.  It is assumed that the Async message delivery is not 
+ * concurrent and therefore does not require synchronization.
+ * 
+ */
+public class SynchornizedAsyncChannel extends FilterAsyncChannel {
+
+    private final Lock writeLock;
+
+    public SynchornizedAsyncChannel(AsyncChannel next) {
+        this(next, new ReentrantLock());
+    }
+    
+    public SynchornizedAsyncChannel(AsyncChannel next, Lock writeLock) {
+        super(next);
+        this.writeLock = writeLock;
+    }    
+    
+    public void write(Packet packet) throws IOException {
+        writeLock.lock();
+        try {
+            getNext().write(packet);            
+        } finally {
+            writeLock.unlock();
+        }
+    }
+    
+    public void flush() throws IOException {
+        writeLock.lock();
+        try {
+            getNext().flush();            
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    synchronized public Object getAdapter(Class target) {
+        return super.getAdapter(target);
+    }
+
+    synchronized public void start() throws IOException {
+        super.start();
+    }
+
+    synchronized public void stop() throws IOException {
+        super.stop();
+    }
+    
+    public Lock getWriteLock() {
+        return writeLock;
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/SynchornizedAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/WriteBufferedAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/WriteBufferedAsyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/WriteBufferedAsyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/WriteBufferedAsyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,70 +1,70 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.filter;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.FilterAsyncChannel;
-
-/**
- */
-public class WriteBufferedAsyncChannel extends FilterAsyncChannel {
-
-    private static final int DEFAULT_BUFFER_SIZE = 1024*64;
-    private final Packet buffer;
-    private final boolean enableDirectWrites;
-    
-    public WriteBufferedAsyncChannel(AsyncChannel channel) {
-        this(channel, new ByteArrayPacket(new byte[DEFAULT_BUFFER_SIZE]));
-    }
-    
-    public WriteBufferedAsyncChannel(AsyncChannel channel, Packet buffer) {
-        this(channel, buffer, true);
-    }
-
-    public WriteBufferedAsyncChannel(AsyncChannel channel, Packet buffer, boolean enableDirectWrites) {
-        super(channel);
-        this.buffer = buffer;
-        this.enableDirectWrites = enableDirectWrites;
-    }
-
-    public void write(Packet packet) throws IOException {
-        
-        while( packet.hasRemaining() ) {
-	        packet.read(buffer);
-	        if( !buffer.hasRemaining() ) {
-	            flush();
-	            
-	            // Should we just direct write the rest?
-	            if( enableDirectWrites && packet.remaining() > buffer.capacity()) {
-	                getNext().write(packet);
-	                return;
-	            }
-	        }
-        }
-        
-    }
-    
-    public void flush() throws IOException {
-        buffer.flip();
-        getNext().write(buffer);
-        buffer.clear();
-    }    
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.filter;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.FilterAsyncChannel;
+
+/**
+ */
+public class WriteBufferedAsyncChannel extends FilterAsyncChannel {
+
+    private static final int DEFAULT_BUFFER_SIZE = 1024*64;
+    private final Packet buffer;
+    private final boolean enableDirectWrites;
+    
+    public WriteBufferedAsyncChannel(AsyncChannel channel) {
+        this(channel, new ByteArrayPacket(new byte[DEFAULT_BUFFER_SIZE]));
+    }
+    
+    public WriteBufferedAsyncChannel(AsyncChannel channel, Packet buffer) {
+        this(channel, buffer, true);
+    }
+
+    public WriteBufferedAsyncChannel(AsyncChannel channel, Packet buffer, boolean enableDirectWrites) {
+        super(channel);
+        this.buffer = buffer;
+        this.enableDirectWrites = enableDirectWrites;
+    }
+
+    public void write(Packet packet) throws IOException {
+        
+        while( packet.hasRemaining() ) {
+	        packet.read(buffer);
+	        if( !buffer.hasRemaining() ) {
+	            flush();
+	            
+	            // Should we just direct write the rest?
+	            if( enableDirectWrites && packet.remaining() > buffer.capacity()) {
+	                getNext().write(packet);
+	                return;
+	            }
+	        }
+        }
+        
+    }
+    
+    public void flush() throws IOException {
+        buffer.flip();
+        getNext().write(buffer);
+        buffer.clear();
+    }    
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/filter/WriteBufferedAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,181 +1,181 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.nio;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.packet.EOSPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelListener;
-import org.apache.activeio.packet.async.nio.NIOAsyncChannelSelectorManager.SelectorManagerListener;
-import org.apache.activeio.packet.async.nio.NIOAsyncChannelSelectorManager.SocketChannelAsyncChannelSelection;
-import org.apache.activeio.packet.sync.nio.NIOBaseChannel;
-
-/**
- * @version $Revision$
- */
-final public class NIOAsyncChannel extends NIOBaseChannel implements AsyncChannel {
-
-    private AsyncChannelListener channelListener;
-    private SocketChannelAsyncChannelSelection selection;
-    private ByteBuffer inputByteBuffer;
-    private boolean running;
-
-    public NIOAsyncChannel(SocketChannel socketChannel, boolean useDirect) throws IOException {
-        super(socketChannel, useDirect);
-
-        socketChannel.configureBlocking(false);                
-        selection = NIOAsyncChannelSelectorManager.register(socketChannel, new SelectorManagerListener(){
-            public void onSelect(SocketChannelAsyncChannelSelection selection) {
-                String origName = Thread.currentThread().getName();
-                if (selection.isReadable())
-                try {
-                    Thread.currentThread().setName(NIOAsyncChannel.this.toString());
-                    serviceRead();
-                 } catch ( Throwable e ) {
-                     System.err.println("ActiveIO unexpected error: ");
-                     e.printStackTrace(System.err);
-                 } finally {
-                     Thread.currentThread().setName(origName);
-                 }
-            }
-        });
-        
-    }
-    
-    private void serviceRead() {
-        try {
-            
-            while( true ) {
-            	
-	            if( inputByteBuffer==null || !inputByteBuffer.hasRemaining() ) {
-	                inputByteBuffer = allocateBuffer();
-	            }
-	
-	            int size = socketChannel.read(inputByteBuffer);
-	            if( size == -1 ) {
-	                this.channelListener.onPacket( EOSPacket.EOS_PACKET );
-	                selection.close();
-	                break;
-	            }
-
-	            if( size==0 ) {
-	                break;
-	            }
-	            
-	            // Per Mike Spile, some plaforms read 1 byte of data on the first read, and then
-	            // a but load of data on the second read.  Try to load the butload here
-	            if( size == 1 && inputByteBuffer.hasRemaining() ) {
-		            int size2 = socketChannel.read(inputByteBuffer);
-		            if( size2 > 0 )
-		            		size += size2;
-	            }
-	            
-	            ByteBuffer remaining = inputByteBuffer.slice();            
-	            Packet data = new ByteBufferPacket(((ByteBuffer)inputByteBuffer.flip()).slice());
-	            this.channelListener.onPacket( data );
-	                        
-	            // Keep the remaining buffer around to fill with data.
-	            inputByteBuffer = remaining;
-	            
-	            if( inputByteBuffer.hasRemaining() )
-	                break;
-            }
-            
-        } catch (IOException e) {
-            this.channelListener.onPacketError(e);
-        }
-    }
-    
-    synchronized public void write(Packet packet) throws IOException {
-        
-    	ByteBuffer data;
-        if( packet.getClass()==ByteBufferPacket.class ) {
-            data = ((ByteBufferPacket)packet).getByteBuffer();            
-        } else {
-        	ByteSequence sequence = packet.asByteSequence();
-        	data = ByteBuffer.wrap(sequence.getData(), sequence.getOffset(), sequence.getLength());
-        }
-
-        long delay=1;
-        while( data.hasRemaining() ) {
-	        
-            // Since the write is non-blocking, all the data may not have been written.
-            int r1 = data.remaining();        
-	        socketChannel.write( data );        
-	        int r2 = data.remaining();
-	        
-	        // We may need to do a little bit of sleeping to avoid a busy loop.
-            // Slow down if no data was written out.. 
-	        if( r2>0 && r1-r2==0 ) {
-	            try {
-                    // Use exponential rollback to increase sleep time.
-                    Thread.sleep(delay);
-                    delay *= 5;
-                    if( delay > 1000*1 ) {
-                        delay = 1000;
-                    }
-                } catch (InterruptedException e) {
-                    throw new InterruptedIOException();
-                }
-	        } else {
-	            delay = 1;
-	        }
-        }
-    }
-
-    public void flush() throws IOException {
-    }
-
-    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
-        this.channelListener = channelListener;
-    }
-
-    public AsyncChannelListener getAsyncChannelListener() {
-        return channelListener;
-    }
-
-    public void dispose() {
-        if( running && channelListener!=null ) {
-            channelListener.onPacketError(new SocketException("Socket closed."));
-        }
-        selection.close();
-        super.dispose();
-    }
-
-    public void start() throws IOException {
-        if( running )
-            return;
-        running=true;
-        selection.setInterestOps(SelectionKey.OP_READ);
-    }
-
-    public void stop() throws IOException {
-        if( !running )
-            return;
-        running=false;
-        selection.setInterestOps(0);        
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.nio;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.ByteSequence;
+import org.apache.activeio.packet.EOSPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+import org.apache.activeio.packet.async.nio.NIOAsyncChannelSelectorManager.SelectorManagerListener;
+import org.apache.activeio.packet.async.nio.NIOAsyncChannelSelectorManager.SocketChannelAsyncChannelSelection;
+import org.apache.activeio.packet.sync.nio.NIOBaseChannel;
+
+/**
+ * @version $Revision$
+ */
+final public class NIOAsyncChannel extends NIOBaseChannel implements AsyncChannel {
+
+    private AsyncChannelListener channelListener;
+    private SocketChannelAsyncChannelSelection selection;
+    private ByteBuffer inputByteBuffer;
+    private boolean running;
+
+    public NIOAsyncChannel(SocketChannel socketChannel, boolean useDirect) throws IOException {
+        super(socketChannel, useDirect);
+
+        socketChannel.configureBlocking(false);                
+        selection = NIOAsyncChannelSelectorManager.register(socketChannel, new SelectorManagerListener(){
+            public void onSelect(SocketChannelAsyncChannelSelection selection) {
+                String origName = Thread.currentThread().getName();
+                if (selection.isReadable())
+                try {
+                    Thread.currentThread().setName(NIOAsyncChannel.this.toString());
+                    serviceRead();
+                 } catch ( Throwable e ) {
+                     System.err.println("ActiveIO unexpected error: ");
+                     e.printStackTrace(System.err);
+                 } finally {
+                     Thread.currentThread().setName(origName);
+                 }
+            }
+        });
+        
+    }
+    
+    private void serviceRead() {
+        try {
+            
+            while( true ) {
+            	
+	            if( inputByteBuffer==null || !inputByteBuffer.hasRemaining() ) {
+	                inputByteBuffer = allocateBuffer();
+	            }
+	
+	            int size = socketChannel.read(inputByteBuffer);
+	            if( size == -1 ) {
+	                this.channelListener.onPacket( EOSPacket.EOS_PACKET );
+	                selection.close();
+	                break;
+	            }
+
+	            if( size==0 ) {
+	                break;
+	            }
+	            
+	            // Per Mike Spile, some plaforms read 1 byte of data on the first read, and then
+	            // a but load of data on the second read.  Try to load the butload here
+	            if( size == 1 && inputByteBuffer.hasRemaining() ) {
+		            int size2 = socketChannel.read(inputByteBuffer);
+		            if( size2 > 0 )
+		            		size += size2;
+	            }
+	            
+	            ByteBuffer remaining = inputByteBuffer.slice();            
+	            Packet data = new ByteBufferPacket(((ByteBuffer)inputByteBuffer.flip()).slice());
+	            this.channelListener.onPacket( data );
+	                        
+	            // Keep the remaining buffer around to fill with data.
+	            inputByteBuffer = remaining;
+	            
+	            if( inputByteBuffer.hasRemaining() )
+	                break;
+            }
+            
+        } catch (IOException e) {
+            this.channelListener.onPacketError(e);
+        }
+    }
+    
+    synchronized public void write(Packet packet) throws IOException {
+        
+    	ByteBuffer data;
+        if( packet.getClass()==ByteBufferPacket.class ) {
+            data = ((ByteBufferPacket)packet).getByteBuffer();            
+        } else {
+        	ByteSequence sequence = packet.asByteSequence();
+        	data = ByteBuffer.wrap(sequence.getData(), sequence.getOffset(), sequence.getLength());
+        }
+
+        long delay=1;
+        while( data.hasRemaining() ) {
+	        
+            // Since the write is non-blocking, all the data may not have been written.
+            int r1 = data.remaining();        
+	        socketChannel.write( data );        
+	        int r2 = data.remaining();
+	        
+	        // We may need to do a little bit of sleeping to avoid a busy loop.
+            // Slow down if no data was written out.. 
+	        if( r2>0 && r1-r2==0 ) {
+	            try {
+                    // Use exponential rollback to increase sleep time.
+                    Thread.sleep(delay);
+                    delay *= 5;
+                    if( delay > 1000*1 ) {
+                        delay = 1000;
+                    }
+                } catch (InterruptedException e) {
+                    throw new InterruptedIOException();
+                }
+	        } else {
+	            delay = 1;
+	        }
+        }
+    }
+
+    public void flush() throws IOException {
+    }
+
+    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
+        this.channelListener = channelListener;
+    }
+
+    public AsyncChannelListener getAsyncChannelListener() {
+        return channelListener;
+    }
+
+    public void dispose() {
+        if( running && channelListener!=null ) {
+            channelListener.onPacketError(new SocketException("Socket closed."));
+        }
+        selection.close();
+        super.dispose();
+    }
+
+    public void start() throws IOException {
+        if( running )
+            return;
+        running=true;
+        selection.setInterestOps(SelectionKey.OP_READ);
+    }
+
+    public void stop() throws IOException {
+        if( !running )
+            return;
+        running=false;
+        selection.setInterestOps(0);        
+    }
  }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelFactory.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelFactory.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelFactory.java Tue Feb 21 15:12:56 2006
@@ -1,131 +1,131 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.async.nio;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-import org.apache.activeio.adapter.SyncToAsyncChannelServer;
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelFactory;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
-import org.apache.activeio.util.URISupport;
-
-/**
- * A TcpAsyncChannelFactory creates {@see org.apache.activeio.net.TcpAsyncChannel}
- * and {@see org.apache.activeio.net.TcpAsyncChannelServer} objects.
- * 
- * @version $Revision$
- */
-public class NIOAsyncChannelFactory implements AsyncChannelFactory {
-    
-    protected static final int DEFAULT_BUFFER_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.net.nio.BufferSize", ""+(64*1024)));
-
-    protected static final int DEFAULT_BACKLOG = 500;
-    boolean useDirectBuffers = true;
-    private final boolean createWriteBufferedChannels;
-    private int backlog = DEFAULT_BACKLOG;
-    
-    public NIOAsyncChannelFactory() {
-        this(true);
-    }
-    
-    public NIOAsyncChannelFactory(boolean createWriteBufferedChannels) {
-        this.createWriteBufferedChannels = createWriteBufferedChannels;
-    }
-    
-    
-    /**
-     * Uses the {@param location}'s host and port to create a tcp connection to a remote host.
-     * 
-     * @see org.apache.activeio.AsyncChannelFactory#openAsyncChannel(java.net.URI)
-     */
-    public AsyncChannel openAsyncChannel(URI location) throws IOException {
-        SocketChannel channel = SocketChannel.open();
-        channel.connect(new InetSocketAddress(location.getHost(), location.getPort()));
-        return createAsyncChannel(channel);
-    }
-
-    /**
-     * @param channel
-     * @return
-     * @throws IOException
-     */
-    protected AsyncChannel createAsyncChannel(SocketChannel socketChannel) throws IOException {
-        AsyncChannel channel = new NIOAsyncChannel(socketChannel, useDirectBuffers);
-        if( createWriteBufferedChannels ) {
-            channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(useDirectBuffers), false);
-        }
-        return channel;
-    }
-
-    /**
-     * Binds a server socket a the {@param location}'s port. 
-     * 
-     * @see org.apache.activeio.AsyncChannelFactory#bindAsyncChannel(java.net.URI)
-     */
-    public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException {
-        
-        String host = bindURI.getHost();
-        InetSocketAddress address;
-        if( host == null || host.length() == 0 || host.equals("localhost") || host.equals("0.0.0.0") || InetAddress.getLocalHost().getHostName().equals(host) ) {            
-            address = new InetSocketAddress(bindURI.getPort());
-        } else {
-            address = new InetSocketAddress(bindURI.getHost(), bindURI.getPort());
-        }
-        
-        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.socket().bind(address,backlog);
-        
-        URI connectURI = bindURI;
-        try {
-//            connectURI = URISupport.changeHost(connectURI, InetAddress.getLocalHost().getHostName());
-            connectURI = URISupport.changePort(connectURI, serverSocketChannel.socket().getLocalPort());
-        } catch (URISyntaxException e) {
-            throw (IOException)new IOException("Could not build connect URI: "+e).initCause(e);
-        }
-        
-        // We won't use non blocking NIO for the server since you only need 1 thread for him anyways.
-        // Just resuing the SocketChannelSynchChannelServer.
-        return SyncToAsyncChannelServer.adapt( 
-                new NIOAsyncChannelServer(serverSocketChannel, bindURI, connectURI, createWriteBufferedChannels, useDirectBuffers));
-    }
-    
-    /**
-     * @return Returns the backlog.
-     */
-    public int getBacklog() {
-        return backlog;
-    }
-
-    /**
-     * @param backlog
-     *            The backlog to set.
-     */
-    public void setBacklog(int backlog) {
-        this.backlog = backlog;
-    }
-
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activeio.adapter.SyncToAsyncChannelServer;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
+import org.apache.activeio.util.URISupport;
+
+/**
+ * A TcpAsyncChannelFactory creates {@see org.apache.activeio.net.TcpAsyncChannel}
+ * and {@see org.apache.activeio.net.TcpAsyncChannelServer} objects.
+ * 
+ * @version $Revision$
+ */
+public class NIOAsyncChannelFactory implements AsyncChannelFactory {
+    
+    protected static final int DEFAULT_BUFFER_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.net.nio.BufferSize", ""+(64*1024)));
+
+    protected static final int DEFAULT_BACKLOG = 500;
+    boolean useDirectBuffers = true;
+    private final boolean createWriteBufferedChannels;
+    private int backlog = DEFAULT_BACKLOG;
+    
+    public NIOAsyncChannelFactory() {
+        this(true);
+    }
+    
+    public NIOAsyncChannelFactory(boolean createWriteBufferedChannels) {
+        this.createWriteBufferedChannels = createWriteBufferedChannels;
+    }
+    
+    
+    /**
+     * Uses the {@param location}'s host and port to create a tcp connection to a remote host.
+     * 
+     * @see org.apache.activeio.AsyncChannelFactory#openAsyncChannel(java.net.URI)
+     */
+    public AsyncChannel openAsyncChannel(URI location) throws IOException {
+        SocketChannel channel = SocketChannel.open();
+        channel.connect(new InetSocketAddress(location.getHost(), location.getPort()));
+        return createAsyncChannel(channel);
+    }
+
+    /**
+     * @param channel
+     * @return
+     * @throws IOException
+     */
+    protected AsyncChannel createAsyncChannel(SocketChannel socketChannel) throws IOException {
+        AsyncChannel channel = new NIOAsyncChannel(socketChannel, useDirectBuffers);
+        if( createWriteBufferedChannels ) {
+            channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(useDirectBuffers), false);
+        }
+        return channel;
+    }
+
+    /**
+     * Binds a server socket a the {@param location}'s port. 
+     * 
+     * @see org.apache.activeio.AsyncChannelFactory#bindAsyncChannel(java.net.URI)
+     */
+    public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException {
+        
+        String host = bindURI.getHost();
+        InetSocketAddress address;
+        if( host == null || host.length() == 0 || host.equals("localhost") || host.equals("0.0.0.0") || InetAddress.getLocalHost().getHostName().equals(host) ) {            
+            address = new InetSocketAddress(bindURI.getPort());
+        } else {
+            address = new InetSocketAddress(bindURI.getHost(), bindURI.getPort());
+        }
+        
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.socket().bind(address,backlog);
+        
+        URI connectURI = bindURI;
+        try {
+//            connectURI = URISupport.changeHost(connectURI, InetAddress.getLocalHost().getHostName());
+            connectURI = URISupport.changePort(connectURI, serverSocketChannel.socket().getLocalPort());
+        } catch (URISyntaxException e) {
+            throw (IOException)new IOException("Could not build connect URI: "+e).initCause(e);
+        }
+        
+        // We won't use non blocking NIO for the server since you only need 1 thread for him anyways.
+        // Just resuing the SocketChannelSynchChannelServer.
+        return SyncToAsyncChannelServer.adapt( 
+                new NIOAsyncChannelServer(serverSocketChannel, bindURI, connectURI, createWriteBufferedChannels, useDirectBuffers));
+    }
+    
+    /**
+     * @return Returns the backlog.
+     */
+    public int getBacklog() {
+        return backlog;
+    }
+
+    /**
+     * @param backlog
+     *            The backlog to set.
+     */
+    public void setBacklog(int backlog) {
+        this.backlog = backlog;
+    }
+
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/async/nio/NIOAsyncChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message