directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: r123284 - in incubator/directory/network/trunk/mina/src: examples/org/apache/mina/examples/echoserver java/org/apache/mina/io java/org/apache/mina/io/datagram java/org/apache/mina/io/filter java/org/apache/mina/io/socket java/org/apache/mina/protocol java/org/apache/mina/protocol/filter java/org/apache/mina/util test/org/apache/mina/examples/echoserver
Date Fri, 24 Dec 2004 08:33:06 GMT
Author: trustin
Date: Fri Dec 24 00:33:05 2004
New Revision: 123284

URL: http://svn.apache.org/viewcvs?view=rev&rev=123284
Log:
 * Fixed: Deadlock when stopping thread pool filters
 * Added: DatagramChannel support
 * Removed: Filter lifecycle methods to make filters reusable between different acceptors and connectors
 * Added: TestCase for echo server example
Added:
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java   (contents, props changed)
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java   (contents, props changed)
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java   (contents, props changed)
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSessionConfig.java   (contents, props changed)
Modified:
   incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java
   incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java

Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java	(original)
+++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java	Fri Dec 24 00:33:05 2004
@@ -21,6 +21,7 @@
 import java.net.InetSocketAddress;
 
 import org.apache.mina.io.Acceptor;
+import org.apache.mina.io.datagram.DatagramAcceptor;
 import org.apache.mina.io.filter.IoThreadPoolFilter;
 import org.apache.mina.io.socket.SocketAcceptor;
 
