directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: r123184 - in incubator/directory/network/trunk/mina/src: examples/org/apache/mina/examples/echoserver java/org/apache/mina/io java/org/apache/mina/io/filter java/org/apache/mina/io/socket java/org/apache/mina/util
Date Thu, 23 Dec 2004 10:50:56 GMT
Author: trustin
Date: Thu Dec 23 02:50:54 2004
New Revision: 123184

URL: http://svn.apache.org/viewcvs?view=rev&rev=123184
Log:
 * Added ThreadPoolFilter for I/O layer
 * Added init() and destroy() lifecycle methods to IoHandlerFilter.
Added:
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/ThreadPoolFilter.java   (contents, props changed)
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/BlockingSet.java   (contents, props changed)
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/Stack.java   (contents, props changed)
Modified:
   incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java
   incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java

Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java	(original)
+++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java	Thu Dec 23 02:50:54 2004
@@ -34,48 +34,56 @@
 {
     public void sessionOpened( IoSession IoSession )
     {
-        System.out.println( IoSession.getRemoteAddress() + ": OPEN" );
+        System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": OPEN" );
     }
 
     public void sessionClosed( IoSession IoSession )
     {
-        System.out.println( IoSession.getRemoteAddress() + ": CLOSED" );
+        System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": CLOSED" );
     }
 
     public void sessionIdle( IoSession IoSession, IdleStatus status )
     {
-        System.out.println( IoSession.getRemoteAddress() + ": IDLE" );
+        System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": IDLE" );
     }
 
     public void exceptionCaught( IoSession IoSession, Throwable cause )
     {
-        System.out.println( IoSession.getRemoteAddress() + ": EXCEPTION" );
+        System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": EXCEPTION" );
         cause.printStackTrace( System.out );
     }
 
     public void dataRead( IoSession IoSession )
     {
-        System.out.println( IoSession.getRemoteAddress() + ": READ" );
-
         ReadBuffer rb = IoSession.getReadBuffer();
         WriteBuffer wb = IoSession.getWriteBuffer();
 
-        if (wb.putAsPossible(rb) > 0) {
-            rb.signal();
-            wb.flush();
+        synchronized (rb) {
+            synchronized (wb) {
+                int nBytes = wb.putAsPossible(rb);
+                if (nBytes > 0) {
+                    System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": READ (" + nBytes + "B)");
+                    rb.signal();
+                    wb.flush();
+                }
+            }
         }
     }
 
     public void dataWritten( IoSession IoSession )
     {
-        System.out.println( IoSession.getRemoteAddress() + ": WRITTEN");
-
         ReadBuffer rb = IoSession.getReadBuffer();
         WriteBuffer wb = IoSession.getWriteBuffer();
 
-        if (wb.putAsPossible(rb) > 0) {
-            rb.signal();
-            wb.flush();
+        synchronized (rb) {
+            synchronized (wb) {
+                int nBytes = wb.putAsPossible(rb);
+                if (nBytes > 0) {
+                    System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": WRITTEN (" + nBytes + "B)");
+                    rb.signal();
+                    wb.flush();
+                }
+            }
         }
     }
 

Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java	(original)
+++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java	Thu Dec 23 02:50:54 2004
@@ -21,6 +21,7 @@
 import java.net.InetSocketAddress;
 
 import org.apache.mina.io.Acceptor;
