activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r359675 - in /incubator/activemq/trunk/activeio/activeio-aio/src: main/java/org/apache/activeio/packet/async/aio/ test/java/org/apache/activeio/packet/async/aio/
Date Thu, 29 Dec 2005 02:10:22 GMT
Author: chirino
Date: Wed Dec 28 18:10:15 2005
New Revision: 359675

URL: http://svn.apache.org/viewcvs?rev=359675&view=rev
Log:
added missing files and updated copyright header.

Added:
    incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/
    incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
    incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
    incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
Modified:
    incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
    incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java

Added: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java?rev=359675&view=auto
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
(added)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
Wed Dec 28 18:10:15 2005
@@ -0,0 +1,262 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.EOSPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+import org.apache.activeio.stream.sync.socket.SocketMetadata;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import com.ibm.io.async.AsyncSocketChannel;
+import com.ibm.io.async.IAbstractAsyncFuture;
+import com.ibm.io.async.IAsyncFuture;
+import com.ibm.io.async.ICompletionListener;
+
+/**
+ * @version $Revision$
+ */
+final public class AIOAsyncChannel implements AsyncChannel, ICompletionListener, SocketMetadata
{
+
+    protected static final int DEFAULT_BUFFER_SIZE = ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE;
+
+    private final AsyncSocketChannel socketChannel;
+    private final Socket socket;
+
+    private AsyncChannelListener channelListener;
+    private ByteBuffer inputByteBuffer;
+    
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private CountDownLatch doneCountDownLatch;
+
+    protected AIOAsyncChannel(AsyncSocketChannel socketChannel) throws IOException {
+        this.socketChannel = socketChannel;
+        this.socket = socketChannel.socket();
+        this.socket.setSendBufferSize(DEFAULT_BUFFER_SIZE);
+        this.socket.setReceiveBufferSize(DEFAULT_BUFFER_SIZE);
+        this.socket.setSoTimeout(0);
+    }
+    
+    private ByteBuffer allocateBuffer() {
+        return ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);
+    }   
+
+    public void setAsyncChannelListener(AsyncChannelListener channelListener) {
+        this.channelListener = channelListener;
+    }
+
+    public AsyncChannelListener getAsyncChannelListener() {
+        return channelListener;
+    }
+
+    /**
+     * Returns this channel when it can be viewed as the requested type.
+     *
+     * @param target the type the caller would like to obtain
+     * @return this instance if <code>target</code> is assignable from this
+     *         object's runtime class, otherwise <code>null</code>
+     */
+    public Object getAdapter(Class target) {
+        return target.isAssignableFrom(getClass()) ? this : null;
+    }
+
+    /**
+     * Tears the channel down. If the channel is still running and a listener
+     * is registered, the listener is told that the socket is closing; the read
+     * loop is then stopped and the socket channel closed. Failures while
+     * stopping or closing are ignored.
+     */
+    public void dispose() {
+        final boolean notifyListener = running.get() && channelListener!=null;
+        if( notifyListener ) {
+            channelListener.onPacketError(new SocketException("Socket closed."));
+        }
+
+        try {
+            stop();
+        } catch (IOException ignored) {
+            // Best effort: the channel is going away regardless.
+        }
+
+        try {
+            socketChannel.close();
+        } catch (IOException ignored) {
+            // Best effort: nothing else can be done if the close fails.
+        }
+    }
+
+    /**
+     * Starts reading from the socket. Only the call that flips the channel
+     * from stopped to running has any effect: it creates a fresh latch for
+     * {@link #stop()} to wait on and requests the first read.
+     *
+     * @throws IOException if requesting the first read fails
+     */
+    public void start() throws IOException {
+        if( !running.compareAndSet(false, true) ) {
+            return;
+        }
+        doneCountDownLatch = new CountDownLatch(1);
+        requestNextRead();
+    }
+
+    /**
+     * Stops the read loop. Only the call that flips the channel from running
+     * to stopped does any work: it waits up to 5 seconds for the read loop to
+     * release the done latch. Calls made while already stopped return at once.
+     *
+     * @throws IOException an {@link InterruptedIOException} if the calling
+     *         thread is interrupted while waiting; the thread's interrupt
+     *         status is restored before the exception is thrown
+     */
+    public void stop() throws IOException {
+        if( !running.compareAndSet(true, false) ) {
+            return;
+        }
+        try {
+            doneCountDownLatch.await(5,  TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Catching InterruptedException clears the flag; restore it so
+            // callers further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException("Interrupted while waiting for the read loop to stop");
+        }
+    }
+
+    /**
+     * Writes the packet's byte buffer to the socket channel, issuing writes
+     * until the buffer has no bytes remaining.
+     *
+     * @param packet the data to send; it is cast to {@link ByteBufferPacket}
+     * @throws IOException if a write fails, or an {@link InterruptedIOException}
+     *         if the calling thread is interrupted while waiting for a write
+     *         result (the thread's interrupt status is restored first)
+     */
+    public void write(Packet packet) throws IOException {
+        ByteBuffer data = ((ByteBufferPacket)packet).getByteBuffer();
+        while( data.hasRemaining() ) {
+            IAsyncFuture future = socketChannel.write(data);
+            try {
+                // Wait for this write's result; the loop re-checks for leftover bytes.
+                future.getByteCount();
+            } catch (InterruptedException e) {
+                // Catching InterruptedException clears the flag; restore it for callers.
+                Thread.currentThread().interrupt();
+                throw new InterruptedIOException("Interrupted while waiting for a socket write to complete");
+            }
+        }
+    }
+
+    public void flush() throws IOException {
+    }
+
+    /**
+     * Completion callback for the reads requested by this channel.
+     * <p>
+     * If the input buffer holds data, that data is handed to the listener as
+     * a packet and another read is requested into the unused tail of the
+     * buffer. An empty buffer is reported to the listener as end of stream.
+     * Whenever no follow-up read has been requested (end of stream or an I/O
+     * error) the done latch is released, so a later {@link #stop()} does not
+     * have to wait out its timeout.
+     *
+     * @param abstractFuture the completed read; not inspected here
+     * @param attribute the attachment passed when the listener was registered; unused
+     */
+    public void futureCompleted(IAbstractAsyncFuture abstractFuture, Object attribute) {
+        // NOTE(review): the future's result is never checked, so a failed read
+        // appears to be reported as end of stream - confirm against the AIO API.
+        try {
+
+            if( inputByteBuffer.position()>0 ) {
+                // Slice taken before the flip: the still-unused tail of the buffer.
+                ByteBuffer remaining = inputByteBuffer.slice();
+                // Flip, then slice: exactly the bytes filled in so far.
+                Packet data = new ByteBufferPacket(((ByteBuffer)inputByteBuffer.flip()).slice());
+
+                channelListener.onPacket(data);
+                // Keep the remaining buffer around to fill with data.
+                inputByteBuffer = remaining;
+                requestNextRead();
+                return;
+            }
+
+            channelListener.onPacket(EOSPacket.EOS_PACKET);
+
+        } catch (IOException e) {
+            channelListener.onPacketError(e);
+        }
+
+        // Reached only when no further read was requested: the read loop is
+        // over, so let stop() return immediately.
+        doneCountDownLatch.countDown();
+    }
+
+    /**
+     * Requests the next read from the socket channel, registering this object
+     * as the completion listener. If the channel is no longer running, no read
+     * is requested and the done latch is released instead. A new buffer is
+     * allocated when there is none yet or the current one is full.
+     *
+     * @throws InterruptedIOException if requesting the read is interrupted
+     */
+    private void requestNextRead() throws InterruptedIOException {
+        if( running.get() ) {
+            try {
+                boolean needsFreshBuffer = inputByteBuffer==null || !inputByteBuffer.hasRemaining();
+                if( needsFreshBuffer ) {
+                    inputByteBuffer = allocateBuffer();
+                }
+
+                socketChannel.read(inputByteBuffer).addCompletionListener(this, null, false);
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+        } else {
+            // Stopped: signal whoever is waiting in stop() instead of reading again.
+            doneCountDownLatch.countDown();
+        }
+    }
+
+    public InetAddress getInetAddress() {
+        return socket.getInetAddress();
+    }
+    public boolean getKeepAlive() throws SocketException {
+        return socket.getKeepAlive();
+    }
+    public InetAddress getLocalAddress() {
+        return socket.getLocalAddress();
+    }
+    public int getLocalPort() {
+        return socket.getLocalPort();
+    }
+    public SocketAddress getLocalSocketAddress() {
+        return socket.getLocalSocketAddress();
+    }
+    public boolean getOOBInline() throws SocketException {
+        return socket.getOOBInline();
+    }
+    public int getPort() {
+        return socket.getPort();
+    }
+    public int getReceiveBufferSize() throws SocketException {
+        return socket.getReceiveBufferSize();
+    }
+    public SocketAddress getRemoteSocketAddress() {
+        return socket.getRemoteSocketAddress();
+    }
+    public boolean getReuseAddress() throws SocketException {
+        return socket.getReuseAddress();
+    }
+    public int getSendBufferSize() throws SocketException {
+        return socket.getSendBufferSize();
+    }
+    public int getSoLinger() throws SocketException {
+        return socket.getSoLinger();
+    }
+    public int getSoTimeout() throws SocketException {
+        return socket.getSoTimeout();
+    }
+    public boolean getTcpNoDelay() throws SocketException {
+        return socket.getTcpNoDelay();
+    }
+    public int getTrafficClass() throws SocketException {
+        return socket.getTrafficClass();
+    }
+    public boolean isBound() {
+        return socket.isBound();
+    }
+    public boolean isClosed() {
+        return socket.isClosed();
+    }
+    public boolean isConnected() {
+        return socket.isConnected();
+    }
+    public void setKeepAlive(boolean on) throws SocketException {
+        socket.setKeepAlive(on);
+    }
+    public void setOOBInline(boolean on) throws SocketException {
+        socket.setOOBInline(on);
+    }
+    public void setReceiveBufferSize(int size) throws SocketException {
+        socket.setReceiveBufferSize(size);
+    }
+    public void setReuseAddress(boolean on) throws SocketException {
+        socket.setReuseAddress(on);
+    }
+    public void setSendBufferSize(int size) throws SocketException {
+        socket.setSendBufferSize(size);
+    }
+    public void setSoLinger(boolean on, int linger) throws SocketException {
+        socket.setSoLinger(on, linger);
+    }
+    public void setTcpNoDelay(boolean on) throws SocketException {
+        socket.setTcpNoDelay(on);
+    }
+    public void setTrafficClass(int tc) throws SocketException {
+        socket.setTrafficClass(tc);
+    }
+    public void setSoTimeout(int i) throws SocketException {
+        socket.setSoTimeout(i);
+    }
+    public String toString() {
+        return "AIO Connection: "+getLocalSocketAddress()+" -> "+getRemoteSocketAddress();
+    }
+
+ }
\ No newline at end of file

Added: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java?rev=359675&view=auto
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
(added)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
Wed Dec 28 18:10:15 2005
@@ -0,0 +1,115 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.activeio.adapter.SyncToAsyncChannelServer;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
+import org.apache.activeio.util.URISupport;
+
+import com.ibm.io.async.AsyncServerSocketChannel;
+import com.ibm.io.async.AsyncSocketChannel;
+
+/**
+ * An AIOAsyncChannelFactory creates {@link AIOAsyncChannel} connections
+ * and channel servers backed by {@link AIOSyncChannelServer}.
+ * 
+ * @version $Revision$
+ */
+public class AIOAsyncChannelFactory implements AsyncChannelFactory {
+
+    protected static final int DEFAULT_BACKLOG = 500;
+    private int backlog = DEFAULT_BACKLOG;
+        
+    /**
+     * Uses the {@param location}'s host and port to create a tcp connection to a remote
host.
+     * 
+     * @see org.apache.activeio.AsyncChannelFactory#openAsyncChannel(java.net.URI)
+     */
+    public AsyncChannel openAsyncChannel(URI location) throws IOException {
+        AsyncSocketChannel channel = AsyncSocketChannel.open();
+        channel.connect(new InetSocketAddress(location.getHost(), location.getPort()));
+        return createAsyncChannel(channel);
+    }
+
+    /**
+     * Wraps a socket channel in an {@link AIOAsyncChannel} and puts a
+     * {@link WriteBufferedAsyncChannel}, using a default buffer, in front of it.
+     *
+     * @param socketChannel the socket channel to wrap
+     * @return the write-buffered channel
+     * @throws IOException if the channel cannot be created
+     */
+    protected AsyncChannel createAsyncChannel(AsyncSocketChannel socketChannel) throws IOException {
+        return new WriteBufferedAsyncChannel(
+                new AIOAsyncChannel(socketChannel),
+                ByteBufferPacket.createDefaultBuffer(true),
+                false);
+    }
+
+    /**
+     * Binds a server socket a the {@param location}'s port. 
+     * 
+     * @see org.apache.activeio.AsyncChannelFactory#bindAsyncChannel(java.net.URI)
+     */
+    public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException {
+        
+        String host = bindURI.getHost();
+        InetSocketAddress address;
+        if( host == null || host.length() == 0 || host.equals("localhost") || host.equals("0.0.0.0")
|| InetAddress.getLocalHost().getHostName().equals(host) ) {            
+            address = new InetSocketAddress(bindURI.getPort());
+        } else {
+            address = new InetSocketAddress(bindURI.getHost(), bindURI.getPort());
+        }
+        
+        AsyncServerSocketChannel serverSocketChannel = AsyncServerSocketChannel.open();
+        serverSocketChannel.socket().bind(address,backlog);
+        
+        URI connectURI = bindURI;
+        try {
+//            connectURI = URISupport.changeHost(connectURI, InetAddress.getLocalHost().getHostName());
+            connectURI = URISupport.changePort(connectURI, serverSocketChannel.socket().getLocalPort());
+        } catch (URISyntaxException e) {
+            throw (IOException)new IOException("Could not build connect URI: "+e).initCause(e);
+        }
+        
+        return SyncToAsyncChannelServer.adapt( 
+                new AIOSyncChannelServer(serverSocketChannel, bindURI, connectURI));
+    }
+    
+    /**
+     * @return Returns the backlog.
+     */
+    public int getBacklog() {
+        return backlog;
+    }
+
+    /**
+     * @param backlog
+     *            The backlog to set.
+     */
+    public void setBacklog(int backlog) {
+        this.backlog = backlog;
+    }
+
+
+}

Added: incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java?rev=359675&view=auto
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
(added)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
Wed Dec 28 18:10:15 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+
+import org.apache.activeio.Channel;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+import com.ibm.io.async.AsyncServerSocketChannel;
+
+/**
+ * @version $Revision$
+ */
+public class AIOSyncChannelServer implements SyncChannelServer {
+
+    private final AsyncServerSocketChannel serverSocket;
+    private final URI bindURI;
+    private final URI connectURI;
+    private int curentSoTimeout;
+
+    public AIOSyncChannelServer(AsyncServerSocketChannel serverSocket, URI bindURI, URI connectURI)
throws IOException {
+        this.serverSocket=serverSocket;
+        this.bindURI=bindURI;
+        this.connectURI=connectURI;
+        this.curentSoTimeout = serverSocket.socket().getSoTimeout();
+    }
+    
+    public URI getBindURI() {
+        return bindURI;
+    }
+
+    public URI getConnectURI() {
+        return this.connectURI;
+    }
+
+    /**
+     * Closes the server socket channel. A failure to close is ignored.
+     */
+    public void dispose() {
+        try {
+            serverSocket.close();
+        } catch (IOException ignored) {
+            // Best effort: nothing else can be done if the close fails.
+        }
+    }
+
+    synchronized public void start() throws IOException {
+    }
+
+    synchronized public void stop() {
+    }
+
+    public Channel accept(long timeout) throws IOException {
+        try {
+	        
+            if (timeout == SyncChannelServer.WAIT_FOREVER_TIMEOUT)
+	            setSoTimeout(0);
+	        else if (timeout == SyncChannelServer.NO_WAIT_TIMEOUT)
+	            setSoTimeout(1);
+	        else
+	            setSoTimeout((int) timeout);
+	
+            AsyncChannel channel = new AIOAsyncChannel(serverSocket.accept());
+            channel = new WriteBufferedAsyncChannel(channel, ByteBufferPacket.createDefaultBuffer(true),
false);
+            return channel;
+	        
+        } catch (SocketTimeoutException ignore) {
+        }
+        return null;	        
+    }
+
+    /**
+     * Applies a new SO_TIMEOUT to the server socket, skipping the socket call
+     * when the requested value equals the one last recorded.
+     *
+     * @param i the timeout to apply
+     * @throws SocketException if the socket rejects the new timeout
+     */
+    private void setSoTimeout(int i) throws SocketException {
+        if (curentSoTimeout == i) {
+            return;
+        }
+        serverSocket.socket().setSoTimeout(i);
+        curentSoTimeout = i;
+    }
+    
+    /**
+     * Returns this server when it can be viewed as the requested type.
+     *
+     * @param target the type the caller would like to obtain
+     * @return this instance if <code>target</code> is assignable from this
+     *         object's runtime class, otherwise <code>null</code>
+     */
+    public Object getAdapter(Class target) {
+        return target.isAssignableFrom(getClass()) ? this : null;
+    }
+    
+    public String toString() {
+        return "AIO Server: "+getConnectURI();
+    }
+    
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java?rev=359675&r1=359674&r2=359675&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
(original)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
Wed Dec 28 18:10:15 2005
@@ -1,18 +1,18 @@
 /**
  *
- * Copyright 2004 Hiram Chirino
+ * Copyright 2005-2006 The Apache Software Foundation
  *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.activeio.packet.async.aio;
 

Modified: incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java?rev=359675&r1=359674&r2=359675&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
(original)
+++ incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
Wed Dec 28 18:10:15 2005
@@ -1,20 +1,19 @@
-/** 
- * 
- * Copyright 2004 Hiram Chirino
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); 
- * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at 
- * 
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, 
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and 
- * limitations under the License. 
- * 
- **/
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activeio.packet.async.aio;
 
 import java.io.IOException;



Mime
View raw message