@@ -37,9 +38,19 @@
     public static void main( String[] args ) throws Exception
     {
         Acceptor acceptor = new SocketAcceptor();
-        acceptor.addFilter( Integer.MAX_VALUE, new IoThreadPoolFilter() );
+
+        IoThreadPoolFilter threadPoolFilter = new IoThreadPoolFilter();
+        threadPoolFilter.start();
+        acceptor.addFilter( Integer.MAX_VALUE, threadPoolFilter );
+
         acceptor.bind( new InetSocketAddress( PORT ),
                        new EchoProtocolHandler() );
+
+        Acceptor datagramAcceptor = new DatagramAcceptor();
+        datagramAcceptor.addFilter( Integer.MAX_VALUE, threadPoolFilter );
+        datagramAcceptor.bind( new InetSocketAddress( PORT ),
+                               new EchoProtocolHandler() );
+
         System.out.println( "Listening on port " + PORT );
     }
 }

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java	Fri Dec 24 00:33:05 2004
@@ -30,10 +30,6 @@
  */
 public interface IoHandlerFilter
 {
-    void init();
-
-    void destroy();
-
     void sessionOpened( IoHandler nextHandler, IoSession session );
 
     void sessionClosed( IoHandler nextHandler, IoSession session );

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java	Fri Dec 24 00:33:05 2004
@@ -30,14 +30,6 @@
  */
 public class IoHandlerFilterAdapter implements IoHandlerFilter
 {
-    public void init()
-    {
-    }
-
-    public void destroy()
-    {
-    }
-
     public void sessionOpened( IoHandler nextHandler, IoSession session )
     {
         nextHandler.sessionOpened( session );

Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java?view=auto&rev=123284
==============================================================================
--- (empty file)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java	Fri Dec 24 00:33:05 2004
@@ -0,0 +1,426 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.io.datagram;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.io.Acceptor;
+import org.apache.mina.io.ExceptionMonitor;
+import org.apache.mina.io.IoHandler;
+import org.apache.mina.io.IoHandlerFilter;
+import org.apache.mina.util.DefaultExceptionMonitor;
+import org.apache.mina.util.IoHandlerFilterManager;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link Acceptor} that binds datagram channels and handles their reads and queued writes on one selector thread.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramAcceptor implements Acceptor, DatagramProcessor
+{
+    private static volatile int nextId = 0;
+
+    private final IoHandlerFilterManager filterManager = new IoHandlerFilterManager();
+
+    private final int id = nextId++;
+
+    private final Selector selector;
+
+    private final Map channels = new HashMap();
+
+    private final Queue registerQueue = new Queue();
+
+    private final Queue cancelQueue = new Queue();
+
+    private final Queue flushingSessions = new Queue();
+
+    private ExceptionMonitor exceptionMonitor = new DefaultExceptionMonitor();
+
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     * 
+     * @throws IOException if the selector cannot be opened
+     */
+    public DatagramAcceptor() throws IOException
+    {
+        selector = Selector.open();
+    }
+
+    public void bind( SocketAddress address, IoHandler defaultHandler )
+            throws IOException
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( defaultHandler == null )
+            throw new NullPointerException( "defaultHandler" );
+
+        if( ! ( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+
+        DatagramChannel ch = DatagramChannel.open();
+        ch.configureBlocking( false );
+        ch.socket().bind( address );
+
+        synchronized( this )
+        {
+            synchronized( registerQueue )
+            {
+                registerQueue.push( new RegistrationRequest( ch,
+                                                             defaultHandler ) );
+            }
+            channels.put( address, ch );
+
+            if( worker == null )
+            {
+                worker = new Worker();
+                worker.start();
+            }
+        }
+
+        selector.wakeup();
+    }
+
+    public void unbind( SocketAddress address )
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+
+        DatagramChannel ch;
+
+        synchronized( this )
+        {
+            ch = ( DatagramChannel ) channels.get( address );
+
+            if( ch == null )
+                throw new IllegalArgumentException( "Unknown address: "
+                                                    + address );
+
+            SelectionKey key = ch.keyFor( selector );
+            channels.remove( address );
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( key );
+            }
+        }
+
+        selector.wakeup();
+        ch.socket().close();
+    }
+
+    public void flushSession( DatagramSession session )
+    {
+        scheduleFlush( session );
+        selector.wakeup();
+    }
+
+    private void scheduleFlush( DatagramSession session )
+    {
+        synchronized( flushingSessions )
+        {
+            flushingSessions.push( session );
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "DatagramAcceptor-" + id );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( DatagramAcceptor.this )
+                        {
+                            if( selector.keys().isEmpty() )
+                            {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+
+                    cancelKeys();
+
+                    if( nKeys > 0 )
+                    {
+                        processReadySessions( selector.selectedKeys() );
+                    }
+                    flushSessions();
+                }
+                catch( IOException e )
+                {
+                    exceptionMonitor
+                            .exceptionCaught( DatagramAcceptor.this, e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+
+    private void processReadySessions( Set keys )
+    {
+        Iterator it = keys.iterator();
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+            it.remove();
+
+            DatagramChannel ch = ( DatagramChannel ) key.channel();
+
+            DatagramSession session = new DatagramSession(
+                                                           DatagramAcceptor.this,
+                                                           filterManager,
+                                                           ch,
+                                                           key, 
+                                                           ( IoHandler ) key
+                                                                   .attachment() );
+
+            if( key.isReadable() )
+            {
+                readSession( session );
+            }
+
+            if( key.isWritable() )
+            {
+                scheduleFlush( session );
+            }
+        }
+    }
+
+    private void readSession( DatagramSession session )
+    {
+
+        ByteBuffer readBuf = ByteBuffer.allocate( 1500 );
+        try
+        {
+            SocketAddress remoteAddress = session.getChannel()
+                    .receive( readBuf );
+            if( remoteAddress != null )
+            {
+                readBuf.flip();
+                session.setRemoteAddress( remoteAddress );
+                filterManager.fireDataRead( session, readBuf );
+            }
+        }
+        catch( IOException e )
+        {
+            filterManager.fireExceptionCaught( session, e );
+        }
+    }
+
+    private void flushSessions()
+    {
+        if( flushingSessions.size() == 0 )
+            return;
+
+        for( ;; )
+        {
+            DatagramSession session;
+
+            synchronized( flushingSessions )
+            {
+                session = ( DatagramSession ) flushingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            try
+            {
+                flush( session );
+            }
+            catch( IOException e )
+            {
+                session.getFilterManager().fireExceptionCaught( session, e );
+            }
+        }
+    }
+
+    private void flush( DatagramSession session ) throws IOException
+    {
+        DatagramChannel ch = session.getChannel();
+
+        Queue writeBufferQueue = session.getWriteBufferQueue();
+        Queue writeMarkerQueue = session.getWriteMarkerQueue();
+
+        ByteBuffer buf;
+        Object marker;
+        for( ;; )
+        {
+            synchronized( writeBufferQueue )
+            {
+                buf = ( ByteBuffer ) writeBufferQueue.first();
+                marker = writeMarkerQueue.first();
+            }
+
+            if( buf == null )
+                break;
+
+            if( buf.remaining() == 0 )
+            {
+                // pop and fire event
+                synchronized( writeBufferQueue )
+                {
+                    writeBufferQueue.pop();
+                    writeMarkerQueue.pop();
+                }
+                session.getFilterManager().fireDataWritten( session, marker );
+                continue;
+            }
+
+            int writtenBytes = ch.send( buf, session.getRemoteAddress() );
+
+            SelectionKey key = session.getSelectionKey();
+            if( writtenBytes == 0 )
+            {
+                // Kernel buffer is full
+                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+            }
+            else
+            {
+                key
+                        .interestOps( key.interestOps()
+                                      & ( ~SelectionKey.OP_WRITE ) );
+
+                // pop and fire event
+                synchronized( writeBufferQueue )
+                {
+                    writeBufferQueue.pop();
+                    writeMarkerQueue.pop();
+                }
+                session.getFilterManager().fireDataWritten( session, marker );
+            }
+        }
+    }
+
+    private void registerNew() throws ClosedChannelException
+    {
+        if( registerQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            RegistrationRequest req;
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+                break;
+
+            req.channel.register( selector, SelectionKey.OP_READ, req.handler );
+        }
+    }
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SelectionKey key;
+            synchronized( cancelQueue )
+            {
+                key = ( SelectionKey ) cancelQueue.pop();
+            }
+
+            if( key == null )
+                break;
+            else
+            {
+                key.cancel();
+                selector.wakeup(); // wake up again to trigger thread death
+            }
+        }
+    }
+
+    public void addFilter( int priority, IoHandlerFilter filter )
+    {
+        filterManager.addFilter( priority, filter );
+    }
+
+    public void removeFilter( IoHandlerFilter filter )
+    {
+        filterManager.removeFilter( filter );
+    }
+
+    private static class RegistrationRequest
+    {
+        private final DatagramChannel channel;
+
+        private final IoHandler handler;
+
+        private RegistrationRequest( DatagramChannel channel, IoHandler handler )
+        {
+            this.channel = channel;
+            this.handler = handler;
+        }
+    }
+
+    public ExceptionMonitor getExceptionMonitor()
+    {
+        return exceptionMonitor;
+    }
+
+    public void setExceptionMonitor( ExceptionMonitor monitor )
+    {
+        if( monitor == null )
+        {
+            monitor = new DefaultExceptionMonitor();
+        }
+
+        this.exceptionMonitor = monitor;
+    }
+}
\ No newline at end of file

Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java?view=auto&rev=123284
==============================================================================
--- (empty file)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java	Fri Dec 24 00:33:05 2004
@@ -0,0 +1,30 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.io.datagram;
+
+/**
+ * Callback through which a {@link DatagramSession} asks its owner to flush the writes it has queued.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface DatagramProcessor
+{
+    void flushSession( DatagramSession session );
+}
\ No newline at end of file

Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java?view=auto&rev=123284
==============================================================================
--- (empty file)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java	Fri Dec 24 00:33:05 2004
@@ -0,0 +1,253 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.io.datagram;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.SessionConfig;
+import org.apache.mina.io.IoHandler;
+import org.apache.mina.io.IoSession;
+import org.apache.mina.util.IoHandlerFilterManager;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoSession} over a {@link DatagramChannel}; writes are queued and flushed by the owning {@link DatagramProcessor}.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+class DatagramSession implements IoSession
+{
+    private final DatagramProcessor parent;
+
+    private final IoHandlerFilterManager filterManager;
+
+    private final DatagramChannel ch;
+
+    private final DatagramSessionConfig config;
+
+    private final Queue writeBufferQueue;
+
+    private final Queue writeMarkerQueue;
+
+    private final IoHandler handler;
+
+    private final SocketAddress localAddress;
+
+    private final SelectionKey key;
+
+    private SocketAddress remoteAddress;
+
+    private Object attachment;
+
+    private long readBytes;
+
+    private long writtenBytes;
+
+    private long lastReadTime;
+
+    private long lastWriteTime;
+
+    private boolean idleForBoth;
+
+    private boolean idleForRead;
+
+    private boolean idleForWrite;
+
+    /**
+     * Creates a new instance.
+     */
+    DatagramSession( DatagramProcessor parent, IoHandlerFilterManager filterManager, DatagramChannel ch,
+                  SelectionKey key, IoHandler defaultHandler )
+    {
+        this.parent = parent;
+        this.filterManager = filterManager;
+        this.ch = ch;
+        this.config = new DatagramSessionConfig( ch );
+        this.writeBufferQueue = new Queue();
+        this.writeMarkerQueue = new Queue();
+        this.handler = defaultHandler;
+        this.remoteAddress = ch.socket().getRemoteSocketAddress();
+        this.localAddress = ch.socket().getLocalSocketAddress();
+        this.key = key;
+    }
+
+    IoHandlerFilterManager getFilterManager()
+    {
+        return filterManager;
+    }
+
+    DatagramChannel getChannel()
+    {
+        return ch;
+    }
+
+    SelectionKey getSelectionKey()
+    {
+        return key;
+    }
+
+    public IoHandler getHandler()
+    {
+        return handler;
+    }
+
+    public void close()
+    {
+    }
+
+    public Object getAttachment()
+    {
+        return attachment;
+    }
+
+    public void setAttachment( Object attachment )
+    {
+        this.attachment = attachment;
+    }
+
+    Queue getWriteBufferQueue()
+    {
+        return writeBufferQueue;
+    }
+
+    Queue getWriteMarkerQueue()
+    {
+        return writeMarkerQueue;
+    }
+
+    public void write( byte[] buf, int offset, int length, Object marker )
+    {
+        write( ByteBuffer.wrap( buf, offset, length ), marker );
+    }
+
+    public void write( byte[] buf, Object marker )
+    {
+        write( ByteBuffer.wrap( buf ), marker );
+    }
+
+    public void write( ByteBuffer buf, Object marker )
+    {
+        synchronized( writeBufferQueue )
+        {
+            writeBufferQueue.push( buf );
+            writeMarkerQueue.push( marker );
+        }
+
+        parent.flushSession( this );
+    }
+
+    public boolean isConnected()
+    {
+        return ch.isConnected();
+    }
+
+    public boolean isClosed()
+    {
+        return !isConnected();
+    }
+
+    public SessionConfig getConfig()
+    {
+        return config;
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return remoteAddress;
+    }
+    
+    void setRemoteAddress(SocketAddress remoteAddress)
+    {
+        this.remoteAddress = remoteAddress;
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return localAddress;
+    }
+
+    public long getReadBytes()
+    {
+        return readBytes;
+    }
+
+    public long getWrittenBytes()
+    {
+        return writtenBytes;
+    }
+
+    void increaseReadBytes( int increment )
+    {
+        readBytes += increment;
+        lastReadTime = System.currentTimeMillis();
+    }
+
+    void increaseWrittenBytes( int increment )
+    {
+        writtenBytes += increment;
+        lastWriteTime = System.currentTimeMillis();
+    }
+
+    public long getLastIoTime()
+    {
+        return Math.max( lastReadTime, lastWriteTime );
+    }
+
+    public long getLastReadTime()
+    {
+        return lastReadTime;
+    }
+
+    public long getLastWriteTime()
+    {
+        return lastWriteTime;
+    }
+
+    public boolean isIdle( IdleStatus status )
+    {
+        if( status == IdleStatus.BOTH_IDLE )
+            return idleForBoth;
+
+        if( status == IdleStatus.READER_IDLE )
+            return idleForRead;
+
+        if( status == IdleStatus.WRITER_IDLE )
+            return idleForWrite;
+
+        throw new IllegalArgumentException( "Unknown idle status: " + status );
+    }
+
+    void setIdle( IdleStatus status )
+    {
+        if( status == IdleStatus.BOTH_IDLE )
+            idleForBoth = true;
+        else if( status == IdleStatus.READER_IDLE )
+            idleForRead = true;
+        else if( status == IdleStatus.WRITER_IDLE )
+            idleForWrite = true;
+        else
+            throw new IllegalArgumentException( "Unknown idle status: "
+                                                + status );
+    }
+}
\ No newline at end of file

Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSessionConfig.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSessionConfig.java?view=auto&rev=123284
==============================================================================
--- (empty file)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSessionConfig.java	Fri Dec 24 00:33:05 2004
@@ -0,0 +1,60 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.io.datagram;
+
+import java.net.SocketException;
+import java.nio.channels.DatagramChannel;
+
+import org.apache.mina.util.BasicSessionConfig;
+
+/**
+ * Session configuration exposing the reuse-address and traffic-class options of a datagram channel's socket.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramSessionConfig extends BasicSessionConfig
+{
+    private final DatagramChannel ch;
+
+    DatagramSessionConfig( DatagramChannel ch )
+    {
+        this.ch = ch;
+    }
+
+    public boolean getReuseAddress() throws SocketException
+    {
+        return ch.socket().getReuseAddress();
+    }
+
+    public void setReuseAddress( boolean on ) throws SocketException
+    {
+        ch.socket().setReuseAddress( on );
+    }
+
+    public int getTrafficClass() throws SocketException
+    {
+        return ch.socket().getTrafficClass();
+    }
+
+    public void setTrafficClass( int tc ) throws SocketException
+    {
+        ch.socket().setTrafficClass( tc );
+    }
+}
\ No newline at end of file

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java	Fri Dec 24 00:33:05 2004
@@ -63,10 +63,14 @@
 
     private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
 
+    private boolean started;
+
     private boolean shuttingDown;
 
     private int poolSize;
 
+    private final Object poolSizeLock = new Object();
+
     public IoThreadPoolFilter()
     {
     }
@@ -98,17 +102,25 @@
         this.keepAliveTime = keepAliveTime;
     }
 
-    public void init()
+    public synchronized void start()
     {
+        if( started )
+            return;
+
         shuttingDown = false;
 
         leader = new Worker();
         leader.start();
         leader.lead();
+
+        started = true;
     }
 
-    public void destroy()
+    public synchronized void stop()
     {
+        if( !started )
+            return;
+
         shuttingDown = true;
         Worker lastLeader = null;
         for( ;; )
@@ -131,16 +143,24 @@
 
             lastLeader = leader;
         }
+
+        started = false;
     }
 
-    private synchronized void increasePoolSize()
+    private void increasePoolSize()
     {
-        poolSize++;
+        synchronized( poolSizeLock )
+        {
+            poolSize++;
+        }
     }
 
-    private synchronized void decreasePoolSize()
+    private void decreasePoolSize()
     {
-        poolSize--;
+        synchronized( poolSizeLock )
+        {
+            poolSize--;
+        }
     }
 
     public void sessionOpened( IoHandler nextHandler, IoSession session )

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketAcceptor.java	Fri Dec 24 00:33:05 2004
@@ -107,7 +107,6 @@
 
             if( worker == null )
             {
-                filterManager.start();
                 worker = new Worker();
                 worker.start();
             }
@@ -175,7 +174,6 @@
                             if( selector.keys().isEmpty() )
                             {
                                 worker = null;
-                                filterManager.stop();
                                 break;
                             }
                         }

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/SocketConnector.java	Fri Dec 24 00:33:05 2004
@@ -104,7 +104,6 @@
 
                 if( worker == null )
                 {
-                    filterManager.start();
                     worker = new Worker();
                     worker.start();
                 }
@@ -233,7 +232,6 @@
                         {
                             if( selector.keys().isEmpty() )
                             {
-                                filterManager.stop();
                                 worker = null;
                                 break;
                             }

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/IoAdapter.java	Fri Dec 24 00:33:05 2004
@@ -40,12 +40,6 @@
 
     public IoAdapter()
     {
-        filterManager.start();
-    }
-
-    public void shutdown()
-    {
-        filterManager.stop();
     }
 
     public void addFilter( int priority, ProtocolHandlerFilter filter )
@@ -82,33 +76,29 @@
 
         public void sessionOpened( IoSession session )
         {
-            ProtocolSession psession = new ProtocolSessionImpl( session, this );
-            session.setAttachment( psession );
-            filterManager.fireSessionOpened( psession );
+            filterManager.fireSessionOpened( getProtocolSession( session ) );
         }
 
         public void sessionClosed( IoSession session )
         {
-            filterManager.fireSessionClosed( ( ProtocolSession ) session
-                    .getAttachment() );
+            filterManager.fireSessionClosed( getProtocolSession( session ) );
         }
 
         public void sessionIdle( IoSession session, IdleStatus status )
         {
-            filterManager.fireSessionIdle( ( ProtocolSession ) session
-                    .getAttachment(), status );
+            filterManager.fireSessionIdle( getProtocolSession( session ),
+                                           status );
         }
 
         public void exceptionCaught( IoSession session, Throwable cause )
         {
-            filterManager.fireExceptionCaught( ( ProtocolSession ) session
-                    .getAttachment(), cause );
+            filterManager.fireExceptionCaught( getProtocolSession( session ),
+                                               cause );
         }
 
         public void dataRead( IoSession session, ByteBuffer in )
         {
-            ProtocolSession psession = ( ProtocolSession ) session
-                    .getAttachment();
+            ProtocolSession psession = getProtocolSession( session );
             Object result;
             try
             {
@@ -176,6 +166,26 @@
             {
                 filterManager.fireExceptionCaught( psession, t );
             }
+        }
+
+        private ProtocolSession getProtocolSession( IoSession session )
+        {
+            ProtocolSession psession = ( ProtocolSession ) session
+                    .getAttachment();
+            if( psession == null )
+            {
+                synchronized( session )
+                {
+                    psession = ( ProtocolSession ) session.getAttachment();
+                    if( psession == null )
+                    {
+                        psession = new ProtocolSessionImpl( session, this );
+                        session.setAttachment( psession );
+                    }
+                }
+            }
+
+            return psession;
         }
     }
 

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilter.java	Fri Dec 24 00:33:05 2004
@@ -28,10 +28,6 @@
  */
 public interface ProtocolHandlerFilter
 {
-    void init();
-
-    void destroy();
-
     void sessionOpened( ProtocolHandler nextHandler, ProtocolSession session );
 
     void sessionClosed( ProtocolHandler nextHandler, ProtocolSession session );

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/ProtocolHandlerFilterAdapter.java	Fri Dec 24 00:33:05 2004
@@ -28,14 +28,6 @@
  */
 public class ProtocolHandlerFilterAdapter implements ProtocolHandlerFilter
 {
-    public void init()
-    {
-    }
-
-    public void destroy()
-    {
-    }
-
     public void sessionOpened( ProtocolHandler nextHandler,
                               ProtocolSession session )
     {

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java	Fri Dec 24 00:33:05 2004
@@ -62,17 +62,24 @@
 
     private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
 
+    private boolean started;
+
     private boolean shuttingDown;
 
     private int poolSize;
 
+    private final Object poolSizeLock = new Object();
+
     public ProtocolThreadPoolFilter()
     {
     }
 
-    public synchronized int getPoolSize()
+    public int getPoolSize()
     {
-        return poolSize;
+        synchronized( poolSizeLock )
+        {
+            return poolSize;
+        }
     }
 
     public int getMaximumPoolSize()
@@ -97,17 +104,25 @@
         this.keepAliveTime = keepAliveTime;
     }
 
-    public void init()
+    public synchronized void start()
     {
+        if( started )
+            return;
+
         shuttingDown = false;
 
         leader = new Worker();
         leader.start();
         leader.lead();
+
+        started = true;
     }
 
-    public void destroy()
+    public synchronized void stop()
     {
+        if( !started )
+            return;
+
         shuttingDown = true;
         Worker lastLeader = null;
         for( ;; )
@@ -130,16 +145,24 @@
 
             lastLeader = leader;
         }
+
+        started = false;
     }
 
-    private synchronized void increasePoolSize()
+    private void increasePoolSize()
     {
-        poolSize++;
+        synchronized( poolSizeLock )
+        {
+            poolSize++;
+        }
     }
 
-    private synchronized void decreasePoolSize()
+    private void decreasePoolSize()
     {
-        poolSize--;
+        synchronized( poolSizeLock )
+        {
+            poolSize--;
+        }
     }
 
     public void sessionOpened( ProtocolHandler nextHandler,

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java	Fri Dec 24 00:33:05 2004
@@ -69,60 +69,16 @@
         {
             session.getHandler().dataWritten( session, marker );
         }
-
-        public void init()
-        {
-        }
-
-        public void destroy()
-        {
-        }
     };
 
     private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER );
 
-    private boolean started;
-
     public IoHandlerFilterManager()
     {
     }
 
-    public synchronized void start()
-    {
-        if( started )
-            return;
-
-        Entry e = head;
-        do
-        {
-            e.filter.init();
-            e = e.nextEntry;
-        }
-        while( e != null );
-        started = true;
-    }
-
-    public synchronized void stop()
-    {
-        if( !started )
-            return;
-
-        Entry e = head;
-        do
-        {
-            e.filter.destroy();
-            e = e.nextEntry;
-        }
-        while( e != null );
-    }
-
     public synchronized void addFilter( int priority, IoHandlerFilter filter )
     {
-        if( started )
-        {
-            filter.init();
-        }
-
         Entry e = head;
         Entry prevEntry = null;
         for( ;; )
@@ -178,11 +134,6 @@
                 else
                 {
                     prevEntry.nextEntry = e.nextEntry;
-                }
-
-                if( started )
-                {
-                    filter.destroy();
                 }
                 break;
             }

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/ProtocolHandlerFilterManager.java	Fri Dec 24 00:33:05 2004
@@ -69,61 +69,17 @@
         {
             session.getHandler().messageSent( session, message );
         }
-
-        public void init()
-        {
-        }
-
-        public void destroy()
-        {
-        }
     };
 
     private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER );
 