+import org.apache.mina.io.filter.ThreadPoolFilter;
 import org.apache.mina.io.socket.TcpAcceptor;
 
 /**
@@ -38,6 +39,7 @@
         Acceptor acceptor = new TcpAcceptor();
         acceptor.bind( new InetSocketAddress( PORT ),
                        new EchoProtocolHandler() );
+        acceptor.addFilter(Integer.MAX_VALUE, new ThreadPoolFilter());
         System.out.println( "Listening on port " + PORT );
     }
 }

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java	Thu Dec 23 02:50:54 2004
@@ -28,6 +28,10 @@
  */
 public interface IoHandlerFilter
 {
+    void init();
+
+    void destroy();
+
     void sessionOpened( IoHandler nextHandler, IoSession session );
 
     void sessionClosed( IoHandler nextHandler, IoSession session );

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java	Thu Dec 23 02:50:54 2004
@@ -13,6 +13,13 @@
  */
 public class IoHandlerFilterAdapter implements IoHandlerFilter
 {
+    public void init()
+    {
+    }
+
+    public void destroy()
+    {
+    }
 
     public void sessionOpened( IoHandler nextHandler, IoSession session )
     {

Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/ThreadPoolFilter.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/ThreadPoolFilter.java?view=auto&rev=123184
==============================================================================
--- (empty file)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/ThreadPoolFilter.java	Thu Dec 23 02:50:54 2004
@@ -0,0 +1,483 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.mina.io.filter;
+
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.io.IoHandler;
+import org.apache.mina.io.IoHandlerFilter;
+import org.apache.mina.io.IoSession;
+import org.apache.mina.io.ReadBuffer;
+import org.apache.mina.util.BlockingSet;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.Stack;
+
+/**
+ * TODO Document me.
+ * 
+ * A Leader/Followers thread pool.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ThreadPoolFilter implements IoHandlerFilter
+{
+    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
+
+    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
+
+    private static volatile int threadId = 0;
+
+    private Map buffers = new IdentityHashMap();
+
+    private Stack followers = new Stack();
+
+    private Worker leader;
+
+    private BlockingSet readySessionBuffers = new BlockingSet();
+
+    private Set busySessionBuffers = new HashSet();
+
+    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+
+    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+    private boolean shuttingDown;
+
+    private int poolSize;
+
+    public ThreadPoolFilter()
+    {
+    }
+
+    public synchronized int getPoolSize()
+    {
+        return poolSize;
+    }
+
+    public int getMaximumPoolSize()
+    {
+        return maximumPoolSize;
+    }
+
+    public int getKeepAliveTime()
+    {
+        return keepAliveTime;
+    }
+
+    public void setMaximumPoolSize( int maximumPoolSize )
+    {
+        if( maximumPoolSize <= 0 )
+            throw new IllegalArgumentException();
+        this.maximumPoolSize = maximumPoolSize;
+    }
+
+    public void setKeepAliveTime( int keepAliveTime )
+    {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public void init()
+    {
+        shuttingDown = false;
+
+        leader = new Worker();
+        leader.start();
+        leader.lead();
+    }
+
+    public void destroy()
+    {
+        shuttingDown = true;
+        Worker lastLeader = null;
+        for( ;; )
+        {
+            Worker leader = this.leader;
+            if( lastLeader == leader )
+                break;
+
+            while( leader.isAlive() )
+            {
+                leader.interrupt();
+                try
+                {
+                    leader.join();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+
+            lastLeader = leader;
+        }
+    }
+
+    private synchronized void increasePoolSize()
+    {
+        poolSize++;
+    }
+
+    private synchronized void decreasePoolSize()
+    {
+        poolSize--;
+    }
+
+    public void sessionOpened( IoHandler nextHandler, IoSession session )
+    {
+        fireEvent( nextHandler, session, EventType.OPENED, null );
+    }
+
+    public void sessionClosed( IoHandler nextHandler, IoSession session )
+    {
+        fireEvent( nextHandler, session, EventType.CLOSED, null );
+    }
+
+    public void sessionIdle( IoHandler nextHandler, IoSession session,
+                            IdleStatus status )
+    {
+        fireEvent( nextHandler, session, EventType.IDLE, status );
+    }
+
+    public void exceptionCaught( IoHandler nextHandler, IoSession session,
+                                Throwable cause )
+    {
+        fireEvent( nextHandler, session, EventType.EXCEPTION, cause );
+    }
+
+    public void dataRead( IoHandler nextHandler, IoSession session )
+    {
+        fireEvent( nextHandler, session, EventType.READ, null );
+    }
+
+    public void dataWritten( IoHandler nextHandler, IoSession session )
+    {
+        fireEvent( nextHandler, session, EventType.WRITTEN, null );
+    }
+
+    public void markerReleased( IoHandler nextHandler, IoSession session,
+                               Object marker )
+    {
+        fireEvent( nextHandler, session, EventType.MARKER, marker );
+    }
+
+    private void fireEvent( IoHandler nextHandler, IoSession session,
+                           EventType type, Object data )
+    {
+        SessionBuffer buf = getSessionBuffer( session );
+        synchronized( buf )
+        {
+            buf.nextHandlers.push( nextHandler );
+            buf.eventTypes.push( type );
+            buf.eventDatum.push( data );
+        }
+
+        synchronized( readySessionBuffers )
+        {
+            if( !busySessionBuffers.contains( buf ) )
+            {
+                busySessionBuffers.add( buf );
+                readySessionBuffers.add( buf );
+            }
+        }
+    }
+
+    private SessionBuffer getSessionBuffer( IoSession session )
+    {
+        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
+        if( buf == null )
+        {
+            synchronized( buffers )
+            {
+                buf = ( SessionBuffer ) buffers.get( session );
+                if( buf == null )
+                {
+                    buf = new SessionBuffer( session );
+                    buffers.put( session, buf );
+                }
+            }
+        }
+        return buf;
+    }
+
+    private void removeSessionBuffer( SessionBuffer buf )
+    {
+        synchronized( buffers )
+        {
+            buffers.remove( buf.session );
+        }
+    }
+
+    private static class SessionBuffer
+    {
+
+        private final IoSession session;
+
+        private final Queue nextHandlers = new Queue();
+
+        private final Queue eventTypes = new Queue();
+
+        private final Queue eventDatum = new Queue();
+
+        private SessionBuffer( IoSession session )
+        {
+            this.session = session;
+        }
+    }
+
+    private static class EventType
+    {
+        private static final EventType OPENED = new EventType();
+
+        private static final EventType CLOSED = new EventType();
+
+        private static final EventType READ = new EventType();
+
+        private static final EventType WRITTEN = new EventType();
+
+        private static final EventType IDLE = new EventType();
+
+        private static final EventType MARKER = new EventType();
+
+        private static final EventType EXCEPTION = new EventType();
+
+        private EventType()
+        {
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        private final Object promotionLock = new Object();
+
+        private Worker()
+        {
+            super( ThreadPoolFilter.class.getName() + '-' + ( threadId++ ) );
+            increasePoolSize();
+        }
+
+        public void lead()
+        {
+            synchronized( promotionLock )
+            {
+                leader = this;
+                promotionLock.notify();
+            }
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                if( !waitForPromotion() )
+                    break;
+
+                SessionBuffer buf = fetchBuffer();
+                giveUpLead();
+
+                if( buf == null )
+                    break;
+
+                processEvents( buf );
+                follow();
+                releaseBuffer( buf );
+            }
+
+            decreasePoolSize();
+        }
+
+        private SessionBuffer fetchBuffer()
+        {
+            SessionBuffer buf = null;
+            synchronized( readySessionBuffers )
+            {
+                do
+                {
+                    buf = null;
+                    try
+                    {
+                        readySessionBuffers.waitForNewItem();
+                    }
+                    catch( InterruptedException e )
+                    {
+                        break;
+                    }
+
+                    Iterator it = readySessionBuffers.iterator();
+                    if (!it.hasNext()) {
+                        // exceeded keepAliveTime
+                        break;
+                    }
+
+                    do
+                    {
+                        buf = null;
+                        buf = ( SessionBuffer ) it.next();
+                        it.remove();
+                    }
+                    while( buf != null && buf.nextHandlers.isEmpty()
+                           && it.hasNext() );
+                }
+                while( buf != null && buf.nextHandlers.isEmpty() );
+            }
+
+            return buf;
+        }
+
+        private void processEvents( SessionBuffer buf )
+        {
+            IoSession session = buf.session;
+            for( ;; )
+            {
+                IoHandler nextHandler;
+                EventType type;
+                Object data;
+                synchronized( buf )
+                {
+                    nextHandler = ( IoHandler ) buf.nextHandlers.pop();
+                    if( nextHandler == null )
+                        break;
+
+                    type = ( EventType ) buf.eventTypes.pop();
+                    data = buf.eventDatum.pop();
+                }
+                processEvent( nextHandler, session, type, data );
+            }
+        }
+
+        private void processEvent( IoHandler nextHandler, IoSession session,
+                                  EventType type, Object data )
+        {
+            if( type == EventType.READ )
+            {
+                ReadBuffer buf = session.getReadBuffer();
+                boolean fire;
+                synchronized( buf )
+                {
+                    fire = buf.hasRemaining();
+                }
+
+                if( fire )
+                {
+                    nextHandler.dataRead( session );
+                }
+            }
+            else if( type == EventType.WRITTEN )
+            {
+                nextHandler.dataWritten( session );
+            }
+            else if( type == EventType.MARKER )
+            {
+                nextHandler.markerReleased( session, data );
+            }
+            else if( type == EventType.EXCEPTION )
+            {
+                nextHandler.exceptionCaught( session, ( Throwable ) data );
+            }
+            else if( type == EventType.IDLE )
+            {
+                nextHandler.sessionIdle( session, ( IdleStatus ) data );
+            }
+            else if( type == EventType.OPENED )
+            {
+                nextHandler.sessionOpened( session );
+            }
+            else if( type == EventType.CLOSED )
+            {
+                nextHandler.sessionClosed( session );
+            }
+        }
+
+        private void follow()
+        {
+            synchronized( promotionLock )
+            {
+                if( this != leader )
+                {
+                    synchronized( followers )
+                    {
+                        followers.push( this );
+                    }
+                }
+            }
+        }
+
+        private void releaseBuffer( SessionBuffer buf )
+        {
+            synchronized( readySessionBuffers )
+            {
+                busySessionBuffers.remove( buf );
+                if( buf.nextHandlers.isEmpty() )
+                {
+                    removeSessionBuffer( buf );
+                }
+                else
+                {
+                    readySessionBuffers.add( buf );
+                }
+            }
+        }
+
+        private boolean waitForPromotion()
+        {
+            synchronized( promotionLock )
+            {
+                if( this != leader )
+                {
+                    try
+                    {
+                        int keepAliveTime = getKeepAliveTime();
+                        if( keepAliveTime > 0 )
+                        {
+                            promotionLock.wait( keepAliveTime );
+                        }
+                        else
+                        {
+                            promotionLock.wait();
+                        }
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
+                }
+
+                return this == leader;
+            }
+        }
+
+        private void giveUpLead()
+        {
+            Worker worker;
+            synchronized( followers )
+            {
+                worker = ( Worker ) followers.pop();
+            }
+
+            if( worker != null )
+            {
+                worker.lead();
+            }
+            else
+            {
+                if( !shuttingDown )
+                {
+                    synchronized( ThreadPoolFilter.this )
+                    {
+                        if( !shuttingDown
+                            && getPoolSize() < getMaximumPoolSize() )
+                        {
+                            worker = new Worker();
+                            worker.start();
+                            worker.lead();
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java	Thu Dec 23 02:50:54 2004
@@ -103,6 +103,7 @@
 
             if( worker == null )
             {
+                filterManager.start();
                 worker = new Worker();
                 worker.start();
             }
@@ -170,6 +171,7 @@
                             if( selector.keys().isEmpty() )
                             {
                                 worker = null;
+                                filterManager.stop();
                                 break;
                             }
                         }

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java	Thu Dec 23 02:50:54 2004
@@ -100,6 +100,7 @@
 
                 if( worker == null )
                 {
+                    filterManager.start();
                     worker = new Worker();
                     worker.start();
                 }
@@ -228,6 +229,7 @@
                         {
                             if( selector.keys().isEmpty() )
                             {
+                                filterManager.stop();
                                 worker = null;
                                 break;
                             }

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java	Thu Dec 23 02:50:54 2004
@@ -117,16 +117,11 @@
 
     public void addReadableSession( TcpSession session )
     {
-        SelectionKey key = session.getSelectionKey();
-
-        if( ( key.interestOps() & SelectionKey.OP_READ ) == 0 )
+        synchronized( readableSessions )
         {
-            synchronized( readableSessions )
-            {
-                readableSessions.push( session );
-            }
-            selector.wakeup();
+            readableSessions.push( session );
         }
+        selector.wakeup();
     }
 
     private void addSessions()
@@ -190,7 +185,9 @@
                 break;
 
             SelectionKey key = session.getSelectionKey();
-            key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+            if ((key.interestOps() & SelectionKey.OP_READ) == 0) {
+                key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+            }
         }
     }
 

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java	Thu Dec 23 02:50:54 2004
@@ -51,6 +51,10 @@
     private final TcpWriteBuffer writeBuf;
 
     private final IoHandler handler;
+    
+    private final SocketAddress remoteAddress;
+    
+    private final SocketAddress localAddress;
 
     private SelectionKey key;
 
@@ -86,6 +90,8 @@
                                                                                0 ) );
         this.writeBuf = new TcpWriteBuffer( this, ByteBufferPool.open() );
         this.handler = defaultHandler;
+        this.remoteAddress = ch.socket().getRemoteSocketAddress();
+        this.localAddress = ch.socket().getLocalSocketAddress();
     }
     
     IoHandlerFilterManager getFilterManager() {
@@ -165,12 +171,12 @@
 
     public SocketAddress getRemoteAddress()
     {
-        return ch.socket().getRemoteSocketAddress();
+        return remoteAddress;
     }
 
     public SocketAddress getLocalAddress()
     {
-        return ch.socket().getLocalSocketAddress();
+        return localAddress;
     }
 
     public long getReadBytes()

Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/BlockingSet.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/BlockingSet.java?view=auto&rev=123184
==============================================================================
--- (empty file)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/BlockingSet.java	Thu Dec 23 02:50:54 2004
@@ -0,0 +1,52 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.mina.util;
+
+import java.util.HashSet;
+import java.util.Iterator;
+
+/**
+ * TODO Document me.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class BlockingSet extends HashSet
+{
+    private int waiters = 0;
+
+    public synchronized boolean add( Object o )
+    {
+        boolean ret = super.add( o );
+        if( ret && waiters > 0 )
+            notify();
+        return ret;
+    }
+
+    public Iterator iterator()
+    {
+        return super.iterator();
+    }
+
+    public synchronized boolean remove( Object o )
+    {
+        return super.remove( o );
+    }
+
+    public synchronized void waitForNewItem() throws InterruptedException
+    {
+        waiters++;
+        try
+        {
+            while( isEmpty() )
+            {
+                wait();
+            }
+        }
+        finally
+        {
+            waiters--;
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r2=123184
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java	(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java	Thu Dec 23 02:50:54 2004
@@ -56,16 +56,59 @@
         {
             session.getHandler().markerReleased( session, marker );
         }
+
+        public void init()
+        {
+        }
+
+        public void destroy()
+        {
+        }
     };
 
     private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER );
 
+    private boolean started;
+
     public IoHandlerFilterManager()
     {
     }
 
+    public synchronized void start()
+    {
+        if( started )
+            return;
+
+        Entry e = head;
+        do
+        {
+            e.filter.init();
+            e = e.nextEntry;
+        }
+        while( e != null );
+        started = true;
+    }
+
+    public synchronized void stop()
+    {
+        if( !started )
+            return;
+
+        Entry e = head;
+        do
+        {
+            e.filter.destroy();
+            e = e.nextEntry;
+        }
+        while( e != null );
+    }
+
     public synchronized void addFilter( int priority, IoHandlerFilter filter )
     {
+        if (started) {
+            filter.init();
+        }
+
         Entry e = head;
         Entry prevEntry = null;
         for( ;; )
@@ -121,6 +164,10 @@
                 else
                 {
                     prevEntry.nextEntry = e.nextEntry;
+                }
+                
+                if (started) {
+                    filter.destroy();
                 }
                 break;
             }

Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/Stack.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/Stack.java?view=auto&rev=123184
==============================================================================
--- (empty file)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/Stack.java	Thu Dec 23 02:50:54 2004
@@ -0,0 +1,137 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.io.Serializable;
+
+import java.util.Arrays;
+
+/**
+ * <p>
+ * A simple queue class. This class is <b>NOT </b> thread-safe.
+ * </p>
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class Stack implements Serializable
+{
+    private Object[] items;
+    
+    private int size = 0;
+
+    /**
+     * Construct a new, empty <code>Queue</code> with the specified initial
+     * capacity.
+     */
+    public Stack()
+    {
+        items = new Object[ 16 ];
+    }
+
+    /**
+     * Clears this queue.
+     */
+    public void clear()
+    {
+        Arrays.fill( items, null );
+        size = 0;
+    }
+
+    /**
+     * Dequeues from this queue.
+     * 
+     * @return <code>null</code>, if this queue is empty or the element is
+     *         really <code>null</code>.
+     */
+    public Object pop()
+    {
+        if( size == 0 )
+        {
+            return null;
+        }
+
+        int pos = size - 1;
+        Object ret = items[ pos ];
+        items[ pos ] = null;
+        size--;
+
+        return ret;
+    }
+
+    /**
+     * Enqueue into this queue.
+     */
+    public void push( Object obj )
+    {
+        if( size == items.length )
+        {
+            // expand queue
+            final int oldLen = items.length;
+            Object[] tmp = new Object[ oldLen * 2 ];
+            System.arraycopy( items, 0, tmp, 0, size);
+            items = tmp;
+        }
+
+        items[ size ] = obj;
+        size++;
+    }
+
+    /**
+     * Returns the first element of the queue.
+     * 
+     * @return <code>null</code>, if the queue is empty, or the element is
+     *         really <code>null</code>.
+     */
+    public Object first()
+    {
+        if( size == 0 )
+        {
+            return null;
+        }
+
+        return items[ size - 1 ];
+    }
+
+    public Object last()
+    {
+        if( size == 0 )
+        {
+            return null;
+        }
+
+        return items[ 0 ];
+    }
+
+    /**
+     * Returns <code>true</code> if the queue is empty.
+     */
+    public boolean isEmpty()
+    {
+        return ( size == 0 );
+    }
+
+    /**
+     * Returns the number of elements in the queue.
+     */
+    public int size()
+    {
+        return size;
+    }
+}
\ No newline at end of file

Mime
View raw message