activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [11/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ acti...
Date Tue, 21 Feb 2006 23:14:17 GMT
Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramContext.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramContext.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramContext.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramContext.java Tue Feb 21 15:12:56 2006
@@ -1,56 +1,56 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync.datagram;
-
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-
-
-final public class DatagramContext {
-
-    public InetAddress address;
-    public Integer port;
-
-    public DatagramContext() {            
-    }
-    
-    public DatagramContext(DatagramPacket datagramPacket) {
-        this(datagramPacket.getAddress(), new Integer(datagramPacket.getPort()));
-    }
-    
-    public DatagramContext(InetAddress address, Integer port) {
-        this.address = address;
-        this.port = port;
-    }
-    
-    public InetAddress getAddress() {
-        return address;
-    }
-
-    public void setAddress(InetAddress address) {
-        this.address = address;
-    }
-
-    public Integer getPort() {
-        return port;
-    }
-
-    public void setPort(Integer port) {
-        this.port = port;
-    }
-    
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync.datagram;
+
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+
+
+final public class DatagramContext {
+
+    public InetAddress address;
+    public Integer port;
+
+    public DatagramContext() {            
+    }
+    
+    public DatagramContext(DatagramPacket datagramPacket) {
+        this(datagramPacket.getAddress(), new Integer(datagramPacket.getPort()));
+    }
+    
+    public DatagramContext(InetAddress address, Integer port) {
+        this.address = address;
+        this.port = port;
+    }
+    
+    public InetAddress getAddress() {
+        return address;
+    }
+
+    public void setAddress(InetAddress address) {
+        this.address = address;
+    }
+
+    public Integer getPort() {
+        return port;
+    }
+
+    public void setPort(Integer port) {
+        this.port = port;
+    }
+    
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,165 +1,165 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activeio.packet.sync.datagram;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.packet.FilterPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-
-/**
- * A {@see org.apache.activeio.SynchChannel}implementation that uses
- * TCP to talk to the network.
- * 
- * @version $Revision$
- */
-public class DatagramSocketSyncChannel implements SyncChannel {
-
-    private final class UDPFilterPacket extends FilterPacket {
-        private final DatagramPacket packet;
-
-        private UDPFilterPacket(Packet next, DatagramPacket packet) {
-            super(next);
-            this.packet = packet;
-        }
-
-        public Object getAdapter(Class target) {
-            if( target == DatagramContext.class ) {
-                return new DatagramContext(packet);
-            }
-            return super.getAdapter(target);
-        }
-
-        public Packet filter(Packet packet) {
-            return new UDPFilterPacket(packet, this.packet);
-        }
-    }
-
-    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-
-    private final DatagramSocket socket;
-
-    private boolean disposed;
-
-    private int curentSoTimeout;
-
-    /**
-     * Construct basic helpers
-     * 
-     * @param wireFormat
-     * @throws IOException
-     */
-    protected DatagramSocketSyncChannel(DatagramSocket socket) throws IOException {
-        this.socket = socket;
-        socket.setReceiveBufferSize(DEFAULT_BUFFER_SIZE);
-        socket.setSendBufferSize(DEFAULT_BUFFER_SIZE);
-    }
-
-    protected DatagramSocket getSocket() {
-        return socket;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.sync.SyncChannel#read(long)
-     */
-    public org.apache.activeio.packet.Packet read(long timeout) throws IOException {
-        try {
-
-            if (timeout == SyncChannelServer.WAIT_FOREVER_TIMEOUT)
-                setSoTimeout(0);
-            else if (timeout == SyncChannelServer.NO_WAIT_TIMEOUT)
-                setSoTimeout(1);
-            else
-                setSoTimeout((int) timeout);
-
-            // FYI: message data is truncated if biger than this buffer.
-            final byte data[] = new byte[DEFAULT_BUFFER_SIZE];
-            final DatagramPacket packet = new DatagramPacket(data, data.length);
-            socket.receive(packet);
-            
-            // A FilterPacket is used to provide the UdpDatagramContext via narrow.
-            return new UDPFilterPacket(new ByteArrayPacket(data, 0, packet.getLength()), packet);
-            
-        } catch (SocketTimeoutException e) {
-            return null;
-        }
-    }
-
-    private void setSoTimeout(int i) throws SocketException {
-        if (curentSoTimeout != i) {
-            socket.setSoTimeout(i);
-            curentSoTimeout = i;
-        }
-    }
-
-    /**
-     * @see org.apache.activeio.Channel#write(org.apache.activeio.packet.Packet)
-     */
-    public void write(org.apache.activeio.packet.Packet packet) throws IOException {
-        ByteSequence sequence = packet.asByteSequence();
-
-        DatagramContext context = (DatagramContext) packet.getAdapter(DatagramContext.class);
-        if( context!=null ) {
-            socket.send(new DatagramPacket(sequence.getData(),sequence.getOffset(), sequence.getLength(), context.address, context.port.intValue()));
-        } else {
-            socket.send(new DatagramPacket(sequence.getData(),sequence.getOffset(), sequence.getLength()));
-        }
-    }
-
-    /**
-     * @see org.apache.activeio.Channel#flush()
-     */
-    public void flush() throws IOException {
-    }
-
-    /**
-     * @see org.apache.activeio.Disposable#dispose()
-     */
-    public void dispose() {
-        if (disposed)
-            return;
-        socket.close();
-        disposed = true;
-    }
-
-    public void start() throws IOException {
-    }
-
-    public void stop() throws IOException {
-    }
-
-    
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return null;
-    }
-
-    public String toString() {
-        return "Datagram Connection: "+socket.getLocalSocketAddress()+" -> "+socket.getRemoteSocketAddress();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeio.packet.sync.datagram;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.ByteSequence;
+import org.apache.activeio.packet.FilterPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+/**
+ * A {@see org.apache.activeio.SynchChannel}implementation that uses
+ * TCP to talk to the network.
+ * 
+ * @version $Revision$
+ */
+public class DatagramSocketSyncChannel implements SyncChannel {
+
+    private final class UDPFilterPacket extends FilterPacket {
+        private final DatagramPacket packet;
+
+        private UDPFilterPacket(Packet next, DatagramPacket packet) {
+            super(next);
+            this.packet = packet;
+        }
+
+        public Object getAdapter(Class target) {
+            if( target == DatagramContext.class ) {
+                return new DatagramContext(packet);
+            }
+            return super.getAdapter(target);
+        }
+
+        public Packet filter(Packet packet) {
+            return new UDPFilterPacket(packet, this.packet);
+        }
+    }
+
+    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+    private final DatagramSocket socket;
+
+    private boolean disposed;
+
+    private int curentSoTimeout;
+
+    /**
+     * Construct basic helpers
+     * 
+     * @param wireFormat
+     * @throws IOException
+     */
+    protected DatagramSocketSyncChannel(DatagramSocket socket) throws IOException {
+        this.socket = socket;
+        socket.setReceiveBufferSize(DEFAULT_BUFFER_SIZE);
+        socket.setSendBufferSize(DEFAULT_BUFFER_SIZE);
+    }
+
+    protected DatagramSocket getSocket() {
+        return socket;
+    }
+
+    /**
+     * @see org.apache.activeio.packet.sync.SyncChannel#read(long)
+     */
+    public org.apache.activeio.packet.Packet read(long timeout) throws IOException {
+        try {
+
+            if (timeout == SyncChannelServer.WAIT_FOREVER_TIMEOUT)
+                setSoTimeout(0);
+            else if (timeout == SyncChannelServer.NO_WAIT_TIMEOUT)
+                setSoTimeout(1);
+            else
+                setSoTimeout((int) timeout);
+
+            // FYI: message data is truncated if biger than this buffer.
+            final byte data[] = new byte[DEFAULT_BUFFER_SIZE];
+            final DatagramPacket packet = new DatagramPacket(data, data.length);
+            socket.receive(packet);
+            
+            // A FilterPacket is used to provide the UdpDatagramContext via narrow.
+            return new UDPFilterPacket(new ByteArrayPacket(data, 0, packet.getLength()), packet);
+            
+        } catch (SocketTimeoutException e) {
+            return null;
+        }
+    }
+
+    private void setSoTimeout(int i) throws SocketException {
+        if (curentSoTimeout != i) {
+            socket.setSoTimeout(i);
+            curentSoTimeout = i;
+        }
+    }
+
+    /**
+     * @see org.apache.activeio.Channel#write(org.apache.activeio.packet.Packet)
+     */
+    public void write(org.apache.activeio.packet.Packet packet) throws IOException {
+        ByteSequence sequence = packet.asByteSequence();
+
+        DatagramContext context = (DatagramContext) packet.getAdapter(DatagramContext.class);
+        if( context!=null ) {
+            socket.send(new DatagramPacket(sequence.getData(),sequence.getOffset(), sequence.getLength(), context.address, context.port.intValue()));
+        } else {
+            socket.send(new DatagramPacket(sequence.getData(),sequence.getOffset(), sequence.getLength()));
+        }
+    }
+
+    /**
+     * @see org.apache.activeio.Channel#flush()
+     */
+    public void flush() throws IOException {
+    }
+
+    /**
+     * @see org.apache.activeio.Disposable#dispose()
+     */
+    public void dispose() {
+        if (disposed)
+            return;
+        socket.close();
+        disposed = true;
+    }
+
+    public void start() throws IOException {
+    }
+
+    public void stop() throws IOException {
+    }
+
+    
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return null;
+    }
+
+    public String toString() {
+        return "Datagram Connection: "+socket.getLocalSocketAddress()+" -> "+socket.getRemoteSocketAddress();
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannelFactory.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannelFactory.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannelFactory.java Tue Feb 21 15:12:56 2006
@@ -1,84 +1,84 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync.datagram;
-
-import java.io.IOException;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.URI;
-
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.packet.sync.SyncChannelFactory;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-
-/**
- * A TcpSynchChannelFactory creates {@see org.apache.activeio.net.TcpSynchChannel}
- * and {@see org.apache.activeio.net.TcpSynchChannelServer} objects.
- * 
- * @version $Revision$
- */
-public class DatagramSocketSyncChannelFactory implements SyncChannelFactory {
-
-    /**
-     * Uses the {@param location}'s host and port to create a tcp connection to a remote host.
-     * 
-     * @see org.apache.activeio.SynchChannelFactory#openSyncChannel(java.net.URI)
-     */
-    public SyncChannel openSyncChannel(URI location) throws IOException {
-        DatagramSocket socket=null;
-        socket = new DatagramSocket();
-        if( location != null ) {
-            InetAddress address = InetAddress.getByName(location.getHost());
-            socket.connect(address, location.getPort());
-        }
-        return createSyncChannel(socket);
-    }
-
-    /**
-     * Uses the {@param location}'s host and port to create a tcp connection to a remote host.
-     * 
-     */
-    public SyncChannel openSyncChannel(URI location, URI localLocation) throws IOException {
-        DatagramSocket socket=null;
-        InetAddress address = InetAddress.getByName(localLocation.getHost());
-        socket = new DatagramSocket(localLocation.getPort(), address);
-
-        if( location != null ) {
-            address = InetAddress.getByName(location.getHost());
-            socket.connect(address, location.getPort());
-        }
-        return createSyncChannel(socket);
-    }
-
-    /**
-     * @param socket
-     * @return
-     * @throws IOException
-     */
-    protected SyncChannel createSyncChannel(DatagramSocket socket) throws IOException {
-        return new DatagramSocketSyncChannel(socket);
-    }
-
-    /**
-     * @throws IOException allways thrown.
-     * @see org.apache.activeio.SynchChannelFactory#bindSynchChannel(java.net.URI)
-     */
-    public SyncChannelServer bindSyncChannel(URI location) throws IOException {
-        throw new IOException("A SynchChannelServer is not available for this channel.");
-    }
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync.datagram;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.URI;
+
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelFactory;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+/**
+ * A TcpSynchChannelFactory creates {@see org.apache.activeio.net.TcpSynchChannel}
+ * and {@see org.apache.activeio.net.TcpSynchChannelServer} objects.
+ * 
+ * @version $Revision$
+ */
+public class DatagramSocketSyncChannelFactory implements SyncChannelFactory {
+
+    /**
+     * Uses the {@param location}'s host and port to create a tcp connection to a remote host.
+     * 
+     * @see org.apache.activeio.SynchChannelFactory#openSyncChannel(java.net.URI)
+     */
+    public SyncChannel openSyncChannel(URI location) throws IOException {
+        DatagramSocket socket=null;
+        socket = new DatagramSocket();
+        if( location != null ) {
+            InetAddress address = InetAddress.getByName(location.getHost());
+            socket.connect(address, location.getPort());
+        }
+        return createSyncChannel(socket);
+    }
+
+    /**
+     * Uses the {@param location}'s host and port to create a tcp connection to a remote host.
+     * 
+     */
+    public SyncChannel openSyncChannel(URI location, URI localLocation) throws IOException {
+        DatagramSocket socket=null;
+        InetAddress address = InetAddress.getByName(localLocation.getHost());
+        socket = new DatagramSocket(localLocation.getPort(), address);
+
+        if( location != null ) {
+            address = InetAddress.getByName(location.getHost());
+            socket.connect(address, location.getPort());
+        }
+        return createSyncChannel(socket);
+    }
+
+    /**
+     * @param socket
+     * @return
+     * @throws IOException
+     */
+    protected SyncChannel createSyncChannel(DatagramSocket socket) throws IOException {
+        return new DatagramSocketSyncChannel(socket);
+    }
+
+    /**
+     * @throws IOException allways thrown.
+     * @see org.apache.activeio.SynchChannelFactory#bindSynchChannel(java.net.URI)
+     */
+    public SyncChannelServer bindSyncChannel(URI location) throws IOException {
+        throw new IOException("A SynchChannelServer is not available for this channel.");
+    }
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/datagram/DatagramSocketSyncChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PacketAggregatingSyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PacketAggregatingSyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PacketAggregatingSyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PacketAggregatingSyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,94 +1,94 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync.filter;
-
-import java.io.IOException;
-import java.util.LinkedList;
-
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.sync.FilterSyncChannel;
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.util.PacketAggregator;
-
-/**
- * This PacketAggregatingSynchChannel can be used when the client is sending a
- * 'record' style packet down the channel stack and needs receiving end to
- * receive the same 'record' packets.
- * 
- * This is very usefull since in general, a channel does not garantee that a
- * Packet that is sent down will not be fragmented or combined with other Packet
- * objects.
- * 
- * This {@see org.apache.activeio.SynchChannel} adds a 4 byte header
- * to each packet that is sent down.
- * 
- * @version $Revision$
- */
-final public class PacketAggregatingSyncChannel extends FilterSyncChannel {
-
-    private final LinkedList assembledPackets = new LinkedList();    
-    private final PacketAggregator aggregator = new PacketAggregator() {
-        protected void packetAssembled(Packet packet) {
-            assembledPackets.addLast(packet);
-        }
-    };
-    
-    /**
-     * @param next
-     */
-    public PacketAggregatingSyncChannel(SyncChannel next) {
-        super(next);
-    }
-    
-    public Packet read(long timeout) throws IOException {
-        long start = System.currentTimeMillis();
-        if( assembledPackets.isEmpty() ) {
-            while( true ) {
-                
-	            Packet packet = getNext().read(timeout);
-	            if( packet==null ) {
-                    return null;
-	            }
-	            
-	            aggregator.addRawPacket(packet);
-	            
-	            // Should we try to get more packets?
-	            if( assembledPackets.isEmpty() ) {
-	                if( timeout == WAIT_FOREVER_TIMEOUT )
-	                    continue;
-	                
-	                timeout = Math.max(0, timeout-(System.currentTimeMillis()-start));
-	                if( timeout != 0 )
-	                    continue;
-	                
-	                return null;
-	            } else {
-	                return (Packet) assembledPackets.removeFirst();
-	            }
-            }
-            
-        } else {
-            return (Packet) assembledPackets.removeFirst();
-        }
-        
-    }
-    
-    public void write(Packet packet) throws IOException {
-        getNext().write(aggregator.getHeader(packet));
-        getNext().write(packet);
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync.filter;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.sync.FilterSyncChannel;
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.util.PacketAggregator;
+
+/**
+ * This PacketAggregatingSynchChannel can be used when the client is sending a
+ * 'record' style packet down the channel stack and needs receiving end to
+ * receive the same 'record' packets.
+ * 
+ * This is very usefull since in general, a channel does not garantee that a
+ * Packet that is sent down will not be fragmented or combined with other Packet
+ * objects.
+ * 
+ * This {@see org.apache.activeio.SynchChannel} adds a 4 byte header
+ * to each packet that is sent down.
+ * 
+ * @version $Revision$
+ */
+final public class PacketAggregatingSyncChannel extends FilterSyncChannel {
+
+    private final LinkedList assembledPackets = new LinkedList();    
+    private final PacketAggregator aggregator = new PacketAggregator() {
+        protected void packetAssembled(Packet packet) {
+            assembledPackets.addLast(packet);
+        }
+    };
+    
+    /**
+     * @param next
+     */
+    public PacketAggregatingSyncChannel(SyncChannel next) {
+        super(next);
+    }
+    
+    public Packet read(long timeout) throws IOException {
+        long start = System.currentTimeMillis();
+        if( assembledPackets.isEmpty() ) {
+            while( true ) {
+                
+	            Packet packet = getNext().read(timeout);
+	            if( packet==null ) {
+                    return null;
+	            }
+	            
+	            aggregator.addRawPacket(packet);
+	            
+	            // Should we try to get more packets?
+	            if( assembledPackets.isEmpty() ) {
+	                if( timeout == WAIT_FOREVER_TIMEOUT )
+	                    continue;
+	                
+	                timeout = Math.max(0, timeout-(System.currentTimeMillis()-start));
+	                if( timeout != 0 )
+	                    continue;
+	                
+	                return null;
+	            } else {
+	                return (Packet) assembledPackets.removeFirst();
+	            }
+            }
+            
+        } else {
+            return (Packet) assembledPackets.removeFirst();
+        }
+        
+    }
+    
+    public void write(Packet packet) throws IOException {
+        getNext().write(aggregator.getHeader(packet));
+        getNext().write(packet);
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PacketAggregatingSyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PushbackSyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PushbackSyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PushbackSyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PushbackSyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,54 +1,54 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync.filter;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.sync.FilterSyncChannel;
-import org.apache.activeio.packet.sync.SyncChannel;
-
-/**
- *
- */
-public class PushbackSyncChannel extends FilterSyncChannel {
-
-    private Packet putback;
-
-    public PushbackSyncChannel(SyncChannel next) {
-        this(next, null);
-    }
-    
-    public PushbackSyncChannel(SyncChannel next, Packet putback) {
-        super(next);
-        this.putback=putback;
-    }
-    
-    public void putback(Packet packet) {
-        this.putback = packet;
-    }
-    
-    public Packet read(long timeout) throws IOException {
-        if(putback!=null ) {
-            Packet p = putback;
-            putback=null;
-            return p;
-        }
-        return super.read(timeout);
-    }
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync.filter;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.sync.FilterSyncChannel;
+import org.apache.activeio.packet.sync.SyncChannel;
+
+/**
+ *
+ */
+public class PushbackSyncChannel extends FilterSyncChannel {
+
+    private Packet putback;
+
+    public PushbackSyncChannel(SyncChannel next) {
+        this(next, null);
+    }
+    
+    public PushbackSyncChannel(SyncChannel next, Packet putback) {
+        super(next);
+        this.putback=putback;
+    }
+    
+    public void putback(Packet packet) {
+        this.putback = packet;
+    }
+    
+    public Packet read(long timeout) throws IOException {
+        if(putback!=null ) {
+            Packet p = putback;
+            putback=null;
+            return p;
+        }
+        return super.read(timeout);
+    }
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/PushbackSyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/SynchornizedSyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/SynchornizedSyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/SynchornizedSyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/SynchornizedSyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,118 +1,118 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync.filter;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.sync.FilterSyncChannel;
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
-import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Used to synchronize concurrent access to a SynchChannel.  
- * 
- * Uses two different {@see edu.emory.mathcs.backport.java.util.concurrent.Sync} objects
- * for write and read operations.  All other operations such as {@see #stop(long)}
- * and {@see #stop} just do a normal java synchronization against the SynchornizedSynchChannel
- * object instance.
- * 
- */
-public class SynchornizedSyncChannel extends FilterSyncChannel {
-
-    private final Lock readLock;
-    private final Lock writeLock;
-
-    public SynchornizedSyncChannel(SyncChannel next) {
-        this(next, new ReentrantLock(), new ReentrantLock());
-    }
-    
-    public SynchornizedSyncChannel(SyncChannel next, Lock readLock, Lock writeLock) {
-        super(next);
-        this.readLock = readLock;
-        this.writeLock = writeLock;
-    }
-    
-    public Packet read(long timeout) throws IOException {
-        try {            
-            
-            if( timeout==SyncChannelServer.WAIT_FOREVER_TIMEOUT ) {
-                readLock.lock();
-            } else {
-                long start = System.currentTimeMillis();
-                if( !readLock.tryLock(0, TimeUnit.MILLISECONDS) ) {
-                    return null;
-                }
-                // Adjust the resulting timeout down to account for time taken to 
-                // get the readLock.
-                timeout = Math.max(0, timeout-(System.currentTimeMillis()-start));
-            }
-            
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException(e.getMessage());            
-        }
-        
-        try {
-            return getNext().read(timeout);            
-        } finally {
-            readLock.unlock();
-        }
-    }
-    
-    public void write(Packet packet) throws IOException {
-        writeLock.lock();
-        try {
-            getNext().write(packet);            
-        } finally {
-            writeLock.unlock();
-        }
-    }
-    
-    public void flush() throws IOException {
-        writeLock.lock();
-        try {
-            getNext().flush();            
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    synchronized public Object getAdapter(Class target) {
-        return super.getAdapter(target);
-    }
-
-    synchronized public void start() throws IOException {
-        super.start();
-    }
-
-    synchronized public void stop() throws IOException {
-        super.stop();
-    }
-    
-    public Lock getReadLock() {
-        return readLock;
-    }
-    
-    public Lock getWriteLock() {
-        return writeLock;
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync.filter;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.sync.FilterSyncChannel;
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Used to synchronize concurrent access to a SynchChannel.  
+ * 
+ * Uses two different {@see edu.emory.mathcs.backport.java.util.concurrent.Sync} objects
+ * for write and read operations.  All other operations such as {@see #stop(long)}
+ * and {@see #stop} just do a normal java synchronization against the SynchornizedSynchChannel
+ * object instance.
+ * 
+ */
+public class SynchornizedSyncChannel extends FilterSyncChannel {
+
+    private final Lock readLock;
+    private final Lock writeLock;
+
+    public SynchornizedSyncChannel(SyncChannel next) {
+        this(next, new ReentrantLock(), new ReentrantLock());
+    }
+    
+    public SynchornizedSyncChannel(SyncChannel next, Lock readLock, Lock writeLock) {
+        super(next);
+        this.readLock = readLock;
+        this.writeLock = writeLock;
+    }
+    
+    public Packet read(long timeout) throws IOException {
+        try {            
+            
+            if( timeout==SyncChannelServer.WAIT_FOREVER_TIMEOUT ) {
+                readLock.lock();
+            } else {
+                long start = System.currentTimeMillis();
+                if( !readLock.tryLock(0, TimeUnit.MILLISECONDS) ) {
+                    return null;
+                }
+                // Adjust the resulting timeout down to account for time taken to 
+                // get the readLock.
+                timeout = Math.max(0, timeout-(System.currentTimeMillis()-start));
+            }
+            
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.getMessage());            
+        }
+        
+        try {
+            return getNext().read(timeout);            
+        } finally {
+            readLock.unlock();
+        }
+    }
+    
+    public void write(Packet packet) throws IOException {
+        writeLock.lock();
+        try {
+            getNext().write(packet);            
+        } finally {
+            writeLock.unlock();
+        }
+    }
+    
+    public void flush() throws IOException {
+        writeLock.lock();
+        try {
+            getNext().flush();            
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    synchronized public Object getAdapter(Class target) {
+        return super.getAdapter(target);
+    }
+
+    synchronized public void start() throws IOException {
+        super.start();
+    }
+
+    synchronized public void stop() throws IOException {
+        super.stop();
+    }
+    
+    public Lock getReadLock() {
+        return readLock;
+    }
+    
+    public Lock getWriteLock() {
+        return writeLock;
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/SynchornizedSyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/WriteBufferedSyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/WriteBufferedSyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/WriteBufferedSyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/WriteBufferedSyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,69 +1,69 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync.filter;
-
-import java.io.IOException;
-
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.sync.FilterSyncChannel;
-import org.apache.activeio.packet.sync.SyncChannel;
-
-/**
- */
-public class WriteBufferedSyncChannel extends FilterSyncChannel {
-
-    private static final int DEFAULT_BUFFER_SIZE = 1024*64;
-    private final Packet buffer;
-    private final boolean enableDirectWrites;
-    
-    public WriteBufferedSyncChannel(SyncChannel channel) {
-        this(channel, new ByteArrayPacket(new byte[DEFAULT_BUFFER_SIZE]));
-    }
-    
-    public WriteBufferedSyncChannel(SyncChannel channel, Packet buffer) {
-        this(channel, buffer, true);
-    }
-
-    public WriteBufferedSyncChannel(SyncChannel channel, Packet buffer, boolean enableDirectWrites) {
-        super(channel);
-        this.buffer = buffer;
-        this.enableDirectWrites = enableDirectWrites;
-    }
-
-    public void write(Packet packet) throws IOException {
-        
-        while( packet.hasRemaining() ) {
-	        packet.read(buffer);
-	        if( !buffer.hasRemaining() ) {
-	            flush();
-	            
-	            // Should we just direct write the rest?
-	            if( enableDirectWrites && packet.remaining() > buffer.capacity()) {
-	                getNext().write(packet);
-	                return;
-	            }
-	        }
-        }        
-    }
-    
-    public void flush() throws IOException {
-        buffer.flip();
-        getNext().write(buffer);
-        buffer.clear();
-    }    
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync.filter;
+
+import java.io.IOException;
+
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.sync.FilterSyncChannel;
+import org.apache.activeio.packet.sync.SyncChannel;
+
+/**
+ */
+public class WriteBufferedSyncChannel extends FilterSyncChannel {
+
+    private static final int DEFAULT_BUFFER_SIZE = 1024*64;
+    private final Packet buffer;
+    private final boolean enableDirectWrites;
+    
+    public WriteBufferedSyncChannel(SyncChannel channel) {
+        this(channel, new ByteArrayPacket(new byte[DEFAULT_BUFFER_SIZE]));
+    }
+    
+    public WriteBufferedSyncChannel(SyncChannel channel, Packet buffer) {
+        this(channel, buffer, true);
+    }
+
+    public WriteBufferedSyncChannel(SyncChannel channel, Packet buffer, boolean enableDirectWrites) {
+        super(channel);
+        this.buffer = buffer;
+        this.enableDirectWrites = enableDirectWrites;
+    }
+
+    public void write(Packet packet) throws IOException {
+        
+        while( packet.hasRemaining() ) {
+	        packet.read(buffer);
+	        if( !buffer.hasRemaining() ) {
+	            flush();
+	            
+	            // Should we just direct write the rest?
+	            if( enableDirectWrites && packet.remaining() > buffer.capacity()) {
+	                getNext().write(packet);
+	                return;
+	            }
+	        }
+        }        
+    }
+    
+    public void flush() throws IOException {
+        buffer.flip();
+        getNext().write(buffer);
+        buffer.clear();
+    }    
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/filter/WriteBufferedSyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,50 +1,50 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync.multicast;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-
-import org.apache.activeio.packet.sync.datagram.DatagramSocketSyncChannel;
-
-
-/**
- * @version $Revision$
- */
-final public class MulticastSocketSyncChannel extends DatagramSocketSyncChannel {
-
-    private final InetAddress groupAddress;
-
-
-    protected MulticastSocketSyncChannel(MulticastSocket socket, InetAddress groupAddress) throws IOException {
-        super(socket);
-        this.groupAddress = groupAddress;
-    }
-
-    public void start() throws IOException {
-        ((MulticastSocket) getSocket()).joinGroup(groupAddress);
-    }
-
-    public void stop() throws IOException {
-        ((MulticastSocket) getSocket()).leaveGroup(groupAddress);
-    }
-
-    public String toString() {
-        return "MulticastSocket Connection: " + getSocket().getLocalSocketAddress() + " -> " + getSocket().getRemoteSocketAddress();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync.multicast;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+
+import org.apache.activeio.packet.sync.datagram.DatagramSocketSyncChannel;
+
+
+/**
+ * @version $Revision$
+ */
+final public class MulticastSocketSyncChannel extends DatagramSocketSyncChannel {
+
+    private final InetAddress groupAddress;
+
+
+    protected MulticastSocketSyncChannel(MulticastSocket socket, InetAddress groupAddress) throws IOException {
+        super(socket);
+        this.groupAddress = groupAddress;
+    }
+
+    public void start() throws IOException {
+        ((MulticastSocket) getSocket()).joinGroup(groupAddress);
+    }
+
+    public void stop() throws IOException {
+        ((MulticastSocket) getSocket()).leaveGroup(groupAddress);
+    }
+
+    public String toString() {
+        return "MulticastSocket Connection: " + getSocket().getLocalSocketAddress() + " -> " + getSocket().getRemoteSocketAddress();
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannelFactory.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannelFactory.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannelFactory.java Tue Feb 21 15:12:56 2006
@@ -1,59 +1,59 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet.sync.multicast;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.net.URI;
-
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-
-
-/**
- * @version $Revision: $ $Date: $
- */
-public class MulticastSocketSyncChannelFactory {
-
-    public SyncChannel openSyncChannel(URI groupURI) throws IOException {
-        if (groupURI == null) throw new IllegalArgumentException("group URI cannot be null");
-
-        MulticastSocket socket = new MulticastSocket(groupURI.getPort());
-
-        return createSyncChannel(socket, InetAddress.getByName(groupURI.getHost()));
-    }
-
-    public SyncChannel openSyncChannel(URI groupURI, URI localLocation) throws IOException {
-        if (groupURI == null) throw new IllegalArgumentException("group URI cannot be null");
-
-        MulticastSocket socket = new MulticastSocket(groupURI.getPort());
-        if (localLocation != null) {
-            socket.setInterface(InetAddress.getByName(localLocation.getHost()));
-        }
-
-        return createSyncChannel(socket, InetAddress.getByName(groupURI.getHost()));
-    }
-
-    protected SyncChannel createSyncChannel(MulticastSocket socket, InetAddress groupAddress) throws IOException {
-        return new MulticastSocketSyncChannel(socket, groupAddress);
-    }
-
-    public SyncChannelServer bindSyncChannel(URI location) throws IOException {
-        throw new IOException("A SyncChannelServer is not available for this channel.");
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.sync.multicast;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.net.URI;
+
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+
+/**
+ * @version $Revision: $ $Date: $
+ */
+public class MulticastSocketSyncChannelFactory {
+
+    public SyncChannel openSyncChannel(URI groupURI) throws IOException {
+        if (groupURI == null) throw new IllegalArgumentException("group URI cannot be null");
+
+        MulticastSocket socket = new MulticastSocket(groupURI.getPort());
+
+        return createSyncChannel(socket, InetAddress.getByName(groupURI.getHost()));
+    }
+
+    public SyncChannel openSyncChannel(URI groupURI, URI localLocation) throws IOException {
+        if (groupURI == null) throw new IllegalArgumentException("group URI cannot be null");
+
+        MulticastSocket socket = new MulticastSocket(groupURI.getPort());
+        if (localLocation != null) {
+            socket.setInterface(InetAddress.getByName(localLocation.getHost()));
+        }
+
+        return createSyncChannel(socket, InetAddress.getByName(groupURI.getHost()));
+    }
+
+    protected SyncChannel createSyncChannel(MulticastSocket socket, InetAddress groupAddress) throws IOException {
+        return new MulticastSocketSyncChannel(socket, groupAddress);
+    }
+
+    public SyncChannelServer bindSyncChannel(URI location) throws IOException {
+        throw new IOException("A SyncChannelServer is not available for this channel.");
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/multicast/MulticastSocketSyncChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOBaseChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOBaseChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOBaseChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOBaseChannel.java Tue Feb 21 15:12:56 2006
@@ -1,181 +1,181 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activeio.packet.sync.nio;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.stream.sync.socket.SocketMetadata;
-
-/**
- * Base class for the Async and Sync implementations of NIO channels.
- * 
- * @version $Revision$
- */
-public class NIOBaseChannel implements SocketMetadata {
-
-	protected final SocketChannel socketChannel;
-    protected final Socket socket;
-	private final boolean useDirect;
-    private int curentSoTimeout;
-	private boolean disposed;
-    private final String name;
-
-    protected NIOBaseChannel(SocketChannel socketChannel, boolean useDirect) throws IOException {
-        this.socketChannel = socketChannel;
-		this.useDirect = useDirect;
-		this.socket = this.socketChannel.socket();
-
-        if( useDirect ) {
-            socket.setSendBufferSize(ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE);
-            socket.setReceiveBufferSize(ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE);
-        } else {
-            socket.setSendBufferSize(ByteBufferPacket.DEFAULT_BUFFER_SIZE);
-            socket.setReceiveBufferSize(ByteBufferPacket.DEFAULT_BUFFER_SIZE);
-        }		
-
-        this.name = "NIO Socket Connection: "+getLocalSocketAddress()+" -> "+getRemoteSocketAddress();
-    }
-    
-    protected ByteBuffer allocateBuffer() {
-        if( useDirect ) {
-            return ByteBuffer.allocateDirect(ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE);
-        } else {
-            return ByteBuffer.allocate(ByteBufferPacket.DEFAULT_BUFFER_SIZE);
-        }
-    }
-
-    public void setSoTimeout(int i) throws SocketException {
-        if( curentSoTimeout != i ) {
-            socket.setSoTimeout(i);
-            curentSoTimeout = i;
-        }
-    }
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return null;
-    }
-
-    public String toString() {
-        return name;
-    }
-
-	public void dispose() {
-        if (disposed)
-            return;
-
-        try {
-            socketChannel.close();
-        } catch (IOException ignore) {
-        }
-        disposed = true;
-	}
-
-    /**
-     * @see org.apache.activeio.Channel#flush()
-     */
-    public void flush() throws IOException {
-    }
-    
-    public InetAddress getInetAddress() {
-        return socket.getInetAddress();
-    }
-    public boolean getKeepAlive() throws SocketException {
-        return socket.getKeepAlive();
-    }
-    public InetAddress getLocalAddress() {
-        return socket.getLocalAddress();
-    }
-    public int getLocalPort() {
-        return socket.getLocalPort();
-    }
-    public SocketAddress getLocalSocketAddress() {
-        return socket.getLocalSocketAddress();
-    }
-    public boolean getOOBInline() throws SocketException {
-        return socket.getOOBInline();
-    }
-    public int getPort() {
-        return socket.getPort();
-    }
-    public int getReceiveBufferSize() throws SocketException {
-        return socket.getReceiveBufferSize();
-    }
-    public SocketAddress getRemoteSocketAddress() {
-        return socket.getRemoteSocketAddress();
-    }
-    public boolean getReuseAddress() throws SocketException {
-        return socket.getReuseAddress();
-    }
-    public int getSendBufferSize() throws SocketException {
-        return socket.getSendBufferSize();
-    }
-    public int getSoLinger() throws SocketException {
-        return socket.getSoLinger();
-    }
-    public int getSoTimeout() throws SocketException {
-        return socket.getSoTimeout();
-    }
-    public boolean getTcpNoDelay() throws SocketException {
-        return socket.getTcpNoDelay();
-    }
-    public int getTrafficClass() throws SocketException {
-        return socket.getTrafficClass();
-    }
-    public boolean isBound() {
-        return socket.isBound();
-    }
-    public boolean isClosed() {
-        return socket.isClosed();
-    }
-    public boolean isConnected() {
-        return socket.isConnected();
-    }
-    public void setKeepAlive(boolean on) throws SocketException {
-        socket.setKeepAlive(on);
-    }
-    public void setOOBInline(boolean on) throws SocketException {
-        socket.setOOBInline(on);
-    }
-    public void setReceiveBufferSize(int size) throws SocketException {
-        socket.setReceiveBufferSize(size);
-    }
-    public void setReuseAddress(boolean on) throws SocketException {
-        socket.setReuseAddress(on);
-    }
-    public void setSendBufferSize(int size) throws SocketException {
-        socket.setSendBufferSize(size);
-    }
-    public void setSoLinger(boolean on, int linger) throws SocketException {
-        socket.setSoLinger(on, linger);
-    }
-    public void setTcpNoDelay(boolean on) throws SocketException {
-        socket.setTcpNoDelay(on);
-    }
-    public void setTrafficClass(int tc) throws SocketException {
-        socket.setTrafficClass(tc);
-    }    
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeio.packet.sync.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.stream.sync.socket.SocketMetadata;
+
+/**
+ * Base class for the Async and Sync implementations of NIO channels.
+ * 
+ * @version $Revision$
+ */
+public class NIOBaseChannel implements SocketMetadata {
+
+	protected final SocketChannel socketChannel;
+    protected final Socket socket;
+	private final boolean useDirect;
+    private int curentSoTimeout;
+	private boolean disposed;
+    private final String name;
+
+    protected NIOBaseChannel(SocketChannel socketChannel, boolean useDirect) throws IOException {
+        this.socketChannel = socketChannel;
+		this.useDirect = useDirect;
+		this.socket = this.socketChannel.socket();
+
+        if( useDirect ) {
+            socket.setSendBufferSize(ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE);
+            socket.setReceiveBufferSize(ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE);
+        } else {
+            socket.setSendBufferSize(ByteBufferPacket.DEFAULT_BUFFER_SIZE);
+            socket.setReceiveBufferSize(ByteBufferPacket.DEFAULT_BUFFER_SIZE);
+        }		
+
+        this.name = "NIO Socket Connection: "+getLocalSocketAddress()+" -> "+getRemoteSocketAddress();
+    }
+    
+    protected ByteBuffer allocateBuffer() {
+        if( useDirect ) {
+            return ByteBuffer.allocateDirect(ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE);
+        } else {
+            return ByteBuffer.allocate(ByteBufferPacket.DEFAULT_BUFFER_SIZE);
+        }
+    }
+
+    public void setSoTimeout(int i) throws SocketException {
+        if( curentSoTimeout != i ) {
+            socket.setSoTimeout(i);
+            curentSoTimeout = i;
+        }
+    }
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return null;
+    }
+
+    public String toString() {
+        return name;
+    }
+
+	public void dispose() {
+        if (disposed)
+            return;
+
+        try {
+            socketChannel.close();
+        } catch (IOException ignore) {
+        }
+        disposed = true;
+	}
+
+    /**
+     * @see org.apache.activeio.Channel#flush()
+     */
+    public void flush() throws IOException {
+    }
+    
+    public InetAddress getInetAddress() {
+        return socket.getInetAddress();
+    }
+    public boolean getKeepAlive() throws SocketException {
+        return socket.getKeepAlive();
+    }
+    public InetAddress getLocalAddress() {
+        return socket.getLocalAddress();
+    }
+    public int getLocalPort() {
+        return socket.getLocalPort();
+    }
+    public SocketAddress getLocalSocketAddress() {
+        return socket.getLocalSocketAddress();
+    }
+    public boolean getOOBInline() throws SocketException {
+        return socket.getOOBInline();
+    }
+    public int getPort() {
+        return socket.getPort();
+    }
+    public int getReceiveBufferSize() throws SocketException {
+        return socket.getReceiveBufferSize();
+    }
+    public SocketAddress getRemoteSocketAddress() {
+        return socket.getRemoteSocketAddress();
+    }
+    public boolean getReuseAddress() throws SocketException {
+        return socket.getReuseAddress();
+    }
+    public int getSendBufferSize() throws SocketException {
+        return socket.getSendBufferSize();
+    }
+    public int getSoLinger() throws SocketException {
+        return socket.getSoLinger();
+    }
+    public int getSoTimeout() throws SocketException {
+        return socket.getSoTimeout();
+    }
+    public boolean getTcpNoDelay() throws SocketException {
+        return socket.getTcpNoDelay();
+    }
+    public int getTrafficClass() throws SocketException {
+        return socket.getTrafficClass();
+    }
+    public boolean isBound() {
+        return socket.isBound();
+    }
+    public boolean isClosed() {
+        return socket.isClosed();
+    }
+    public boolean isConnected() {
+        return socket.isConnected();
+    }
+    public void setKeepAlive(boolean on) throws SocketException {
+        socket.setKeepAlive(on);
+    }
+    public void setOOBInline(boolean on) throws SocketException {
+        socket.setOOBInline(on);
+    }
+    public void setReceiveBufferSize(int size) throws SocketException {
+        socket.setReceiveBufferSize(size);
+    }
+    public void setReuseAddress(boolean on) throws SocketException {
+        socket.setReuseAddress(on);
+    }
+    public void setSendBufferSize(int size) throws SocketException {
+        socket.setSendBufferSize(size);
+    }
+    public void setSoLinger(boolean on, int linger) throws SocketException {
+        socket.setSoLinger(on, linger);
+    }
+    public void setTcpNoDelay(boolean on) throws SocketException {
+        socket.setTcpNoDelay(on);
+    }
+    public void setTrafficClass(int tc) throws SocketException {
+        socket.setTrafficClass(tc);
+    }    
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOBaseChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOSyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOSyncChannel.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOSyncChannel.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOSyncChannel.java Tue Feb 21 15:12:56 2006
@@ -1,104 +1,104 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activeio.packet.sync.nio;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.packet.EOSPacket;
-import org.apache.activeio.packet.EmptyPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.packet.sync.SyncChannelServer;
-
-/**
- * A {@see org.apache.activeio.SynchChannel} implementation that uses a {@see java.nio.channels.SocketChannel}
- * to talk to the network.
- * 
- * Using a SocketChannelSynchChannel should be more efficient than using a SocketSynchChannel since
- * direct ByteBuffer can be used to reduce the jvm overhead needed to copy byte[]s.
- * 
- * @version $Revision$
- */
-final public class NIOSyncChannel extends NIOBaseChannel implements SyncChannel {
-
-    private ByteBuffer inputByteBuffer;
-//    private Packet data2;
-
-    protected NIOSyncChannel(SocketChannel socketChannel) throws IOException {
-        this(socketChannel, true );
-    }
-
-    protected NIOSyncChannel(SocketChannel socketChannel, boolean useDirect) throws IOException {
-        super(socketChannel, useDirect);
-    }
-    
-    public Packet read(long timeout) throws IOException {
-        try {
-            
-            if( timeout==SyncChannelServer.WAIT_FOREVER_TIMEOUT )
-                setSoTimeout( 0 );
-            else if( timeout==SyncChannelServer.NO_WAIT_TIMEOUT )
-                setSoTimeout( 1 );
-            else 
-                setSoTimeout( (int)timeout );
-
-            if( inputByteBuffer==null || !inputByteBuffer.hasRemaining() ) {
-                inputByteBuffer = allocateBuffer();
-            }
-
-            int size = socketChannel.read(inputByteBuffer);
-            if( size == -1 )
-                return EOSPacket.EOS_PACKET;
-            if( size == 0 )
-                return EmptyPacket.EMPTY_PACKET;
-
-            ByteBuffer remaining = inputByteBuffer.slice();            
-            Packet data = new ByteBufferPacket(((ByteBuffer)inputByteBuffer.flip()).slice());
-            
-            // Keep the remaining buffer around to fill with data.
-            inputByteBuffer = remaining;
-            return data;
-            
-        } catch (SocketTimeoutException e) {
-            return null;
-        }
-    }
-    
-    public void write(Packet packet) throws IOException {
-    	ByteBuffer data;
-        if( packet.getClass()==ByteBufferPacket.class ) {
-            data = ((ByteBufferPacket)packet).getByteBuffer();            
-        } else {
-        	ByteSequence sequence = packet.asByteSequence();
-        	data = ByteBuffer.wrap(sequence.getData(), sequence.getOffset(), sequence.getLength());
-        }
-        socketChannel.write( data );            
-    }
-
-	public void start() throws IOException {
-	}
-
-	public void stop() throws IOException {
-	}
-    
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeio.packet.sync.nio;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.ByteSequence;
+import org.apache.activeio.packet.EOSPacket;
+import org.apache.activeio.packet.EmptyPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+/**
+ * A {@see org.apache.activeio.SynchChannel} implementation that uses a {@see java.nio.channels.SocketChannel}
+ * to talk to the network.
+ * 
+ * Using a SocketChannelSynchChannel should be more efficient than using a SocketSynchChannel since
+ * direct ByteBuffer can be used to reduce the jvm overhead needed to copy byte[]s.
+ * 
+ * @version $Revision$
+ */
+final public class NIOSyncChannel extends NIOBaseChannel implements SyncChannel {
+
+    private ByteBuffer inputByteBuffer;
+//    private Packet data2;
+
+    protected NIOSyncChannel(SocketChannel socketChannel) throws IOException {
+        this(socketChannel, true );
+    }
+
+    protected NIOSyncChannel(SocketChannel socketChannel, boolean useDirect) throws IOException {
+        super(socketChannel, useDirect);
+    }
+    
+    public Packet read(long timeout) throws IOException {
+        try {
+            
+            if( timeout==SyncChannelServer.WAIT_FOREVER_TIMEOUT )
+                setSoTimeout( 0 );
+            else if( timeout==SyncChannelServer.NO_WAIT_TIMEOUT )
+                setSoTimeout( 1 );
+            else 
+                setSoTimeout( (int)timeout );
+
+            if( inputByteBuffer==null || !inputByteBuffer.hasRemaining() ) {
+                inputByteBuffer = allocateBuffer();
+            }
+
+            int size = socketChannel.read(inputByteBuffer);
+            if( size == -1 )
+                return EOSPacket.EOS_PACKET;
+            if( size == 0 )
+                return EmptyPacket.EMPTY_PACKET;
+
+            ByteBuffer remaining = inputByteBuffer.slice();            
+            Packet data = new ByteBufferPacket(((ByteBuffer)inputByteBuffer.flip()).slice());
+            
+            // Keep the remaining buffer around to fill with data.
+            inputByteBuffer = remaining;
+            return data;
+            
+        } catch (SocketTimeoutException e) {
+            return null;
+        }
+    }
+    
+    public void write(Packet packet) throws IOException {
+    	ByteBuffer data;
+        if( packet.getClass()==ByteBufferPacket.class ) {
+            data = ((ByteBufferPacket)packet).getByteBuffer();            
+        } else {
+        	ByteSequence sequence = packet.asByteSequence();
+        	data = ByteBuffer.wrap(sequence.getData(), sequence.getOffset(), sequence.getLength());
+        }
+        socketChannel.write( data );            
+    }
+
+	public void start() throws IOException {
+	}
+
+	public void stop() throws IOException {
+	}
+    
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/sync/nio/NIOSyncChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message