-    private boolean started;
-
     public ProtocolHandlerFilterManager()
     {
     }
 
-    public synchronized void start()
-    {
-        if( started )
-            return;
-
-        Entry e = head;
-        do
-        {
-            e.filter.init();
-            e = e.nextEntry;
-        }
-        while( e != null );
-        started = true;
-    }
-
-    public synchronized void stop()
-    {
-        if( !started )
-            return;
-
-        Entry e = head;
-        do
-        {
-            e.filter.destroy();
-            e = e.nextEntry;
-        }
-        while( e != null );
-    }
-
     public synchronized void addFilter( int priority,
                                        ProtocolHandlerFilter filter )
     {
-        if( started )
-        {
-            filter.init();
-        }
-
         Entry e = head;
         Entry prevEntry = null;
         for( ;; )
@@ -179,11 +135,6 @@
                 else
                 {
                     prevEntry.nextEntry = e.nextEntry;
-                }
-
-                if( started )
-                {
-                    filter.destroy();
                 }
                 break;
             }

Modified: incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java?view=diff&rev=123284&p1=incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java&r1=123283&p2=incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java&r2=123284
==============================================================================
--- incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java	(original)
+++ incubator/directory/network/trunk/mina/src/test/org/apache/mina/examples/echoserver/Test.java	Fri Dec 24 00:33:05 2004
@@ -3,14 +3,16 @@
  */
 package org.apache.mina.examples.echoserver;
 
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.Socket;
+import java.net.SocketTimeoutException;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.net.EchoTCPClient;
+import org.apache.commons.net.EchoUDPClient;
 import org.apache.mina.io.Acceptor;
