Return-Path: Delivered-To: apmail-incubator-directory-cvs-archive@www.apache.org Received: (qmail 89090 invoked from network); 23 Dec 2004 10:51:04 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur-2.apache.org with SMTP; 23 Dec 2004 10:51:04 -0000 Received: (qmail 4188 invoked by uid 500); 23 Dec 2004 10:51:04 -0000 Delivered-To: apmail-incubator-directory-cvs-archive@incubator.apache.org Received: (qmail 4127 invoked by uid 500); 23 Dec 2004 10:51:03 -0000 Mailing-List: contact directory-cvs-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: directory-dev@incubator.apache.org Delivered-To: mailing list directory-cvs@incubator.apache.org Received: (qmail 4095 invoked by uid 99); 23 Dec 2004 10:51:02 -0000 X-ASF-Spam-Status: No, hits=-9.8 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from minotaur.apache.org (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.28) with SMTP; Thu, 23 Dec 2004 02:50:59 -0800 Received: (qmail 89006 invoked by uid 65534); 23 Dec 2004 10:50:56 -0000 Date: 23 Dec 2004 10:50:56 -0000 Message-ID: <20041223105056.89001.qmail@minotaur.apache.org> From: trustin@apache.org To: directory-cvs@incubator.apache.org Subject: svn commit: r123184 - in incubator/directory/network/trunk/mina/src: examples/org/apache/mina/examples/echoserver java/org/apache/mina/io java/org/apache/mina/io/filter java/org/apache/mina/io/socket java/org/apache/mina/util MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 X-Virus-Checked: Checked X-Spam-Rating: minotaur-2.apache.org 1.6.2 0/1000/N Author: trustin Date: Thu Dec 23 02:50:54 2004 New Revision: 123184 URL: http://svn.apache.org/viewcvs?view=rev&rev=123184 Log: * Added ThreadPoolFilter for I/O layer * Added init() and destroy() lifecycle methods to IoHandlerFilter. 
Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/ThreadPoolFilter.java (contents, props changed) incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/BlockingSet.java (contents, props changed) incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/Stack.java (contents, props changed) Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java&r2=123184 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java (original) +++ 
incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/EchoProtocolHandler.java Thu Dec 23 02:50:54 2004 @@ -34,48 +34,56 @@ { public void sessionOpened( IoSession IoSession ) { - System.out.println( IoSession.getRemoteAddress() + ": OPEN" ); + System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": OPEN" ); } public void sessionClosed( IoSession IoSession ) { - System.out.println( IoSession.getRemoteAddress() + ": CLOSED" ); + System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": CLOSED" ); } public void sessionIdle( IoSession IoSession, IdleStatus status ) { - System.out.println( IoSession.getRemoteAddress() + ": IDLE" ); + System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": IDLE" ); } public void exceptionCaught( IoSession IoSession, Throwable cause ) { - System.out.println( IoSession.getRemoteAddress() + ": EXCEPTION" ); + System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": EXCEPTION" ); cause.printStackTrace( System.out ); } public void dataRead( IoSession IoSession ) { - System.out.println( IoSession.getRemoteAddress() + ": READ" ); - ReadBuffer rb = IoSession.getReadBuffer(); WriteBuffer wb = IoSession.getWriteBuffer(); - if (wb.putAsPossible(rb) > 0) { - rb.signal(); - wb.flush(); + synchronized (rb) { + synchronized (wb) { + int nBytes = wb.putAsPossible(rb); + if (nBytes > 0) { + System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": READ (" + nBytes + "B)"); + rb.signal(); + wb.flush(); + } + } } } public void dataWritten( IoSession IoSession ) { - System.out.println( IoSession.getRemoteAddress() + ": WRITTEN"); - ReadBuffer rb = IoSession.getReadBuffer(); WriteBuffer wb = IoSession.getWriteBuffer(); - if (wb.putAsPossible(rb) > 0) { - rb.signal(); - wb.flush(); + synchronized (rb) { + synchronized (wb) { + int nBytes = 
wb.putAsPossible(rb); + if (nBytes > 0) { + System.out.println( Thread.currentThread().getName() + ' ' + IoSession.getRemoteAddress() + ": WRITTEN (" + nBytes + "B)"); + rb.signal(); + wb.flush(); + } + } } } Modified: incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java&r2=123184 ============================================================================== --- incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java (original) +++ incubator/directory/network/trunk/mina/src/examples/org/apache/mina/examples/echoserver/Main.java Thu Dec 23 02:50:54 2004 @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import org.apache.mina.io.Acceptor; +import org.apache.mina.io.filter.ThreadPoolFilter; import org.apache.mina.io.socket.TcpAcceptor; /** @@ -38,6 +39,7 @@ Acceptor acceptor = new TcpAcceptor(); acceptor.bind( new InetSocketAddress( PORT ), new EchoProtocolHandler() ); + acceptor.addFilter(Integer.MAX_VALUE, new ThreadPoolFilter()); System.out.println( "Listening on port " + PORT ); } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java&r2=123184 ============================================================================== --- 
incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilter.java Thu Dec 23 02:50:54 2004 @@ -28,6 +28,10 @@ */ public interface IoHandlerFilter { + void init(); + + void destroy(); + void sessionOpened( IoHandler nextHandler, IoSession session ); void sessionClosed( IoHandler nextHandler, IoSession session ); Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java&r2=123184 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/IoHandlerFilterAdapter.java Thu Dec 23 02:50:54 2004 @@ -13,6 +13,13 @@ */ public class IoHandlerFilterAdapter implements IoHandlerFilter { + public void init() + { + } + + public void destroy() + { + } public void sessionOpened( IoHandler nextHandler, IoSession session ) { Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/ThreadPoolFilter.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/ThreadPoolFilter.java?view=auto&rev=123184 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/filter/ThreadPoolFilter.java Thu Dec 23 02:50:54 2004 @@ -0,0 +1,483 @@ +/* + * @(#) $Id$ + */ +package org.apache.mina.io.filter; + 
+import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.mina.common.IdleStatus; +import org.apache.mina.io.IoHandler; +import org.apache.mina.io.IoHandlerFilter; +import org.apache.mina.io.IoSession; +import org.apache.mina.io.ReadBuffer; +import org.apache.mina.util.BlockingSet; +import org.apache.mina.util.Queue; +import org.apache.mina.util.Stack; + +/** + * TODO Document me. + * + * A Leader/Followers thread pool. + * + * @author Trustin Lee (trustin@apache.org) + * @version $Rev$, $Date$ + */ +public class ThreadPoolFilter implements IoHandlerFilter +{ + public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE; + + public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000; + + private static volatile int threadId = 0; + + private Map buffers = new IdentityHashMap(); + + private Stack followers = new Stack(); + + private Worker leader; + + private BlockingSet readySessionBuffers = new BlockingSet(); + + private Set busySessionBuffers = new HashSet(); + + private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE; + + private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME; + + private boolean shuttingDown; + + private int poolSize; + + public ThreadPoolFilter() + { + } + + public synchronized int getPoolSize() + { + return poolSize; + } + + public int getMaximumPoolSize() + { + return maximumPoolSize; + } + + public int getKeepAliveTime() + { + return keepAliveTime; + } + + public void setMaximumPoolSize( int maximumPoolSize ) + { + if( maximumPoolSize <= 0 ) + throw new IllegalArgumentException(); + this.maximumPoolSize = maximumPoolSize; + } + + public void setKeepAliveTime( int keepAliveTime ) + { + this.keepAliveTime = keepAliveTime; + } + + public void init() + { + shuttingDown = false; + + leader = new Worker(); + leader.start(); + leader.lead(); + } + + public void destroy() + { + shuttingDown = true; + Worker lastLeader = null; + for( ;; ) + { + 
Worker leader = this.leader; + if( lastLeader == leader ) + break; + + while( leader.isAlive() ) + { + leader.interrupt(); + try + { + leader.join(); + } + catch( InterruptedException e ) + { + } + } + + lastLeader = leader; + } + } + + private synchronized void increasePoolSize() + { + poolSize++; + } + + private synchronized void decreasePoolSize() + { + poolSize--; + } + + public void sessionOpened( IoHandler nextHandler, IoSession session ) + { + fireEvent( nextHandler, session, EventType.OPENED, null ); + } + + public void sessionClosed( IoHandler nextHandler, IoSession session ) + { + fireEvent( nextHandler, session, EventType.CLOSED, null ); + } + + public void sessionIdle( IoHandler nextHandler, IoSession session, + IdleStatus status ) + { + fireEvent( nextHandler, session, EventType.IDLE, status ); + } + + public void exceptionCaught( IoHandler nextHandler, IoSession session, + Throwable cause ) + { + fireEvent( nextHandler, session, EventType.EXCEPTION, cause ); + } + + public void dataRead( IoHandler nextHandler, IoSession session ) + { + fireEvent( nextHandler, session, EventType.READ, null ); + } + + public void dataWritten( IoHandler nextHandler, IoSession session ) + { + fireEvent( nextHandler, session, EventType.WRITTEN, null ); + } + + public void markerReleased( IoHandler nextHandler, IoSession session, + Object marker ) + { + fireEvent( nextHandler, session, EventType.MARKER, marker ); + } + + private void fireEvent( IoHandler nextHandler, IoSession session, + EventType type, Object data ) + { + SessionBuffer buf = getSessionBuffer( session ); + synchronized( buf ) + { + buf.nextHandlers.push( nextHandler ); + buf.eventTypes.push( type ); + buf.eventDatum.push( data ); + } + + synchronized( readySessionBuffers ) + { + if( !busySessionBuffers.contains( buf ) ) + { + busySessionBuffers.add( buf ); + readySessionBuffers.add( buf ); + } + } + } + + private SessionBuffer getSessionBuffer( IoSession session ) + { + SessionBuffer buf = ( SessionBuffer ) 
buffers.get( session ); + if( buf == null ) + { + synchronized( buffers ) + { + buf = ( SessionBuffer ) buffers.get( session ); + if( buf == null ) + { + buf = new SessionBuffer( session ); + buffers.put( session, buf ); + } + } + } + return buf; + } + + private void removeSessionBuffer( SessionBuffer buf ) + { + synchronized( buffers ) + { + buffers.remove( buf.session ); + } + } + + private static class SessionBuffer + { + + private final IoSession session; + + private final Queue nextHandlers = new Queue(); + + private final Queue eventTypes = new Queue(); + + private final Queue eventDatum = new Queue(); + + private SessionBuffer( IoSession session ) + { + this.session = session; + } + } + + private static class EventType + { + private static final EventType OPENED = new EventType(); + + private static final EventType CLOSED = new EventType(); + + private static final EventType READ = new EventType(); + + private static final EventType WRITTEN = new EventType(); + + private static final EventType IDLE = new EventType(); + + private static final EventType MARKER = new EventType(); + + private static final EventType EXCEPTION = new EventType(); + + private EventType() + { + } + } + + private class Worker extends Thread + { + private final Object promotionLock = new Object(); + + private Worker() + { + super( ThreadPoolFilter.class.getName() + '-' + ( threadId++ ) ); + increasePoolSize(); + } + + public void lead() + { + synchronized( promotionLock ) + { + leader = this; + promotionLock.notify(); + } + } + + public void run() + { + for( ;; ) + { + if( !waitForPromotion() ) + break; + + SessionBuffer buf = fetchBuffer(); + giveUpLead(); + + if( buf == null ) + break; + + processEvents( buf ); + follow(); + releaseBuffer( buf ); + } + + decreasePoolSize(); + } + + private SessionBuffer fetchBuffer() + { + SessionBuffer buf = null; + synchronized( readySessionBuffers ) + { + do + { + buf = null; + try + { + readySessionBuffers.waitForNewItem(); + } + catch( 
InterruptedException e ) + { + break; + } + + Iterator it = readySessionBuffers.iterator(); + if (!it.hasNext()) { + // exceeded keepAliveTime + break; + } + + do + { + buf = null; + buf = ( SessionBuffer ) it.next(); + it.remove(); + } + while( buf != null && buf.nextHandlers.isEmpty() + && it.hasNext() ); + } + while( buf != null && buf.nextHandlers.isEmpty() ); + } + + return buf; + } + + private void processEvents( SessionBuffer buf ) + { + IoSession session = buf.session; + for( ;; ) + { + IoHandler nextHandler; + EventType type; + Object data; + synchronized( buf ) + { + nextHandler = ( IoHandler ) buf.nextHandlers.pop(); + if( nextHandler == null ) + break; + + type = ( EventType ) buf.eventTypes.pop(); + data = buf.eventDatum.pop(); + } + processEvent( nextHandler, session, type, data ); + } + } + + private void processEvent( IoHandler nextHandler, IoSession session, + EventType type, Object data ) + { + if( type == EventType.READ ) + { + ReadBuffer buf = session.getReadBuffer(); + boolean fire; + synchronized( buf ) + { + fire = buf.hasRemaining(); + } + + if( fire ) + { + nextHandler.dataRead( session ); + } + } + else if( type == EventType.WRITTEN ) + { + nextHandler.dataWritten( session ); + } + else if( type == EventType.MARKER ) + { + nextHandler.markerReleased( session, data ); + } + else if( type == EventType.EXCEPTION ) + { + nextHandler.exceptionCaught( session, ( Throwable ) data ); + } + else if( type == EventType.IDLE ) + { + nextHandler.sessionIdle( session, ( IdleStatus ) data ); + } + else if( type == EventType.OPENED ) + { + nextHandler.sessionOpened( session ); + } + else if( type == EventType.CLOSED ) + { + nextHandler.sessionClosed( session ); + } + } + + private void follow() + { + synchronized( promotionLock ) + { + if( this != leader ) + { + synchronized( followers ) + { + followers.push( this ); + } + } + } + } + + private void releaseBuffer( SessionBuffer buf ) + { + synchronized( readySessionBuffers ) + { + 
busySessionBuffers.remove( buf ); + if( buf.nextHandlers.isEmpty() ) + { + removeSessionBuffer( buf ); + } + else + { + readySessionBuffers.add( buf ); + } + } + } + + private boolean waitForPromotion() + { + synchronized( promotionLock ) + { + if( this != leader ) + { + try + { + int keepAliveTime = getKeepAliveTime(); + if( keepAliveTime > 0 ) + { + promotionLock.wait( keepAliveTime ); + } + else + { + promotionLock.wait(); + } + } + catch( InterruptedException e ) + { + } + } + + return this == leader; + } + } + + private void giveUpLead() + { + Worker worker; + synchronized( followers ) + { + worker = ( Worker ) followers.pop(); + } + + if( worker != null ) + { + worker.lead(); + } + else + { + if( !shuttingDown ) + { + synchronized( ThreadPoolFilter.this ) + { + if( !shuttingDown + && getPoolSize() < getMaximumPoolSize() ) + { + worker = new Worker(); + worker.start(); + worker.lead(); + } + } + } + } + } + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r2=123184 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java Thu Dec 23 02:50:54 2004 @@ -103,6 +103,7 @@ if( worker == null ) { + filterManager.start(); worker = new Worker(); worker.start(); } @@ -170,6 +171,7 @@ if( selector.keys().isEmpty() ) { worker = null; + filterManager.stop(); break; } } Modified: 
incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r2=123184 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java Thu Dec 23 02:50:54 2004 @@ -100,6 +100,7 @@ if( worker == null ) { + filterManager.start(); worker = new Worker(); worker.start(); } @@ -228,6 +229,7 @@ { if( selector.keys().isEmpty() ) { + filterManager.stop(); worker = null; break; } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r2=123184 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java Thu Dec 23 02:50:54 2004 @@ -117,16 +117,11 @@ public void addReadableSession( TcpSession session ) { - SelectionKey key = session.getSelectionKey(); - - if( ( key.interestOps() & SelectionKey.OP_READ ) == 0 ) + synchronized( readableSessions ) { - synchronized( 
readableSessions ) - { - readableSessions.push( session ); - } - selector.wakeup(); + readableSessions.push( session ); } + selector.wakeup(); } private void addSessions() @@ -190,7 +185,9 @@ break; SelectionKey key = session.getSelectionKey(); - key.interestOps( key.interestOps() | SelectionKey.OP_READ ); + if ((key.interestOps() & SelectionKey.OP_READ) == 0) { + key.interestOps( key.interestOps() | SelectionKey.OP_READ ); + } } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java&r2=123184 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpSession.java Thu Dec 23 02:50:54 2004 @@ -51,6 +51,10 @@ private final TcpWriteBuffer writeBuf; private final IoHandler handler; + + private final SocketAddress remoteAddress; + + private final SocketAddress localAddress; private SelectionKey key; @@ -86,6 +90,8 @@ 0 ) ); this.writeBuf = new TcpWriteBuffer( this, ByteBufferPool.open() ); this.handler = defaultHandler; + this.remoteAddress = ch.socket().getRemoteSocketAddress(); + this.localAddress = ch.socket().getLocalSocketAddress(); } IoHandlerFilterManager getFilterManager() { @@ -165,12 +171,12 @@ public SocketAddress getRemoteAddress() { - return ch.socket().getRemoteSocketAddress(); + return remoteAddress; } public SocketAddress getLocalAddress() { - return ch.socket().getLocalSocketAddress(); + return localAddress; } public long getReadBytes() Added: 
incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/BlockingSet.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/BlockingSet.java?view=auto&rev=123184 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/BlockingSet.java Thu Dec 23 02:50:54 2004 @@ -0,0 +1,52 @@ +/* + * @(#) $Id$ + */ +package org.apache.mina.util; + +import java.util.HashSet; +import java.util.Iterator; + +/** + * TODO Document me. + * + * @author Trustin Lee (trustin@apache.org) + * @version $Rev$, $Date$ + */ +public class BlockingSet extends HashSet +{ + private int waiters = 0; + + public synchronized boolean add( Object o ) + { + boolean ret = super.add( o ); + if( ret && waiters > 0 ) + notify(); + return ret; + } + + public Iterator iterator() + { + return super.iterator(); + } + + public synchronized boolean remove( Object o ) + { + return super.remove( o ); + } + + public synchronized void waitForNewItem() throws InterruptedException + { + waiters++; + try + { + while( isEmpty() ) + { + wait(); + } + } + finally + { + waiters--; + } + } +} \ No newline at end of file Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java?view=diff&rev=123184&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r1=123183&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java&r2=123184 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java (original) +++ 
incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/IoHandlerFilterManager.java Thu Dec 23 02:50:54 2004 @@ -56,16 +56,59 @@ { session.getHandler().markerReleased( session, marker ); } + + public void init() + { + } + + public void destroy() + { + } }; private Entry head = new Entry( null, Integer.MIN_VALUE, FINAL_FILTER ); + private boolean started; + public IoHandlerFilterManager() { } + public synchronized void start() + { + if( started ) + return; + + Entry e = head; + do + { + e.filter.init(); + e = e.nextEntry; + } + while( e != null ); + started = true; + } + + public synchronized void stop() + { + if( !started ) + return; + + Entry e = head; + do + { + e.filter.destroy(); + e = e.nextEntry; + } + while( e != null ); + } + public synchronized void addFilter( int priority, IoHandlerFilter filter ) { + if (started) { + filter.init(); + } + Entry e = head; Entry prevEntry = null; for( ;; ) @@ -121,6 +164,10 @@ else { prevEntry.nextEntry = e.nextEntry; + } + + if (started) { + filter.destroy(); } break; } Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/Stack.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/Stack.java?view=auto&rev=123184 ============================================================================== --- (empty file) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/util/Stack.java Thu Dec 23 02:50:54 2004 @@ -0,0 +1,137 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.util; + +import java.io.Serializable; + +import java.util.Arrays; + +/** + *

+ * A simple stack (last-in, first-out) class. This class is NOT thread-safe. + *

+ * + * @author Trustin Lee (trustin@apache.org) + * @version $Rev$, $Date$ + */ +public class Stack implements Serializable +{ + private Object[] items; + + private int size = 0; + + /** + * Construct a new, empty Queue with the specified initial + * capacity. + */ + public Stack() + { + items = new Object[ 16 ]; + } + + /** + * Clears this queue. + */ + public void clear() + { + Arrays.fill( items, null ); + size = 0; + } + + /** + * Dequeues from this queue. + * + * @return null, if this queue is empty or the element is + * really null. + */ + public Object pop() + { + if( size == 0 ) + { + return null; + } + + int pos = size - 1; + Object ret = items[ pos ]; + items[ pos ] = null; + size--; + + return ret; + } + + /** + * Enqueue into this queue. + */ + public void push( Object obj ) + { + if( size == items.length ) + { + // expand queue + final int oldLen = items.length; + Object[] tmp = new Object[ oldLen * 2 ]; + System.arraycopy( items, 0, tmp, 0, size); + items = tmp; + } + + items[ size ] = obj; + size++; + } + + /** + * Returns the first element of the queue. + * + * @return null, if the queue is empty, or the element is + * really null. + */ + public Object first() + { + if( size == 0 ) + { + return null; + } + + return items[ size - 1 ]; + } + + public Object last() + { + if( size == 0 ) + { + return null; + } + + return items[ 0 ]; + } + + /** + * Returns true if the queue is empty. + */ + public boolean isEmpty() + { + return ( size == 0 ); + } + + /** + * Returns the number of elements in the queue. + */ + public int size() + { + return size; + } +} \ No newline at end of file