activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [7/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ activ...
Date Tue, 21 Feb 2006 23:14:17 GMT
Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/LongRecordLocation.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/LongRecordLocation.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/LongRecordLocation.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/LongRecordLocation.java Tue Feb 21 15:12:56 2006
@@ -1,73 +1,73 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.howl;
-
-import org.apache.activeio.journal.RecordLocation;
-
-/**
- * Provides a RecordLocation implementation for the long based
- * location pointers that HOWL uses.
- * 
- * @version $Revision: 1.1 $
- */
-public class LongRecordLocation implements RecordLocation {
-
-	final private long location;
-
-	public LongRecordLocation(long l) {
-		this.location = l;
-	}
-
-	/**
-	 * @see java.lang.Comparable#compareTo(java.lang.Object)
-	 */
-	public int compareTo(Object o) {
-		return (int) (location - ((LongRecordLocation) o).location);
-	}
-
-	/**
-	 * @return the original long location provided by HOWL
-	 */
-	public long getLongLocation() {
-		return location;
-	}
-
-	/**
-	 * @see java.lang.Object#hashCode()
-	 */
-	public int hashCode() {
-		int lowPart = (int) (0xFFFFFFFF & location);
-		int highPart = (int) (0xFFFFFFFF & (location >> 4));
-		return lowPart ^ highPart;
-	}
-
-	/**
-	 * @see java.lang.Object#equals(java.lang.Object)
-	 */
-	public boolean equals(Object o) {
-		if (o == null || o.getClass() != LongRecordLocation.class)
-			return false;
-		return ((LongRecordLocation) o).location == location;
-	}
-
-	/**
-	 * @see java.lang.Object#toString()
-	 */
-	public String toString() {
-		return "0x" + Long.toHexString(location);
-	}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.howl;
+
+import org.apache.activeio.journal.RecordLocation;
+
+/**
+ * Provides a RecordLocation implementation for the long based
+ * location pointers that HOWL uses.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class LongRecordLocation implements RecordLocation {
+
+	final private long location;
+
+	public LongRecordLocation(long l) {
+		this.location = l;
+	}
+
+	/**
+	 * @see java.lang.Comparable#compareTo(java.lang.Object)
+	 */
+	public int compareTo(Object o) {
+		return (int) (location - ((LongRecordLocation) o).location);
+	}
+
+	/**
+	 * @return the original long location provided by HOWL
+	 */
+	public long getLongLocation() {
+		return location;
+	}
+
+	/**
+	 * @see java.lang.Object#hashCode()
+	 */
+	public int hashCode() {
+		int lowPart = (int) (0xFFFFFFFF & location);
+		int highPart = (int) (0xFFFFFFFF & (location >> 4));
+		return lowPart ^ highPart;
+	}
+
+	/**
+	 * @see java.lang.Object#equals(java.lang.Object)
+	 */
+	public boolean equals(Object o) {
+		if (o == null || o.getClass() != LongRecordLocation.class)
+			return false;
+		return ((LongRecordLocation) o).location == location;
+	}
+
+	/**
+	 * @see java.lang.Object#toString()
+	 */
+	public String toString() {
+		return "0x" + Long.toHexString(location);
+	}
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/LongRecordLocation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/HttpRecognizer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/HttpRecognizer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/HttpRecognizer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/HttpRecognizer.java Tue Feb 21 15:12:56 2006
@@ -1,66 +1,66 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.oneport;
-
-import java.util.HashSet;
-
-import org.apache.activeio.packet.Packet;
-
-
-public class HttpRecognizer implements ProtocolRecognizer {
-
-    static private HashSet methods = new HashSet();
-    static {
-        // This list built using: http://www.w3.org/Protocols/HTTP/Methods.html
-        methods.add("GET ");
-        methods.add("PUT ");
-        methods.add("POST ");
-        methods.add("HEAD ");
-        methods.add("LINK ");
-        methods.add("TRACE ");
-        methods.add("UNLINK ");
-        methods.add("SEARCH ");
-        methods.add("DELETE ");
-        methods.add("CHECKIN ");
-        methods.add("OPTIONS ");
-        methods.add("CONNECT ");
-        methods.add("CHECKOUT ");
-        methods.add("SPACEJUMP ");
-        methods.add("SHOWMETHOD ");
-        methods.add("TEXTSEARCH ");        
-    }
-    
-    static final public HttpRecognizer HTTP_RECOGNIZER = new HttpRecognizer();
-    
-    private HttpRecognizer() {}
-    
-    public boolean recognizes(Packet packet) {
-        
-        StringBuffer b = new StringBuffer(12);
-        for (int i = 0; i < 11; i++) {
-            int c = (char)packet.read();
-            if( c == -1)
-                return false;
-            
-            b.append((char)c);
-            if(((char)c)==' ')
-                break;
-        }
-        
-        return methods.contains(b.toString());
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.oneport;
+
+import java.util.HashSet;
+
+import org.apache.activeio.packet.Packet;
+
+
+public class HttpRecognizer implements ProtocolRecognizer {
+
+    static private HashSet methods = new HashSet();
+    static {
+        // This list built using: http://www.w3.org/Protocols/HTTP/Methods.html
+        methods.add("GET ");
+        methods.add("PUT ");
+        methods.add("POST ");
+        methods.add("HEAD ");
+        methods.add("LINK ");
+        methods.add("TRACE ");
+        methods.add("UNLINK ");
+        methods.add("SEARCH ");
+        methods.add("DELETE ");
+        methods.add("CHECKIN ");
+        methods.add("OPTIONS ");
+        methods.add("CONNECT ");
+        methods.add("CHECKOUT ");
+        methods.add("SPACEJUMP ");
+        methods.add("SHOWMETHOD ");
+        methods.add("TEXTSEARCH ");        
+    }
+    
+    static final public HttpRecognizer HTTP_RECOGNIZER = new HttpRecognizer();
+    
+    private HttpRecognizer() {}
+    
+    public boolean recognizes(Packet packet) {
+        
+        StringBuffer b = new StringBuffer(12);
+        for (int i = 0; i < 11; i++) {
+            int c = (char)packet.read();
+            if( c == -1)
+                return false;
+            
+            b.append((char)c);
+            if(((char)c)==' ')
+                break;
+        }
+        
+        return methods.contains(b.toString());
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/HttpRecognizer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/IIOPRecognizer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/IIOPRecognizer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/IIOPRecognizer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/IIOPRecognizer.java Tue Feb 21 15:12:56 2006
@@ -1,36 +1,36 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.oneport;
-
-import org.apache.activeio.packet.Packet;
-
-
-public class IIOPRecognizer implements ProtocolRecognizer {
-    
-    static final public IIOPRecognizer IIOP_RECOGNIZER = new IIOPRecognizer();
-    
-    private IIOPRecognizer() {}
-    
-    public boolean recognizes(Packet packet) {
-        return ( 
-            packet.read()=='G' &&
-            packet.read()=='I' &&
-            packet.read()=='O' &&
-            packet.read()=='P' 
-                );
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.oneport;
+
+import org.apache.activeio.packet.Packet;
+
+
+public class IIOPRecognizer implements ProtocolRecognizer {
+    
+    static final public IIOPRecognizer IIOP_RECOGNIZER = new IIOPRecognizer();
+    
+    private IIOPRecognizer() {}
+    
+    public boolean recognizes(Packet packet) {
+        return ( 
+            packet.read()=='G' &&
+            packet.read()=='I' &&
+            packet.read()=='O' &&
+            packet.read()=='P' 
+                );
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/IIOPRecognizer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/OnePortAsyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/OnePortAsyncChannelServer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/OnePortAsyncChannelServer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/OnePortAsyncChannelServer.java Tue Feb 21 15:12:56 2006
@@ -1,223 +1,223 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.oneport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Iterator;
-
-import org.apache.activeio.AcceptListener;
-import org.apache.activeio.Channel;
-import org.apache.activeio.adapter.AsyncToSyncChannel;
-import org.apache.activeio.adapter.SyncToAsyncChannel;
-import org.apache.activeio.packet.AppendedPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.async.AsyncChannel;
-import org.apache.activeio.packet.async.AsyncChannelListener;
-import org.apache.activeio.packet.async.AsyncChannelServer;
-import org.apache.activeio.packet.async.FilterAsyncChannel;
-import org.apache.activeio.packet.async.FilterAsyncChannelServer;
-import org.apache.activeio.packet.sync.SyncChannel;
-import org.apache.activeio.packet.sync.filter.PushbackSyncChannel;
-
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Allows multiple protocols share a single ChannelServer.  All protocols sharing the server 
- * must have a distinct magic number at the beginning of the client's request.
- * 
- * TODO: handle the case where a client opens a connection but sends no data down the stream.  We need
- * to timeout that client.
- * 
- * @version $Revision$
- */
-final public class OnePortAsyncChannelServer extends FilterAsyncChannelServer {
-    
-    /**
-     * The OnePortAsyncChannelServer listens for incoming connection
-     * from a normal AsyncChannelServer.  This s the listner used 
-     * to receive the accepted channels.
-     */
-    final private class OnePortAcceptListener implements AcceptListener {
-        
-        public void onAccept(Channel channel) {
-            try {
-                AsyncChannel asyncChannel = SyncToAsyncChannel.adapt(channel);
-                ProtocolInspectingAsyncChannel inspector = new ProtocolInspectingAsyncChannel(asyncChannel);
-                inspector.start();                
-            } catch (IOException e) {
-                onAcceptError(e);
-            }                
-        }
-        
-        public void onAcceptError(IOException error) {
-            dispose();
-        }
-    }
-
-    /**
-     * This channel filter sniffs the first few bytes of the byte stream 
-     * to see if a ProtocolRecognizer recognizes the protocol.  If it does not
-     * it just closes the channel, otherwise the associated SubPortAsyncChannelServer
-     * is notified that it accepted a channel.
-     *
-     */
-    final private class ProtocolInspectingAsyncChannel extends FilterAsyncChannel {
-        private Packet buffer;
-
-        public ProtocolInspectingAsyncChannel(AsyncChannel next) throws IOException {
-            super(next);
-            setAsyncChannelListener(new AsyncChannelListener() {
-                public void onPacket(Packet packet) {
-                    if (buffer == null) {
-                        buffer = packet;
-                    } else {
-                        buffer = AppendedPacket.join(buffer, packet);
-                    }
-                    findMagicNumber();
-                }
-
-                public void onPacketError(IOException error) {
-                    dispose();
-                }
-            });
-        }
-
-        private void findMagicNumber() {
-            for (Iterator iter = recognizerMap.keySet().iterator(); iter.hasNext();) {
-                ProtocolRecognizer recognizer = (ProtocolRecognizer) iter.next();
-                if (recognizer.recognizes(buffer.duplicate())) {
-
-                    if( UnknownRecognizer.UNKNOWN_RECOGNIZER == recognizer ) {
-                        // Dispose the channel.. don't know what to do with it.
-                        dispose();
-                    }
-                    
-                    SubPortAsyncChannelServer onePort = (SubPortAsyncChannelServer) recognizerMap.get(recognizer);
-                    if( onePort == null ) {
-                        // Dispose the channel.. don't know what to do with it.
-                        dispose();
-                    }
-
-                    // Once the magic number is found:
-                    // Stop the channel so that a decision can be taken on what to
-                    // do with the
-                    // channel. When the channel is restarted, the buffered up
-                    // packets wiil get
-                    // delivered.
-                    try {
-                        stop();
-                        setAsyncChannelListener(null);
-                    } catch (IOException e) {                        
-                        getAsyncChannelListener().onPacketError(e);
-                    }
-                    
-                    Channel channel = getNext();
-                    channel = AsyncToSyncChannel.adapt(channel);
-                    channel = new PushbackSyncChannel((SyncChannel) channel, buffer);
-                    channel = SyncToAsyncChannel.adapt(channel);
-                    
-                    onePort.onAccept(channel);
-                    break;
-                }
-            }
-        }
-    }    
-
-    /**
-     * Clients bind against the OnePortAsyncChannelServer and get 
-     * SubPortAsyncChannelServer which can be used to accept connections.
-     */
-    final private class SubPortAsyncChannelServer implements AsyncChannelServer {
-        
-        private final ProtocolRecognizer recognizer;
-        private AcceptListener acceptListener;
-        private boolean started;
-        
-        /**
-         * @param recognizer
-         */
-        public SubPortAsyncChannelServer(ProtocolRecognizer recognizer) {
-            this.recognizer = recognizer;
-        }
-
-        public void setAcceptListener(AcceptListener acceptListener) {
-            this.acceptListener = acceptListener;
-        }
-        
-        public URI getBindURI() {
-            return next.getBindURI();
-        }
-        
-        public URI getConnectURI() {
-            return next.getConnectURI();
-        }
-        
-        public void dispose() {
-            started = false;
-            recognizerMap.remove(recognizer);
-        }
-        
-        public void start() throws IOException {
-            started = true;
-        }
-        public void stop() throws IOException {
-            started = false;
-        }
-        
-        void onAccept(Channel channel) {
-            if( started && acceptListener!=null ) {
-                acceptListener.onAccept(channel);
-            } else {
-                // Dispose the channel.. don't know what to do with it.
-                channel.dispose();
-            }
-        }
-        
-        public Object getAdapter(Class target) {
-            if( target.isAssignableFrom(getClass()) ) {
-                return this;
-            }
-            return OnePortAsyncChannelServer.this.getAdapter(target);
-        }    
-        
-    }
-    
-    
-    private final ConcurrentHashMap recognizerMap = new ConcurrentHashMap();
-
-    public OnePortAsyncChannelServer(AsyncChannelServer server) throws IOException {
-        super(server);
-        super.setAcceptListener(new OnePortAcceptListener());
-    }
-    
-    public void setAcceptListener(AcceptListener acceptListener) {
-        throw new IllegalAccessError("Not supported");
-    }    
-    
-    public AsyncChannelServer bindAsyncChannel(ProtocolRecognizer recognizer) throws IOException {
-        
-        if( recognizerMap.contains(recognizer) ) 
-            throw new IOException("That recognizer is allredy bound.");
-        
-        SubPortAsyncChannelServer server = new SubPortAsyncChannelServer(recognizer);
-        Object old = recognizerMap.put(recognizer, server);
-        return server;
-    }
-    
-    
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.oneport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+
+import org.apache.activeio.AcceptListener;
+import org.apache.activeio.Channel;
+import org.apache.activeio.adapter.AsyncToSyncChannel;
+import org.apache.activeio.adapter.SyncToAsyncChannel;
+import org.apache.activeio.packet.AppendedPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.async.FilterAsyncChannel;
+import org.apache.activeio.packet.async.FilterAsyncChannelServer;
+import org.apache.activeio.packet.sync.SyncChannel;
+import org.apache.activeio.packet.sync.filter.PushbackSyncChannel;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Allows multiple protocols to share a single ChannelServer.  All protocols sharing the server 
+ * must have a distinct magic number at the beginning of the client's request.
+ * 
+ * TODO: handle the case where a client opens a connection but sends no data down the stream.  We need
+ * to timeout that client.
+ * 
+ * @version $Revision$
+ */
+final public class OnePortAsyncChannelServer extends FilterAsyncChannelServer {
+    
+    /**
+     * The OnePortAsyncChannelServer listens for incoming connections
+     * from a normal AsyncChannelServer.  This is the listener used
+     * to receive the accepted channels.
+     */
+    final private class OnePortAcceptListener implements AcceptListener {
+        
+        public void onAccept(Channel channel) {
+            try {
+                AsyncChannel asyncChannel = SyncToAsyncChannel.adapt(channel);
+                ProtocolInspectingAsyncChannel inspector = new ProtocolInspectingAsyncChannel(asyncChannel);
+                inspector.start();                
+            } catch (IOException e) {
+                onAcceptError(e);
+            }                
+        }
+        
+        public void onAcceptError(IOException error) {
+            dispose();
+        }
+    }
+
+    /**
+     * This channel filter sniffs the first few bytes of the byte stream 
+     * to see if a ProtocolRecognizer recognizes the protocol.  If it does not
+     * it just closes the channel, otherwise the associated SubPortAsyncChannelServer
+     * is notified that it accepted a channel.
+     *
+     */
+    final private class ProtocolInspectingAsyncChannel extends FilterAsyncChannel {
+        private Packet buffer;
+
+        public ProtocolInspectingAsyncChannel(AsyncChannel next) throws IOException {
+            super(next);
+            setAsyncChannelListener(new AsyncChannelListener() {
+                public void onPacket(Packet packet) {
+                    if (buffer == null) {
+                        buffer = packet;
+                    } else {
+                        buffer = AppendedPacket.join(buffer, packet);
+                    }
+                    findMagicNumber();
+                }
+
+                public void onPacketError(IOException error) {
+                    dispose();
+                }
+            });
+        }
+
+        private void findMagicNumber() {
+            for (Iterator iter = recognizerMap.keySet().iterator(); iter.hasNext();) {
+                ProtocolRecognizer recognizer = (ProtocolRecognizer) iter.next();
+                if (recognizer.recognizes(buffer.duplicate())) {
+
+                    if( UnknownRecognizer.UNKNOWN_RECOGNIZER == recognizer ) {
+                        // Dispose the channel.. don't know what to do with it.
+                        dispose();
+                    }
+                    
+                    SubPortAsyncChannelServer onePort = (SubPortAsyncChannelServer) recognizerMap.get(recognizer);
+                    if( onePort == null ) {
+                        // Dispose the channel.. don't know what to do with it.
+                        dispose();
+                    }
+
+                    // Once the magic number is found:
+                    // Stop the channel so that a decision can be taken on what to
+                    // do with the
+                    // channel. When the channel is restarted, the buffered up
+                    // packets will get
+                    // delivered.
+                    try {
+                        stop();
+                        setAsyncChannelListener(null);
+                    } catch (IOException e) {                        
+                        getAsyncChannelListener().onPacketError(e);
+                    }
+                    
+                    Channel channel = getNext();
+                    channel = AsyncToSyncChannel.adapt(channel);
+                    channel = new PushbackSyncChannel((SyncChannel) channel, buffer);
+                    channel = SyncToAsyncChannel.adapt(channel);
+                    
+                    onePort.onAccept(channel);
+                    break;
+                }
+            }
+        }
+    }    
+
+    /**
+     * Clients bind against the OnePortAsyncChannelServer and get a
+     * SubPortAsyncChannelServer which can be used to accept connections.
+     */
+    final private class SubPortAsyncChannelServer implements AsyncChannelServer {
+        
+        private final ProtocolRecognizer recognizer;
+        private AcceptListener acceptListener;
+        private boolean started;
+        
+        /**
+         * @param recognizer
+         */
+        public SubPortAsyncChannelServer(ProtocolRecognizer recognizer) {
+            this.recognizer = recognizer;
+        }
+
+        public void setAcceptListener(AcceptListener acceptListener) {
+            this.acceptListener = acceptListener;
+        }
+        
+        public URI getBindURI() {
+            return next.getBindURI();
+        }
+        
+        public URI getConnectURI() {
+            return next.getConnectURI();
+        }
+        
+        public void dispose() {
+            started = false;
+            recognizerMap.remove(recognizer);
+        }
+        
+        public void start() throws IOException {
+            started = true;
+        }
+        public void stop() throws IOException {
+            started = false;
+        }
+        
+        void onAccept(Channel channel) {
+            if( started && acceptListener!=null ) {
+                acceptListener.onAccept(channel);
+            } else {
+                // Dispose the channel.. don't know what to do with it.
+                channel.dispose();
+            }
+        }
+        
+        public Object getAdapter(Class target) {
+            if( target.isAssignableFrom(getClass()) ) {
+                return this;
+            }
+            return OnePortAsyncChannelServer.this.getAdapter(target);
+        }    
+        
+    }
+    
+    
+    private final ConcurrentHashMap recognizerMap = new ConcurrentHashMap();
+
+    public OnePortAsyncChannelServer(AsyncChannelServer server) throws IOException {
+        super(server);
+        super.setAcceptListener(new OnePortAcceptListener());
+    }
+    
+    public void setAcceptListener(AcceptListener acceptListener) {
+        throw new IllegalAccessError("Not supported");
+    }    
+    
+    public AsyncChannelServer bindAsyncChannel(ProtocolRecognizer recognizer) throws IOException {
+        
+        if( recognizerMap.contains(recognizer) ) 
+            throw new IOException("That recognizer is allredy bound.");
+        
+        SubPortAsyncChannelServer server = new SubPortAsyncChannelServer(recognizer);
+        Object old = recognizerMap.put(recognizer, server);
+        return server;
+    }
+    
+    
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/OnePortAsyncChannelServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/ProtocolRecognizer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/ProtocolRecognizer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/ProtocolRecognizer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/ProtocolRecognizer.java Tue Feb 21 15:12:56 2006
@@ -1,26 +1,26 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.oneport;
-
-import org.apache.activeio.packet.Packet;
-
-/**
- *
- */
-public interface ProtocolRecognizer {
-    boolean recognizes(Packet packet);
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.oneport;
+
+import org.apache.activeio.packet.Packet;
+
+/**
+ * Callback that decides whether a packet is recognized.
+ * NOTE(review): presumably one implementation exists per wire protocol
+ * sharing a port - confirm against the callers.
+ */
+public interface ProtocolRecognizer {
+    /**
+     * @param packet the packet to inspect
+     * @return true if this recognizer recognizes the packet, false otherwise
+     */
+    boolean recognizes(Packet packet);
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/ProtocolRecognizer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/UnknownRecognizer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/UnknownRecognizer.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/UnknownRecognizer.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/UnknownRecognizer.java Tue Feb 21 15:12:56 2006
@@ -1,34 +1,34 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.oneport;
-
-import org.apache.activeio.packet.Packet;
-
-
-class UnknownRecognizer implements ProtocolRecognizer {
-    
-    static public final ProtocolRecognizer UNKNOWN_RECOGNIZER = new UnknownRecognizer();
-    
-    private UnknownRecognizer() {        
-    }
-    
-    public boolean recognizes(Packet packet) {
-        if( packet.limit() > 15 )
-            return true;
-        return false;
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.oneport;
+
+import org.apache.activeio.packet.Packet;
+
+
+/**
+ * Recognizer that reports a match as soon as a packet's limit exceeds 15.
+ * The only instance is the shared {@link #UNKNOWN_RECOGNIZER} constant.
+ */
+class UnknownRecognizer implements ProtocolRecognizer {
+
+    static public final ProtocolRecognizer UNKNOWN_RECOGNIZER = new UnknownRecognizer();
+
+    /** Private so that UNKNOWN_RECOGNIZER stays the only instance. */
+    private UnknownRecognizer() {
+    }
+
+    /**
+     * @param packet the packet to inspect
+     * @return true when the packet's limit is greater than 15
+     */
+    public boolean recognizes(Packet packet) {
+        return packet.limit() > 15;
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/oneport/UnknownRecognizer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/AppendedPacket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/AppendedPacket.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/AppendedPacket.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/AppendedPacket.java Tue Feb 21 15:12:56 2006
@@ -1,245 +1,245 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-
-
-/**
- * Appends two packets together.
- * 
- * @version $Revision$
- */
-final public class AppendedPacket implements Packet {
-
-    private final Packet first;
-    private final Packet last;
-
-    private final int capacity;
-    private final int firstCapacity;
-
-    static public Packet join(Packet first, Packet last) {
-        if( first.hasRemaining() ) {
-            if( last.hasRemaining() ) {
-                
-                //TODO: this might even be a rejoin of the same continous buffer.
-                //It would be good if we detected that and avoided just returned the buffer.
-                
-                return new AppendedPacket(first.slice(), last.slice());               
-            } else {
-                return first.slice();
-            }
-        } else {
-            if( last.hasRemaining() ) {
-                return last.slice();                
-            } else {
-                return EmptyPacket.EMPTY_PACKET;
-            }            
-        }
-    }
-    
-    /**
-     * @deprecated use {@see #join(Packet, Packet)} instead.
-     */
-    public AppendedPacket(Packet first, Packet second) {
-        this.first = first;
-        this.last = second;
-        this.firstCapacity = first.capacity();
-        this.capacity = first.capacity()+last.capacity();
-        clear();        
-    }
-        
-    public void position(int position) {
-        if( position <= firstCapacity ) {
-            last.position(0);
-            first.position(position);
-        } else {
-            last.position(position-firstCapacity);
-            first.position(firstCapacity);
-        }
-    }
-    
-    public void limit(int limit) {
-        if( limit <= firstCapacity ) {
-            last.limit(0);
-            first.limit(limit);
-        } else {
-            last.limit(limit-firstCapacity);
-            first.limit(firstCapacity);
-        }
-    }
-
-    public Packet slice() {
-        return join(first,last);
-    }
-
-    public Packet duplicate() {
-        return new AppendedPacket(first.duplicate(), last.duplicate());               
-    }
-
-    public Object duplicate(ClassLoader cl) throws IOException {
-        try {
-            Class pclazz = cl.loadClass(Packet.class.getName());
-            Class clazz = cl.loadClass(AppendedPacket.class.getName());
-            Constructor constructor = clazz.getConstructor(new Class[]{pclazz, pclazz});
-            return constructor.newInstance(new Object[]{first.duplicate(cl), last.duplicate(cl)});
-        } catch (Throwable e) {
-            throw (IOException)new IOException("Could not duplicate packet in a different classloader: "+e).initCause(e);
-        }
-    }
-    
-    public void flip() {
-        limit(position());
-        position(0);
-    }
-
-    public int position() {
-        return first.position()+last.position();
-    }
-    
-    public int limit() {
-        return first.limit()+last.limit();
-    }    
-
-    public int remaining() {
-        return first.remaining()+last.remaining();
-    }
-
-    public void rewind() {
-        first.rewind();
-        last.rewind();
-    }
-
-    public boolean hasRemaining() {
-        return first.hasRemaining()||last.hasRemaining();
-    }
-
-    public void clear() {
-        first.clear();
-        last.clear();        
-    }
-
-    public int capacity() {
-        return capacity;
-    }
-
-    public void writeTo(OutputStream out) throws IOException {
-        first.writeTo(out);
-        last.writeTo(out);
-    }
-    
-    public void writeTo(DataOutput out) throws IOException {
-        first.writeTo(out);
-        last.writeTo(out);
-    }
-
-
-    /**
-     * @see org.apache.activeio.packet.Packet#read()
-     */
-    public int read() {
-        if( first.hasRemaining() ) {
-            return first.read();
-        } else if( last.hasRemaining() ) {
-            return last.read();
-        } else {
-            return -1;
-        }
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#read(byte[], int, int)
-     */
-    public int read(byte[] data, int offset, int length) {        
-        
-        int rc1 = first.read(data, offset, length);        
-        if( rc1==-1 ) {
-            int rc2 = last.read(data, offset, length);
-            return ( rc2==-1 ) ? -1 : rc2;
-        } else {
-            int rc2 = last.read(data, offset+rc1, length-rc1);
-            return ( rc2==-1 ) ? rc1 : rc1+rc2;
-        }
-
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#write(int)
-     */
-    public boolean write(int data) {
-        if( first.hasRemaining() ) {
-            return first.write(data);
-        } else if( last.hasRemaining() ) {
-            return last.write(data);
-        } else {
-            return false;
-        }
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#write(byte[], int, int)
-     */
-    public int write(byte[] data, int offset, int length) {
-        int rc1 = first.write(data, offset, length);        
-        if( rc1==-1 ) {
-            int rc2 = last.write(data, offset, length);
-            return ( rc2==-1 ) ? -1 : rc2;
-        } else {
-            int rc2 = last.write(data, offset+rc1, length-rc1);
-            return ( rc2==-1 ) ? rc1 : rc1+rc2;
-        }
-    }
-
-    public int read(Packet dest) {        
-	    int rc = first.read(dest);
-	    rc += last.read(dest);
-	    return rc;
-    }    
-    
-    public String toString() {
-        return "{position="+position()+",limit="+limit()+",capacity="+capacity()+"}";
-    }
-
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        Object object = first.getAdapter(target);
-        if( object == null )
-            object = last.getAdapter(target);
-        return object;
-    }
-
-    public ByteSequence asByteSequence() {      
-        // TODO: implement me
-        return null;
-    }
-
-    public byte[] sliceAsBytes() {
-        // TODO: implement me
-        return null;
-    }
-
-    public void dispose() {
-        first.dispose();
-        last.dispose();
-    }
-
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+
+
+/**
+ * Appends two packets together.
+ * 
+ * @version $Revision$
+ */
+final public class AppendedPacket implements Packet {
+
+    private final Packet first;
+    private final Packet last;
+
+    private final int capacity;
+    private final int firstCapacity;
+
+    static public Packet join(Packet first, Packet last) {
+        if( first.hasRemaining() ) {
+            if( last.hasRemaining() ) {
+                
+                //TODO: this might even be a rejoin of the same continous buffer.
+                //It would be good if we detected that and avoided just returned the buffer.
+                
+                return new AppendedPacket(first.slice(), last.slice());               
+            } else {
+                return first.slice();
+            }
+        } else {
+            if( last.hasRemaining() ) {
+                return last.slice();                
+            } else {
+                return EmptyPacket.EMPTY_PACKET;
+            }            
+        }
+    }
+    
+    /**
+     * Creates a packet made of <code>first</code> followed by
+     * <code>second</code>. The combined capacity is the sum of both
+     * capacities, and both packets are cleared by this constructor.
+     *
+     * @param first the leading packet
+     * @param second the trailing packet
+     * @deprecated use {@link #join(Packet, Packet)} instead.
+     */
+    public AppendedPacket(Packet first, Packet second) {
+        this.first = first;
+        this.last = second;
+        this.firstCapacity = first.capacity();
+        this.capacity = first.capacity()+last.capacity();
+        clear();        
+    }
+        
+    public void position(int position) {
+        if( position <= firstCapacity ) {
+            last.position(0);
+            first.position(position);
+        } else {
+            last.position(position-firstCapacity);
+            first.position(firstCapacity);
+        }
+    }
+    
+    public void limit(int limit) {
+        if( limit <= firstCapacity ) {
+            last.limit(0);
+            first.limit(limit);
+        } else {
+            last.limit(limit-firstCapacity);
+            first.limit(firstCapacity);
+        }
+    }
+
+    public Packet slice() {
+        return join(first,last);
+    }
+
+    public Packet duplicate() {
+        return new AppendedPacket(first.duplicate(), last.duplicate());               
+    }
+
+    /**
+     * Builds an AppendedPacket through the given class loader. The Packet
+     * and AppendedPacket classes are looked up by name in <code>cl</code>,
+     * and the two-argument constructor is invoked reflectively with
+     * duplicates of both halves made through the same class loader.
+     *
+     * @param cl the class loader used to load the packet classes
+     * @return the reflectively constructed instance
+     * @throws IOException wrapping any Throwable raised during lookup or construction
+     */
+    public Object duplicate(ClassLoader cl) throws IOException {
+        try {
+            Class pclazz = cl.loadClass(Packet.class.getName());
+            Class clazz = cl.loadClass(AppendedPacket.class.getName());
+            Constructor constructor = clazz.getConstructor(new Class[]{pclazz, pclazz});
+            return constructor.newInstance(new Object[]{first.duplicate(cl), last.duplicate(cl)});
+        } catch (Throwable e) {
+            // initCause keeps the original failure attached to the IOException.
+            throw (IOException)new IOException("Could not duplicate packet in a different classloader: "+e).initCause(e);
+        }
+    }
+    
+    public void flip() {
+        limit(position());
+        position(0);
+    }
+
+    public int position() {
+        return first.position()+last.position();
+    }
+    
+    public int limit() {
+        return first.limit()+last.limit();
+    }    
+
+    public int remaining() {
+        return first.remaining()+last.remaining();
+    }
+
+    public void rewind() {
+        first.rewind();
+        last.rewind();
+    }
+
+    public boolean hasRemaining() {
+        return first.hasRemaining()||last.hasRemaining();
+    }
+
+    public void clear() {
+        first.clear();
+        last.clear();        
+    }
+
+    public int capacity() {
+        return capacity;
+    }
+
+    public void writeTo(OutputStream out) throws IOException {
+        first.writeTo(out);
+        last.writeTo(out);
+    }
+    
+    public void writeTo(DataOutput out) throws IOException {
+        first.writeTo(out);
+        last.writeTo(out);
+    }
+
+
+    /**
+     * Reads one value from the first packet while it has content remaining,
+     * then from the second packet.
+     *
+     * @return the value read, or -1 when neither packet has content remaining
+     * @see org.apache.activeio.packet.Packet#read()
+     */
+    public int read() {
+        if( first.hasRemaining() ) {
+            return first.read();
+        }
+        return last.hasRemaining() ? last.read() : -1;
+    }
+
+    /**
+     * Reads into <code>data</code> from the first packet and then from the
+     * second packet.
+     *
+     * @return the total count reported by both packets, or -1 when both report -1
+     * @see org.apache.activeio.packet.Packet#read(byte[], int, int)
+     */
+    public int read(byte[] data, int offset, int length) {
+        int fromFirst = first.read(data, offset, length);
+        if( fromFirst == -1 ) {
+            // The first packet gave nothing, so the result is whatever the
+            // second packet reports (including -1).
+            return last.read(data, offset, length);
+        }
+        int fromLast = last.read(data, offset+fromFirst, length-fromFirst);
+        return ( fromLast == -1 ) ? fromFirst : fromFirst+fromLast;
+    }
+
+    /**
+     * @see org.apache.activeio.packet.Packet#write(int)
+     */
+    public boolean write(int data) {
+        if( first.hasRemaining() ) {
+            return first.write(data);
+        } else if( last.hasRemaining() ) {
+            return last.write(data);
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Writes from <code>data</code> into the first packet and then into the
+     * second packet.
+     *
+     * @return the total count reported by both packets, or -1 when both report -1
+     * @see org.apache.activeio.packet.Packet#write(byte[], int, int)
+     */
+    public int write(byte[] data, int offset, int length) {
+        int intoFirst = first.write(data, offset, length);
+        if( intoFirst == -1 ) {
+            // The first packet accepted nothing, so the result is whatever
+            // the second packet reports (including -1).
+            return last.write(data, offset, length);
+        }
+        int intoLast = last.write(data, offset+intoFirst, length-intoFirst);
+        return ( intoLast == -1 ) ? intoFirst : intoFirst+intoLast;
+    }
+
+    /**
+     * Reads from the first packet and then the second into <code>dest</code>.
+     *
+     * @param dest the packet that receives the content
+     * @return the sum of the values returned by both underlying reads
+     */
+    public int read(Packet dest) {
+        int fromFirst = first.read(dest);
+        int fromLast = last.read(dest);
+        return fromFirst + fromLast;
+    }
+    
+    public String toString() {
+        return "{position="+position()+",limit="+limit()+",capacity="+capacity()+"}";
+    }
+
+    /**
+     * Resolves an adapter for the requested type: this packet when it is
+     * assignable to <code>target</code>, otherwise the first packet's
+     * adapter, falling back to the second packet's adapter when the first
+     * returns null.
+     */
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        Object fromFirst = first.getAdapter(target);
+        return ( fromFirst != null ) ? fromFirst : last.getAdapter(target);
+    }
+
+    /**
+     * Not implemented yet for appended packets.
+     *
+     * @return always null; callers must be prepared for a null result
+     */
+    public ByteSequence asByteSequence() {      
+        // TODO: implement me
+        return null;
+    }
+
+    /**
+     * Not implemented yet for appended packets.
+     *
+     * @return always null; callers must be prepared for a null result
+     */
+    public byte[] sliceAsBytes() {
+        // TODO: implement me
+        return null;
+    }
+
+    public void dispose() {
+        first.dispose();
+        last.dispose();
+    }
+
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/AppendedPacket.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteArrayPacket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteArrayPacket.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteArrayPacket.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteArrayPacket.java Tue Feb 21 15:12:56 2006
@@ -1,239 +1,239 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-
-
-/**
- * Provides a Packet implementation that is directly backed by a <code>byte[]</code>.
- * 
- * @version $Revision$
- */
-final public class ByteArrayPacket implements Packet {
-
-    private final byte buffer[];
-
-    private final int offset;
-    private final int capacity;
-    private int position;
-    private int limit;
-    private int remaining;
-    
-
-    public ByteArrayPacket(byte buffer[]) {
-        this(buffer,0, buffer.length);
-    }
-    
-    public ByteArrayPacket(ByteSequence sequence) {
-        this(sequence.getData(), sequence.getOffset(), sequence.getLength());
-    }
-    
-    public ByteArrayPacket(byte buffer[], int offset, int capacity) {
-        this.buffer = buffer;
-        this.offset=offset;
-        this.capacity=capacity;
-		this.position = 0;
-		this.limit = capacity;
-		this.remaining = limit-position;
-    }
-
-    public int position() {
-        return position;
-    }
-
-    public void position(int position) {
-        this.position = position;
-        remaining = limit-position;
-    }
-
-    public int limit() {
-        return limit;
-    }
-
-    public void limit(int limit) {
-        this.limit = limit;
-        remaining = limit-position;
-    }
-
-    public void flip() {
-        limit = position;
-        position = 0;
-        remaining = limit - position;
-    }
-
-    public int remaining() {
-        return remaining;
-    }
-
-    public void rewind() {
-        position = 0;
-        remaining = limit - position;
-    }
-
-    public boolean hasRemaining() {
-        return remaining > 0;
-    }
-
-    public void clear() {
-        position = 0;
-        limit = capacity;
-        remaining = limit - position;
-    }
-
-    public int capacity() {
-        return capacity;
-    }
-
-    public Packet slice() {
-        return new ByteArrayPacket(buffer, offset+position, remaining);
-    }
-    
-    public Packet duplicate() {
-        return new ByteArrayPacket(buffer, offset, capacity);
-    }
-
-    public Object duplicate(ClassLoader cl) throws IOException {
-        try{
-            Class clazz = cl.loadClass(ByteArrayPacket.class.getName());
-            Constructor constructor = clazz.getConstructor(new Class[]{byte[].class, int.class, int.class});
-            return constructor.newInstance(new Object[]{buffer, new Integer(offset), new Integer(capacity())});
-        } catch (Throwable e) {
-            throw (IOException)new IOException("Could not duplicate packet in a different classloader: "+e).initCause(e);
-        }
-    }
-
-    public void writeTo(OutputStream out) throws IOException {
-        out.write(buffer, offset+position, remaining);
-        position=limit;
-        remaining = limit-position;
-    }
-    
-    public void writeTo(DataOutput out) throws IOException {
-        out.write(buffer, offset+position, remaining);
-        position=limit;
-        remaining = limit-position;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#read()
-     */
-    public int read() {
-        if( !(remaining > 0) )
-            return -1;
-        int rc = buffer[offset+position];
-        position++;
-        remaining = limit-position;
-        return rc & 0xff;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#read(byte[], int, int)
-     */
-    public int read(byte[] data, int offset, int length) {
-        if( !(remaining > 0) )
-            return -1;
-        
-        int copyLength = ((length <= remaining) ? length : remaining);
-        System.arraycopy(buffer, this.offset+position, data, offset, copyLength);
-        position += copyLength;
-        remaining = limit-position;
-        return copyLength;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#write(int)
-     */
-    public boolean write(int data) {
-        if( !(remaining > 0) )
-            return false;
-        buffer[offset+position]=(byte) data;
-        position++;
-        remaining = limit-position;
-        return true;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#write(byte[], int, int)
-     */
-    public int write(byte[] data, int offset, int length) {
-        if( !(remaining > 0) )
-            return -1;
-        
-        int copyLength = ((length <= remaining) ? length : remaining);
-        System.arraycopy(data, offset, buffer, this.offset+position, copyLength);
-        position+=copyLength;
-        remaining = limit-position;
-        return copyLength;
-    }
-
-    public ByteSequence asByteSequence() {
-        return new ByteSequence(buffer, offset+position, remaining);
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#sliceAsBytes()
-     */
-    public byte[] sliceAsBytes() {
-        if( buffer.length == remaining ) {
-            return buffer;
-        } else {
-            byte rc[] = new byte[remaining];
-            int op = position;
-            read(rc,0,remaining);
-            position=op;
-            remaining = limit-position;
-            return rc;
-        }
-    }
-    
-    /**
-     * @param dest
-     * @return the number of bytes read into the dest.
-     */
-    public int read(Packet dest) {        
-	    int a = dest.remaining();
-		int rc = ((a <= remaining) ? a : remaining); 
-		if( rc > 0 ) {
-		    dest.write( buffer, offset+position, rc);
-		    position = position+rc;
-	        remaining = limit-position;
-		}
-		return rc;
-    }
-    
-    public String toString() {
-        return "{position="+position+",limit="+limit+",capacity="+capacity+"}";
-    }
-
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return null;
-    }
-    
-    public byte[] getBuffer() {
-        return buffer;
-    }
-    
-    public void dispose() {        
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+
+
+/**
+ * Provides a Packet implementation that is directly backed by a <code>byte[]</code>.
+ * 
+ * @version $Revision$
+ */
+final public class ByteArrayPacket implements Packet {
+
+    private final byte buffer[];
+
+    private final int offset;
+    private final int capacity;
+    private int position;
+    private int limit;
+    private int remaining;
+    
+
+    public ByteArrayPacket(byte buffer[]) {
+        this(buffer,0, buffer.length);
+    }
+    
+    public ByteArrayPacket(ByteSequence sequence) {
+        this(sequence.getData(), sequence.getOffset(), sequence.getLength());
+    }
+    
+    /**
+     * Creates a packet over a region of <code>buffer</code>.
+     *
+     * @param buffer the backing array (used directly, not copied)
+     * @param offset index in <code>buffer</code> that corresponds to position 0
+     * @param capacity the capacity of the packet, which is also its initial limit
+     */
+    public ByteArrayPacket(byte buffer[], int offset, int capacity) {
+        this.buffer = buffer;
+        this.offset = offset;
+        this.capacity = capacity;
+        // A new packet starts at position 0 with everything up to capacity remaining.
+        this.position = 0;
+        this.limit = capacity;
+        this.remaining = capacity;
+    }
+
+    public int position() {
+        return position;
+    }
+
+    public void position(int position) {
+        this.position = position;
+        remaining = limit-position;
+    }
+
+    public int limit() {
+        return limit;
+    }
+
+    public void limit(int limit) {
+        this.limit = limit;
+        remaining = limit-position;
+    }
+
+    public void flip() {
+        limit = position;
+        position = 0;
+        remaining = limit - position;
+    }
+
+    public int remaining() {
+        return remaining;
+    }
+
+    public void rewind() {
+        position = 0;
+        remaining = limit - position;
+    }
+
+    public boolean hasRemaining() {
+        return remaining > 0;
+    }
+
+    public void clear() {
+        position = 0;
+        limit = capacity;
+        remaining = limit - position;
+    }
+
+    public int capacity() {
+        return capacity;
+    }
+
+    public Packet slice() {
+        return new ByteArrayPacket(buffer, offset+position, remaining);
+    }
+    
+    public Packet duplicate() {
+        return new ByteArrayPacket(buffer, offset, capacity);
+    }
+
+    public Object duplicate(ClassLoader cl) throws IOException {
+        try{
+            Class clazz = cl.loadClass(ByteArrayPacket.class.getName());
+            Constructor constructor = clazz.getConstructor(new Class[]{byte[].class, int.class, int.class});
+            return constructor.newInstance(new Object[]{buffer, new Integer(offset), new Integer(capacity())});
+        } catch (Throwable e) {
+            throw (IOException)new IOException("Could not duplicate packet in a different classloader: "+e).initCause(e);
+        }
+    }
+
+    public void writeTo(OutputStream out) throws IOException {
+        out.write(buffer, offset+position, remaining);
+        position=limit;
+        remaining = limit-position;
+    }
+    
+    public void writeTo(DataOutput out) throws IOException {
+        out.write(buffer, offset+position, remaining);
+        position=limit;
+        remaining = limit-position;
+    }
+
+    /**
+     * Reads the byte at the current position and advances by one.
+     *
+     * @return the byte as an unsigned value (0-255), or -1 when nothing remains
+     * @see org.apache.activeio.packet.Packet#read()
+     */
+    public int read() {
+        if( remaining <= 0 ) {
+            return -1;
+        }
+        // Mask so that negative bytes are returned as 128-255.
+        int value = buffer[offset+position] & 0xff;
+        position++;
+        remaining = limit-position;
+        return value;
+    }
+
+    /**
+     * Copies up to <code>length</code> remaining bytes into <code>data</code>
+     * starting at <code>offset</code>, and advances the position by the
+     * number of bytes copied.
+     *
+     * @return the number of bytes copied, or -1 when nothing remains
+     * @see org.apache.activeio.packet.Packet#read(byte[], int, int)
+     */
+    public int read(byte[] data, int offset, int length) {
+        if( remaining <= 0 ) {
+            return -1;
+        }
+
+        int count = Math.min(length, remaining);
+        System.arraycopy(buffer, this.offset+position, data, offset, count);
+        position += count;
+        remaining = limit-position;
+        return count;
+    }
+
+    /**
+     * @see org.apache.activeio.packet.Packet#write(int)
+     */
+    public boolean write(int data) {
+        if( !(remaining > 0) )
+            return false;
+        buffer[offset+position]=(byte) data;
+        position++;
+        remaining = limit-position;
+        return true;
+    }
+
+    /**
+     * @see org.apache.activeio.packet.Packet#write(byte[], int, int)
+     */
+    public int write(byte[] data, int offset, int length) {
+        if( !(remaining > 0) )
+            return -1;
+        
+        int copyLength = ((length <= remaining) ? length : remaining);
+        System.arraycopy(data, offset, buffer, this.offset+position, copyLength);
+        position+=copyLength;
+        remaining = limit-position;
+        return copyLength;
+    }
+
+    public ByteSequence asByteSequence() {
+        return new ByteSequence(buffer, offset+position, remaining);
+    }
+
+    /**
+     * Returns the remaining bytes as an array without changing the position.
+     * When the remaining count equals the length of the backing array, the
+     * backing array itself is returned rather than a copy, so changes made
+     * by the caller are then visible through this packet.
+     *
+     * @return the backing array or a new array holding the remaining bytes
+     * @see org.apache.activeio.packet.Packet#sliceAsBytes()
+     */
+    public byte[] sliceAsBytes() {
+        if( buffer.length == remaining ) {
+            return buffer;
+        } else {
+            byte rc[] = new byte[remaining];
+            // read() advances the position, so save it here and restore it afterwards.
+            int op = position;
+            read(rc,0,remaining);
+            position=op;
+            remaining = limit-position;
+            return rc;
+        }
+    }
+    
+    /**
+     * Transfers as many remaining bytes as <code>dest</code> has room for
+     * and advances the position by that amount.
+     *
+     * @param dest the packet that receives the bytes
+     * @return the number of bytes read into the dest.
+     */
+    public int read(Packet dest) {
+        int count = Math.min(dest.remaining(), remaining);
+        if( count <= 0 ) {
+            return count;
+        }
+        dest.write( buffer, offset+position, count);
+        position += count;
+        remaining = limit-position;
+        return count;
+    }
+    
+    public String toString() {
+        return "{position="+position+",limit="+limit+",capacity="+capacity+"}";
+    }
+
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return null;
+    }
+    
+    public byte[] getBuffer() {
+        return buffer;
+    }
+    
+    public void dispose() {        
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteArrayPacket.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacket.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacket.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacket.java Tue Feb 21 15:12:56 2006
@@ -1,280 +1,280 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-
-
-/**
- * Provides a Packet implementation that is backed by a {@see java.nio.ByteBuffer}
- * 
- * @version $Revision$
- */
-final public class ByteBufferPacket implements Packet {
-
-	public static final int DEFAULT_BUFFER_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.DefaultByteBufferSize", ""+(64*1024)));
-	public static final int DEFAULT_DIRECT_BUFFER_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.DefaultDirectByteBufferSize", ""+(8*1024)));
-
-    private final ByteBuffer buffer;
-    private static final int TEMP_BUFFER_SIZE = 64*1024;
-
-    public ByteBufferPacket(ByteBuffer buffer) {
-        this.buffer = buffer;
-        clear();
-    }
-    
-    public ByteBuffer getByteBuffer() {
-        return buffer;
-    }
-    
-    public static ByteBufferPacket createDefaultBuffer(boolean direct) {
-    	if( direct )
-    		return new ByteBufferPacket( ByteBuffer.allocateDirect(DEFAULT_DIRECT_BUFFER_SIZE) );
-    	return new ByteBufferPacket( ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)  );
-    }
-    
-    public void writeTo(OutputStream out) throws IOException {
-        if( buffer.hasArray() ) {
-            
-            // If the buffer is backed by an array.. then use it directly.
-            out.write(buffer.array(), position(), remaining());
-            position(limit());
-            
-        } else {
-            
-            // It's not backed by a buffer.. We can only dump it to a OutputStream via a byte[] so,
-            // create a temp buffer that we can use to chunk it out.            
-            byte temp[] = new byte[TEMP_BUFFER_SIZE];            
-            while( buffer.hasRemaining() ) {
-                int maxWrite = buffer.remaining() > temp.length ? temp.length : buffer.remaining();
-	            buffer.get(temp, 0, maxWrite);
-	            out.write(temp,0, maxWrite);
-            }
-            
-        }        
-    }
-    
-    public void writeTo(DataOutput out) throws IOException {
-        if( buffer.hasArray() ) {
-            
-            // If the buffer is backed by an array.. then use it directly.
-            out.write(buffer.array(), position(), remaining());
-            position(limit());
-            
-        } else {
-            
-            // It's not backed by a buffer.. We can only dump it to a OutputStream via a byte[] so,
-            // create a temp buffer that we can use to chunk it out.            
-            byte temp[] = new byte[TEMP_BUFFER_SIZE];            
-            while( buffer.hasRemaining() ) {
-                int maxWrite = buffer.remaining() > temp.length ? temp.length : buffer.remaining();
-                buffer.get(temp, 0, maxWrite);
-                out.write(temp,0, maxWrite);
-            }
-            
-        }        
-    }
-
-    public int capacity() {
-        return buffer.capacity();
-    }
-
-    public void clear() {
-        buffer.clear();
-    }
-
-    public Packet compact() {
-        buffer.compact();
-        return this;
-    }
-
-    public void flip() {
-        buffer.flip();
-    }
-
-    public boolean hasRemaining() {
-        return buffer.hasRemaining();
-    }
-
-    public boolean isDirect() {
-        return buffer.isDirect();
-    }
-
-    public boolean isReadOnly() {
-        return buffer.isReadOnly();
-    }
-
-    public int limit() {
-        return buffer.limit();
-    }
-
-    public void limit(int arg0) {
-        buffer.limit(arg0);
-    }
-
-    public Packet mark() {
-        buffer.mark();
-        return this;
-    }
-
-    public int position() {
-        return buffer.position();
-    }
-
-    public void position(int arg0) {
-        buffer.position(arg0);
-    }
-
-    public int remaining() {
-        return buffer.remaining();
-    }
-
-    public void rewind() {
-        buffer.rewind();
-    }
-
-    public Packet slice() {
-        return new ByteBufferPacket(buffer.slice());
-    }
-
-    public Packet duplicate() {
-        return new ByteBufferPacket(buffer.duplicate());
-    }
-
-    public Object duplicate(ClassLoader cl) throws IOException {
-        try {
-            Class clazz = cl.loadClass(ByteBufferPacket.class.getName());
-            Constructor constructor = clazz.getConstructor(new Class[]{ByteBuffer.class});
-            return constructor.newInstance(new Object[]{buffer.duplicate()});
-        } catch (Throwable e) {
-            throw (IOException)new IOException("Could not duplicate packet in a different classloader: "+e).initCause(e);
-        }
-
-    }
-    
-
-    /**
-     * @see org.apache.activeio.packet.Packet#read()
-     */
-    public int read() {
-        if( !buffer.hasRemaining() )
-            return -1;
-        return buffer.get() & 0xff;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#read(byte[], int, int)
-     */
-    public int read(byte[] data, int offset, int length) {
-        if( !hasRemaining() )
-            return -1;
-        
-        int copyLength = Math.min(length, remaining());
-        buffer.get(data, offset, copyLength);
-        return copyLength;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#write(int)
-     */
-    public boolean write(int data) {
-        if( !buffer.hasRemaining() )
-            return false;
-        buffer.put((byte)data);
-        return true;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#write(byte[], int, int)
-     */
-    public int write(byte[] data, int offset, int length) {
-        if( !hasRemaining() )
-            return -1;
-
-        int copyLength = Math.min(length, remaining());
-        buffer.put(data, offset, copyLength);
-        return copyLength;
-    }
-
-    /**
-     * @see org.apache.activeio.packet.Packet#asByteSequence()
-     */
-    public ByteSequence asByteSequence() {
-        if( buffer.hasArray() ) {
-            byte[] bs = buffer.array();
-            return new ByteSequence(bs, buffer.position(), buffer.remaining());
-        }
-        // TODO: implement the direct case.
-        return null;
-    }
-    
-    /**
-     * @see org.apache.activeio.packet.Packet#sliceAsBytes()
-     */
-    public byte[] sliceAsBytes() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /**
-     * @param dest
-     * @return the number of bytes read into the dest.
-     */
-    public int read(Packet dest) {
-        
-	    int rc = Math.min(dest.remaining(), remaining()); 
-		if( rc > 0 ) {
-		    
-	        if( dest.getClass() == ByteBufferPacket.class ) {            
-
-			    // Adjust our limit so that we don't overflow the dest buffer. 
-				int limit = limit();
-				limit(position()+rc);
-				
-	            ((ByteBufferPacket)dest).buffer.put(buffer);
-
-	            // restore the limit.
-				limit(limit);
-	            
-	            return 0;
-	        } else {	            
-	            ByteSequence sequence = dest.asByteSequence();
-	            rc = read(sequence.getData(), sequence.getOffset(), sequence.getLength());
-	            dest.position(dest.position()+rc);
-	        }
-		}
-		return rc;
-    }
-	
-    public String toString() {
-        return "{position="+position()+",limit="+limit()+",capacity="+capacity()+"}";
-    }
-
-    public Object getAdapter(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
-            return this;
-        }
-        return null;
-    }
-    
-    public void dispose() {        
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+
+
+/**
+ * Provides a Packet implementation that is backed by a {@link java.nio.ByteBuffer}.
+ * Position, limit and capacity are those of the wrapped buffer, so changes made through
+ * {@link #getByteBuffer()} are visible through the packet and vice versa.
+ * 
+ * @version $Revision$
+ */
+final public class ByteBufferPacket implements Packet {
+
+    /** Size of a default heap buffer; read once from the org.apache.activeio.DefaultByteBufferSize system property. */
+    public static final int DEFAULT_BUFFER_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.DefaultByteBufferSize", ""+(64*1024)));
+    /** Size of a default direct buffer; read once from the org.apache.activeio.DefaultDirectByteBufferSize system property. */
+    public static final int DEFAULT_DIRECT_BUFFER_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.DefaultDirectByteBufferSize", ""+(8*1024)));
+
+    /** Upper bound for the scratch array used to stream out buffers that expose no backing array. */
+    private static final int TEMP_BUFFER_SIZE = 64*1024;
+
+    private final ByteBuffer buffer;
+
+    /**
+     * Wraps the given buffer. The buffer is cleared as part of construction, which resets
+     * its position to zero and its limit to its capacity.
+     *
+     * @param buffer the buffer that stores the packet data.
+     */
+    public ByteBufferPacket(ByteBuffer buffer) {
+        this.buffer = buffer;
+        clear();
+    }
+
+    /**
+     * @return the wrapped buffer itself, not a copy.
+     */
+    public ByteBuffer getByteBuffer() {
+        return buffer;
+    }
+
+    /**
+     * Allocates a new packet of the default size.
+     *
+     * @param direct true for a direct buffer of {@link #DEFAULT_DIRECT_BUFFER_SIZE} bytes,
+     *               false for a heap buffer of {@link #DEFAULT_BUFFER_SIZE} bytes.
+     * @return the newly allocated packet.
+     */
+    public static ByteBufferPacket createDefaultBuffer(boolean direct) {
+        if( direct ) {
+            return new ByteBufferPacket( ByteBuffer.allocateDirect(DEFAULT_DIRECT_BUFFER_SIZE) );
+        }
+        return new ByteBufferPacket( ByteBuffer.allocate(DEFAULT_BUFFER_SIZE) );
+    }
+
+    /**
+     * Writes all remaining bytes to the stream and leaves the position at the limit.
+     *
+     * @param out the stream that receives the bytes.
+     * @throws IOException if the stream rejects the write.
+     */
+    public void writeTo(OutputStream out) throws IOException {
+        if( buffer.hasArray() ) {
+
+            // Backed by an accessible array: write straight from it. The array offset must be
+            // added because a sliced or offset-wrapped buffer does not start at array index 0.
+            out.write(buffer.array(), buffer.arrayOffset()+position(), remaining());
+            position(limit());
+
+        } else {
+
+            // No accessible array (direct or read-only buffer): chunk the data out through a
+            // scratch array that is no larger than what is left to write.
+            byte temp[] = new byte[Math.min(TEMP_BUFFER_SIZE, buffer.remaining())];
+            while( buffer.hasRemaining() ) {
+                int chunk = Math.min(buffer.remaining(), temp.length);
+                buffer.get(temp, 0, chunk);
+                out.write(temp, 0, chunk);
+            }
+
+        }
+    }
+
+    /**
+     * Writes all remaining bytes to the output and leaves the position at the limit.
+     *
+     * @param out the output that receives the bytes.
+     * @throws IOException if the output rejects the write.
+     */
+    public void writeTo(DataOutput out) throws IOException {
+        if( buffer.hasArray() ) {
+
+            // Backed by an accessible array: write straight from it. The array offset must be
+            // added because a sliced or offset-wrapped buffer does not start at array index 0.
+            out.write(buffer.array(), buffer.arrayOffset()+position(), remaining());
+            position(limit());
+
+        } else {
+
+            // No accessible array (direct or read-only buffer): chunk the data out through a
+            // scratch array that is no larger than what is left to write.
+            byte temp[] = new byte[Math.min(TEMP_BUFFER_SIZE, buffer.remaining())];
+            while( buffer.hasRemaining() ) {
+                int chunk = Math.min(buffer.remaining(), temp.length);
+                buffer.get(temp, 0, chunk);
+                out.write(temp, 0, chunk);
+            }
+
+        }
+    }
+
+    /** @return the capacity of the wrapped buffer. */
+    public int capacity() {
+        return buffer.capacity();
+    }
+
+    /** Clears the wrapped buffer: position becomes zero and limit becomes capacity. */
+    public void clear() {
+        buffer.clear();
+    }
+
+    /**
+     * Compacts the wrapped buffer.
+     *
+     * @return this packet.
+     */
+    public Packet compact() {
+        buffer.compact();
+        return this;
+    }
+
+    /** Flips the wrapped buffer: limit becomes the current position and position becomes zero. */
+    public void flip() {
+        buffer.flip();
+    }
+
+    /** @return true if there are bytes between the position and the limit. */
+    public boolean hasRemaining() {
+        return buffer.hasRemaining();
+    }
+
+    /** @return true if the wrapped buffer is a direct buffer. */
+    public boolean isDirect() {
+        return buffer.isDirect();
+    }
+
+    /** @return true if the wrapped buffer is read-only. */
+    public boolean isReadOnly() {
+        return buffer.isReadOnly();
+    }
+
+    /** @return the limit of the wrapped buffer. */
+    public int limit() {
+        return buffer.limit();
+    }
+
+    /** @param arg0 the new limit of the wrapped buffer. */
+    public void limit(int arg0) {
+        buffer.limit(arg0);
+    }
+
+    /**
+     * Marks the current position of the wrapped buffer.
+     *
+     * @return this packet.
+     */
+    public Packet mark() {
+        buffer.mark();
+        return this;
+    }
+
+    /** @return the position of the wrapped buffer. */
+    public int position() {
+        return buffer.position();
+    }
+
+    /** @param arg0 the new position of the wrapped buffer. */
+    public void position(int arg0) {
+        buffer.position(arg0);
+    }
+
+    /** @return the number of bytes between the position and the limit. */
+    public int remaining() {
+        return buffer.remaining();
+    }
+
+    /** Rewinds the wrapped buffer: position becomes zero, the limit is kept. */
+    public void rewind() {
+        buffer.rewind();
+    }
+
+    /**
+     * @return a new packet over a slice of the wrapped buffer, sharing its content.
+     */
+    public Packet slice() {
+        return new ByteBufferPacket(buffer.slice());
+    }
+
+    /**
+     * Creates a new packet that shares this packet's content. Because the constructor clears
+     * the buffer it is given, the returned packet starts at position zero with its limit at
+     * capacity, whatever this packet's position and limit are.
+     *
+     * @return the new packet.
+     */
+    public Packet duplicate() {
+        return new ByteBufferPacket(buffer.duplicate());
+    }
+
+    /**
+     * Creates a packet sharing this packet's content, using the ByteBufferPacket class as
+     * loaded by the given class loader.
+     *
+     * @param cl the class loader used to look up the ByteBufferPacket class.
+     * @return the new packet, typed as Object because its class may come from another loader.
+     * @throws IOException if the class cannot be loaded or instantiated; the original failure
+     *                     is attached as the cause.
+     */
+    public Object duplicate(ClassLoader cl) throws IOException {
+        try {
+            Class clazz = cl.loadClass(ByteBufferPacket.class.getName());
+            Constructor constructor = clazz.getConstructor(new Class[]{ByteBuffer.class});
+            return constructor.newInstance(new Object[]{buffer.duplicate()});
+        } catch (Throwable e) {
+            throw (IOException)new IOException("Could not duplicate packet in a different classloader: "+e).initCause(e);
+        }
+    }
+
+    /**
+     * @return the next byte as a value from 0 to 255, or -1 if nothing remains.
+     * @see org.apache.activeio.packet.Packet#read()
+     */
+    public int read() {
+        if( !buffer.hasRemaining() ) {
+            return -1;
+        }
+        return buffer.get() & 0xff;
+    }
+
+    /**
+     * @return the number of bytes copied into data, or -1 if nothing remains.
+     * @see org.apache.activeio.packet.Packet#read(byte[], int, int)
+     */
+    public int read(byte[] data, int offset, int length) {
+        if( !hasRemaining() ) {
+            return -1;
+        }
+
+        int copyLength = Math.min(length, remaining());
+        buffer.get(data, offset, copyLength);
+        return copyLength;
+    }
+
+    /**
+     * @return true if the byte was stored, false if no space remains.
+     * @see org.apache.activeio.packet.Packet#write(int)
+     */
+    public boolean write(int data) {
+        if( !buffer.hasRemaining() ) {
+            return false;
+        }
+        buffer.put((byte)data);
+        return true;
+    }
+
+    /**
+     * @return the number of bytes stored, or -1 if no space remains.
+     * @see org.apache.activeio.packet.Packet#write(byte[], int, int)
+     */
+    public int write(byte[] data, int offset, int length) {
+        if( !hasRemaining() ) {
+            return -1;
+        }
+
+        int copyLength = Math.min(length, remaining());
+        buffer.put(data, offset, copyLength);
+        return copyLength;
+    }
+
+    /**
+     * @return a sequence over the backing array covering the remaining bytes, or null when
+     *         the wrapped buffer exposes no backing array (direct or read-only buffers).
+     * @see org.apache.activeio.packet.Packet#asByteSequence()
+     */
+    public ByteSequence asByteSequence() {
+        if( buffer.hasArray() ) {
+            byte[] bs = buffer.array();
+            // position() is relative to the buffer; arrayOffset() maps it onto the array.
+            return new ByteSequence(bs, buffer.arrayOffset()+buffer.position(), buffer.remaining());
+        }
+        // TODO: implement the direct case.
+        return null;
+    }
+
+    /**
+     * @return a copy of the remaining bytes; position, limit and mark are left untouched.
+     * @see org.apache.activeio.packet.Packet#sliceAsBytes()
+     */
+    public byte[] sliceAsBytes() {
+        byte rc[] = new byte[buffer.remaining()];
+        // Read through a duplicate so that this packet's own position is not moved.
+        buffer.duplicate().get(rc);
+        return rc;
+    }
+
+    /**
+     * Transfers bytes from this packet into dest, limited by whichever of the two has fewer
+     * bytes remaining, and advances both positions past the transferred bytes.
+     *
+     * @param dest the packet that receives the bytes.
+     * @return the number of bytes read into the dest.
+     */
+    public int read(Packet dest) {
+
+        int rc = Math.min(dest.remaining(), remaining());
+        if( rc <= 0 ) {
+            return rc;
+        }
+
+        if( dest.getClass() == ByteBufferPacket.class ) {
+
+            // Adjust our limit so that we don't overflow the dest buffer.
+            int originalLimit = limit();
+            limit(position()+rc);
+            try {
+                ((ByteBufferPacket)dest).buffer.put(buffer);
+            } finally {
+                // restore the limit, even if the transfer failed.
+                limit(originalLimit);
+            }
+            return rc;
+        }
+
+        ByteSequence sequence = dest.asByteSequence();
+        if( sequence == null ) {
+            // dest exposes no backing array: stage the bytes through a temporary copy.
+            // rc never exceeds dest.remaining(), so dest has room for all of them.
+            byte temp[] = new byte[rc];
+            buffer.get(temp, 0, rc);
+            dest.write(temp, 0, rc);
+            return rc;
+        }
+
+        rc = read(sequence.getData(), sequence.getOffset(), sequence.getLength());
+        dest.position(dest.position()+rc);
+        return rc;
+    }
+
+    /**
+     * @return a short description listing the position, limit and capacity of this packet.
+     */
+    public String toString() {
+        return "{position="+position()+",limit="+limit()+",capacity="+capacity()+"}";
+    }
+
+    /**
+     * @param target the type the caller would like to work with.
+     * @return this packet when its class can be assigned to target, otherwise null.
+     */
+    public Object getAdapter(Class target) {
+        if( target.isAssignableFrom(getClass()) ) {
+            return this;
+        }
+        return null;
+    }
+
+    /**
+     * Does nothing in this implementation.
+     */
+    public void dispose() {
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacket.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacketPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacketPool.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacketPool.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacketPool.java Tue Feb 21 15:12:56 2006
@@ -1,46 +1,46 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.packet;
-
-
-import java.nio.ByteBuffer;
-
-/**
- * Provides a simple pool of ByteBuffer objects.
- * 
- * @version $Revision: 1.1 $
- */
-final public class ByteBufferPacketPool extends PacketPool {
-        
-	private final int packetSize;
-	
-	/**
-	 * Creates a pool of <code>bufferCount</code> ByteBuffers that are 
-	 * directly allocated being <code>bufferSize</code> big.
-	 * 
-	 * @param packetCount the number of buffers that will be in the pool.
-	 * @param packetSize the size of the buffers that are in the pool.
-	 */
-	public ByteBufferPacketPool(int packetCount,int packetSize) {
-		super(packetCount);
-		this.packetSize = packetSize;
-	}
-	
-    protected Packet allocateNewPacket() {
-        return new ByteBufferPacket(ByteBuffer.allocateDirect(packetSize));
-    }	
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet;
+
+
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link PacketPool} whose packets are {@link ByteBufferPacket} instances, each one
+ * wrapping a directly allocated ByteBuffer of a fixed size.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public final class ByteBufferPacketPool extends PacketPool {
+
+    /** Size, in bytes, of the direct buffer behind every packet this pool allocates. */
+    private final int packetSize;
+
+    /**
+     * Creates a pool of <code>packetCount</code> ByteBuffers that are
+     * directly allocated, each being <code>packetSize</code> big.
+     *
+     * @param packetCount the number of buffers that will be in the pool.
+     * @param packetSize the size of the buffers that are in the pool.
+     */
+    public ByteBufferPacketPool(int packetCount, int packetSize) {
+        super(packetCount);
+        this.packetSize = packetSize;
+    }
+
+    /**
+     * @return a new packet over a freshly allocated direct buffer of <code>packetSize</code> bytes.
+     */
+    protected Packet allocateNewPacket() {
+        final ByteBuffer direct = ByteBuffer.allocateDirect(packetSize);
+        return new ByteBufferPacket(direct);
+    }
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/packet/ByteBufferPacketPool.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message