+import org.apache.mina.io.datagram.DatagramAcceptor;
 import org.apache.mina.io.filter.IoThreadPoolFilter;
 import org.apache.mina.io.socket.SocketAcceptor;
 
@@ -26,39 +28,143 @@
 
     private Acceptor acceptor;
 
+    private Acceptor datagramAcceptor;
+
+    private IoThreadPoolFilter threadPoolFilter;
+
+    public static void assertEquals( byte[] expected, byte[] actual )
+    {
+        assertEquals( toString( expected ), toString( actual ) );
+    }
+
+    private static String toString( byte[] buf )
+    {
+        StringBuffer str = new StringBuffer( buf.length * 4 );
+        for( int i = 0; i < buf.length; i++ )
+        {
+            str.append( buf[ i ] );
+            str.append( ' ' );
+        }
+        return str.toString();
+    }
+
     protected void setUp() throws Exception
     {
+        threadPoolFilter = new IoThreadPoolFilter();
+        threadPoolFilter.start();
+
         acceptor = new SocketAcceptor();
         acceptor.bind( new InetSocketAddress( PORT ),
                        new EchoProtocolHandler() );
-        acceptor.addFilter( Integer.MAX_VALUE, new IoThreadPoolFilter() );
+        acceptor.addFilter( Integer.MAX_VALUE, threadPoolFilter );
+
+        datagramAcceptor = new DatagramAcceptor();
+        datagramAcceptor.addFilter( Integer.MAX_VALUE, threadPoolFilter );
+        datagramAcceptor.bind( new InetSocketAddress( PORT ),
+                               new EchoProtocolHandler() );
     }
 
     protected void tearDown() throws Exception
     {
         acceptor.unbind( new InetSocketAddress( PORT ) );
+        datagramAcceptor.unbind( new InetSocketAddress( PORT ) );
+        threadPoolFilter.stop();
     }
 
-    public void testPool() throws Exception
+    public void testTCP() throws Exception
     {
-        Socket s = new Socket( "localhost", PORT );
-        InputStream in = s.getInputStream();
-        OutputStream out = s.getOutputStream();
-        try
+        EchoTCPClient client = new EchoTCPClient();
+        client.connect( InetAddress.getLocalHost(), PORT );
+        client.setSoTimeout( 3000 );
+
+        byte[] writeBuf = new byte[ 16 ];
+
+        for( int i = 0; i < 10; i++ )
         {
-            for( int i = 0; i < 1024; i++ )
+            fillWriteBuffer( writeBuf, i );
+            client.getOutputStream().write( writeBuf );
+        }
+
+        byte[] readBuf = new byte[ writeBuf.length ];
+
+        for( int i = 0; i < 10; i++ )
+        {
+            fillWriteBuffer( writeBuf, i );
+
+            int readBytes = 0;
+            while( readBytes < readBuf.length )
             {
-                int b = ( ( byte ) i ) & 0xff;
-                System.out.println( "Test: " + b );
-                out.write( b );
-                assertEquals( b, in.read() );
+                int nBytes = client.getInputStream()
+                        .read( readBuf, readBytes, readBuf.length - readBytes );
+
+                if( nBytes < 0 )
+                    fail( "Unexpected disconnection." );
+
+                readBytes += nBytes;
             }
+
+            assertEquals( writeBuf, readBuf );
+        }
+
+        client.setSoTimeout( 500 );
+
+        try
+        {
+            client.getInputStream().read();
+            fail( "Unexpected incoming data." );
+        }
+        catch( SocketTimeoutException e )
+        {
+        }
+
+        client.disconnect();
+    }
+
+    public void testUDP() throws Exception
+    {
+        EchoUDPClient client = new EchoUDPClient();
+        client.open();
+        client.setSoTimeout( 3000 );
+
+        byte[] writeBuf = new byte[ 16 ];
+
+        for( int i = 0; i < 10; i++ )
+        {
+            fillWriteBuffer( writeBuf, i );
+            client.send( writeBuf, writeBuf.length,
+                         InetAddress.getLocalHost(), PORT );
         }
-        finally
+
+        byte[] readBuf = new byte[ writeBuf.length ];
+
+        for( int i = 0; i < 10; i++ )
+        {
+            fillWriteBuffer( writeBuf, i );
+
+            assertEquals( readBuf.length, client.receive( readBuf,
+                                                          readBuf.length ) );
+            assertEquals( writeBuf, readBuf );
+        }
+
+        client.setSoTimeout( 500 );
+
+        try
+        {
+            client.receive( readBuf );
+            fail( "Unexpected incoming data." );
+        }
+        catch( SocketTimeoutException e )
+        {
+        }
+
+        client.close();
+    }
+
+    private void fillWriteBuffer( byte[] writeBuf, int i )
+    {
+        for( int j = writeBuf.length - 1; j >= 0; j-- )
         {
-            in.close();
-            out.close();
-            s.close();
+            writeBuf[ j ] = ( byte ) ( j + i );
         }
     }
 

Mime
View raw message