Return-Path: Delivered-To: apmail-incubator-directory-cvs-archive@www.apache.org Received: (qmail 51535 invoked from network); 23 Mar 2004 00:15:26 -0000 Received: from daedalus.apache.org (HELO mail.apache.org) (208.185.179.12) by minotaur-2.apache.org with SMTP; 23 Mar 2004 00:15:26 -0000 Received: (qmail 61459 invoked by uid 500); 23 Mar 2004 00:15:11 -0000 Delivered-To: apmail-incubator-directory-cvs-archive@incubator.apache.org Received: (qmail 61425 invoked by uid 500); 23 Mar 2004 00:15:11 -0000 Mailing-List: contact directory-cvs-help@incubator.apache.org; run by ezmlm Precedence: bulk Reply-To: directory-dev@incubator.apache.org list-help: list-unsubscribe: list-post: Delivered-To: mailing list directory-cvs@incubator.apache.org Received: (qmail 61402 invoked from network); 23 Mar 2004 00:15:10 -0000 Received: from unknown (HELO minotaur.apache.org) (209.237.227.194) by daedalus.apache.org with SMTP; 23 Mar 2004 00:15:10 -0000 Received: (qmail 51507 invoked by uid 65534); 23 Mar 2004 00:15:24 -0000 Date: 23 Mar 2004 00:15:24 -0000 Message-ID: <20040323001524.51505.qmail@minotaur.apache.org> From: akarasulu@apache.org To: directory-cvs@incubator.apache.org Subject: svn commit: rev 9690 - in incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src: java/org/apache/eve/listener test test/org test/org/apache test/org/apache/eve test/org/apache/eve/listener X-Spam-Rating: daedalus.apache.org 1.6.2 0/1000/N X-Spam-Rating: minotaur-2.apache.org 1.6.2 0/1000/N Author: akarasulu Date: Mon Mar 22 16:15:23 2004 New Revision: 9690 Added: incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/listener/ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/listener/DefaultListenerManagerTest.java Modified: incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java Log: Added test case to weed out that locking issue one selection keys that we also had with the input manager. Now all works fine its time to test it. Modified: incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java ============================================================================== --- incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java (original) +++ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java Mon Mar 22 16:15:23 2004 @@ -23,9 +23,10 @@ import java.util.EventObject ; import java.io.IOException ; + +import java.net.Socket ; import java.net.InetAddress ; import java.net.InetSocketAddress ; -import java.net.Socket; import java.nio.channels.Selector ; import java.nio.channels.SelectionKey ; @@ -53,35 +54,44 @@ Runnable { /** event manager used to decouple source to sink relationships */ - private final EventRouter m_router ; + private final EventRouter router ; /** selector used to select a acceptable socket channel */ - private final Selector m_selector ; + private final Selector selector ; /** the client keys for accepted connections */ - private final Set m_clients ; + private final Set clients ; + /** the set of listeners managed */ + private final Set listeners ; + /** new listeners waiting to be bound */ + private final Set bindListeners ; + /** old listeners waiting to be unbound */ + private final Set unbindListeners ; /** the thread driving this Runnable */ - private Thread m_thread = null ; + private Thread thread = null ; /** parameter used to politely stop running thread */ - private Boolean m_hasStarted = null ; + private Boolean hasStarted = null ; /** the listner manager's monitor */ - private ListenerManagerMonitor m_monitor = - new ListenerManagerMonitorAdapter() ; - + private ListenerManagerMonitor monitor = null ; + /** * Creates a default listener manager using an event router. * - * @param a_router the router to publish events to + * @param router the router to publish events to * @throws IOException */ - public DefaultListenerManager( EventRouter a_router ) throws IOException + public DefaultListenerManager( EventRouter router ) throws IOException { - m_clients = new HashSet() ; - m_selector = Selector.open() ; - m_hasStarted = new Boolean( false ) ; - - m_router = a_router ; - m_router.subscribe( DisconnectEvent.class, null, this ) ; + this.router = router ; + this.clients = new HashSet() ; + this.selector = Selector.open() ; + this.listeners = new HashSet() ; + this.hasStarted = new Boolean( false ) ; + this.bindListeners = new HashSet() ; + this.unbindListeners = new HashSet() ; + + this.router.subscribe( DisconnectEvent.class, null, this ) ; + this.monitor = new ListenerManagerMonitorAdapter() ; } @@ -92,18 +102,18 @@ */ public ListenerManagerMonitor getMonitor() { - return m_monitor ; + return monitor ; } /** * Sets the monitor. * - * @param a_monitor The monitor to set. + * @param monitor The monitor to set. */ - public void setMonitor( ListenerManagerMonitor a_monitor ) + public void setMonitor( ListenerManagerMonitor monitor ) { - m_monitor = a_monitor ; + this.monitor = monitor ; } @@ -111,27 +121,14 @@ * @see org.apache.eve.listener.ListenerManager#register(org.apache.eve. * listener.ServerListener) */ - public void bind( ServerListener a_listener ) throws IOException + public void bind( ServerListener listener ) throws IOException { - try + synchronized ( bindListeners ) { - ServerSocketChannel l_channel = ServerSocketChannel.open() ; - InetSocketAddress l_address = new InetSocketAddress( - InetAddress.getByAddress( a_listener.getAddress() ), - a_listener.getPort() ) ; - l_channel.socket().bind( l_address, a_listener.getBacklog() ) ; - l_channel.configureBlocking( false ) ; - m_selector.wakeup() ; - l_channel.register( m_selector, SelectionKey.OP_ACCEPT, - a_listener ) ; - } - catch ( IOException e ) - { - m_monitor.failedToBind( a_listener, e ) ; - throw e ; + bindListeners.add( listener ) ; } - m_monitor.bindOccured( a_listener ) ; + selector.wakeup() ; } @@ -139,35 +136,103 @@ * @see org.apache.eve.listener.ListenerManager#unregister(org.apache.eve. * listener.ServerListener) */ - public void unbind( ServerListener a_listener ) throws IOException + public void unbind( ServerListener listener ) throws IOException { - SelectionKey l_key = null ; - Iterator l_keys = m_selector.keys().iterator() ; - - while ( l_keys.hasNext() ) + synchronized ( unbindListeners ) { - l_key = ( SelectionKey ) l_keys.next() ; - if ( l_key.attachment().equals( a_listener ) ) - { - break ; - } + unbindListeners.add( listener ) ; } - try - { - l_key.channel().close() ; - } - catch ( IOException e ) + selector.wakeup() ; + } + + + /** + * Binds all the listeners that have been collecting up waiting to be bound. + * This is not fail fast - meaning it will try all the connections in the + * ready to bind set even if one fails. + */ + private void bind() + { + synchronized ( bindListeners ) { - m_monitor.failedToUnbind( a_listener, e ) ; - throw e ; + Iterator list = bindListeners.iterator() ; + while ( list.hasNext() ) + { + ServerListener listener = ( ServerListener ) list.next() ; + + try + { + ServerSocketChannel channel = ServerSocketChannel.open() ; + InetSocketAddress address = new InetSocketAddress( + InetAddress.getByAddress( listener.getAddress() ), + listener.getPort() ) ; + channel.socket().bind( address, listener.getBacklog() ) ; + channel.configureBlocking( false ) ; + channel.register( selector, SelectionKey.OP_ACCEPT, + listener ) ; + + synchronized ( listeners ) + { + listeners.add( listener ) ; + } + + bindListeners.remove( listener ) ; + } + catch ( IOException e ) + { + monitor.failedToBind( listener, e ) ; + } + + monitor.bindOccured( listener ) ; + } } - - l_key.cancel() ; - m_monitor.unbindOccured( a_listener ) ; } + /** + * Unbinds listeners that have been collecting up waiting to be unbound. + * This is not fail fast - meaning it will try all the connections in the + * ready to unbind set even if one fails. + */ + private void unbind() + { + SelectionKey key = null ; + + synchronized ( unbindListeners ) + { + Iterator keys = selector.keys().iterator() ; + while ( keys.hasNext() ) + { + key = ( SelectionKey ) keys.next() ; + ServerListener listener = ( ServerListener ) key.attachment() ; + + if ( unbindListeners.contains( listener ) ) + { + try + { + key.channel().close() ; + } + catch ( IOException e ) + { + monitor.failedToUnbind( listener, e ) ; + } + + key.cancel() ; + + synchronized ( listeners ) + { + listeners.remove( listener ) ; + } + + unbindListeners.remove( listener ) ; + monitor.unbindOccured( listener ) ; + } + } + } + } + + // ------------------------------------------------------------------------ // DisconnectSubscriber Implementation // ------------------------------------------------------------------------ @@ -176,19 +241,19 @@ /** * Disconnects a client by removing the clientKey from the listener. * - * @param an_event the disconnect event + * @param event the disconnect event */ - public void inform( DisconnectEvent an_event ) + public void inform( DisconnectEvent event ) { - m_clients.remove( an_event.getClientKey() ) ; + clients.remove( event.getClientKey() ) ; try { - an_event.getClientKey().expire() ; + event.getClientKey().expire() ; } catch ( IOException e ) { - m_monitor.failedToExpire( an_event.getClientKey(), e ) ; + monitor.failedToExpire( event.getClientKey(), e ) ; } } @@ -197,9 +262,9 @@ * (non-Javadoc) * @see org.apache.eve.event.Subscriber#inform(java.util.EventObject) */ - public void inform( EventObject an_event ) + public void inform( EventObject event ) { - inform( ( DisconnectEvent ) an_event ) ; + inform( ( DisconnectEvent ) event ) ; } @@ -213,60 +278,62 @@ */ public void run() { - while ( m_hasStarted.booleanValue() ) + while ( hasStarted.booleanValue() ) { - int l_count = 0 ; + int count = 0 ; try { - m_monitor.enteringSelect( m_selector ) ; - if ( 0 == ( l_count = m_selector.select() ) ) + monitor.enteringSelect( selector ) ; + + bind() ; + unbind() ; + + if ( 0 == ( count = selector.select() ) ) { - m_monitor.selectTimedOut( m_selector ) ; + monitor.selectTimedOut( selector ) ; continue ; } } catch( IOException e ) { - m_monitor.failedToSelect( m_selector, e ) ; + monitor.failedToSelect( selector, e ) ; continue ; } - Iterator l_list = m_selector.selectedKeys().iterator() ; - while ( l_list.hasNext() ) + Iterator list = selector.selectedKeys().iterator() ; + while ( list.hasNext() ) { - SelectionKey l_key = ( SelectionKey ) l_list.next() ; + SelectionKey key = ( SelectionKey ) list.next() ; - if ( l_key.isAcceptable() ) + if ( key.isAcceptable() ) { - SocketChannel l_channel = null ; - ServerSocketChannel l_server = ( ServerSocketChannel ) - l_key.channel() ; + SocketChannel channel = null ; + ServerSocketChannel server = ( ServerSocketChannel ) + key.channel() ; try { - l_channel = l_server.accept() ; - l_list.remove() ; - m_monitor.acceptOccured( l_key ) ; + channel = server.accept() ; + list.remove() ; + monitor.acceptOccured( key ) ; } catch ( IOException e ) { - m_monitor.failedToAccept( l_key, e ) ; + monitor.failedToAccept( key, e ) ; continue ; } - ClientKey l_clientKey = - new ClientKey( l_channel.socket() ) ; - ConnectEvent l_event = - new ConnectEvent( this, l_clientKey ) ; - m_router.publish( l_event ) ; + ClientKey clientKey = new ClientKey( channel.socket() ) ; + ConnectEvent event = new ConnectEvent( this, clientKey ) ; + router.publish( event ) ; } } } } - - + + /** * Starts up this ListnerManager service. * @@ -275,15 +342,15 @@ */ public void start() throws InterruptedException { - if ( m_hasStarted.booleanValue() ) + if ( hasStarted.booleanValue() ) { throw new IllegalStateException( "Already started!" ) ; } - m_hasStarted = new Boolean( true ) ; - m_thread = new Thread( this ) ; - m_thread.start() ; - m_monitor.started() ; + hasStarted = new Boolean( true ) ; + thread = new Thread( this ) ; + thread.start() ; + monitor.started() ; } @@ -295,39 +362,78 @@ */ public void stop() throws InterruptedException { - m_hasStarted = new Boolean( false ) ; - m_selector.wakeup() ; + hasStarted = new Boolean( false ) ; + selector.wakeup() ; + + /* + * First lets shutdown the listeners so we're not open to having new + * connections created while we are trying to shutdown. Plus we want + * to make the thread for this component do the work to prevent locking + * issues with the selector. + */ + if ( ! listeners.isEmpty() ) + { + Iterator list = listeners.iterator() ; + while( list.hasNext() ) + { + ServerListener listener = ( ServerListener ) list.next() ; + + try + { + /* + * put the listening in the set ready to be unbound by + * the runnable's thread of execution + */ + unbind( listener ) ; + } + catch( IOException e ) + { + // monitor.doSomthing( e ) ; + e.printStackTrace() ; + } + } + } - while ( m_thread.isAlive() || ! m_clients.isEmpty() ) + /* + * Now we gracefully disconnect the clients that are already connected + * so they can complete their current requests and recieve a + * notification of disconnect. At this point we don't know how we're + * going to do that so we just do it abruptly for the time being. This + * will need to be changed in the future. + */ + if ( ! clients.isEmpty() ) { - if ( ! m_clients.isEmpty() ) + synchronized( clients ) { - synchronized( m_clients ) + Iterator list = clients.iterator() ; + while ( list.hasNext() ) { - Iterator list = m_clients.iterator() ; - while ( list.hasNext() ) - { - ClientKey key = ( ClientKey ) list.next() ; + ClientKey key = ( ClientKey ) list.next() ; - try - { - Socket socket = key.getSocket() ; - socket.close() ; - list.remove() ; - } - catch( IOException e ) - { - // monitor.doSomthing( e ) ; - e.printStackTrace() ; - } + try + { + Socket socket = key.getSocket() ; + socket.close() ; + list.remove() ; + } + catch( IOException e ) + { + // monitor.doSomthing( e ) ; + e.printStackTrace() ; } } } - + } + + /* + * now wait until the thread of execution for this runnable dies + */ + if ( this.thread.isAlive() ) + { Thread.sleep( 100 ) ; - m_selector.wakeup() ; + selector.wakeup() ; } - - m_monitor.stopped() ; + + monitor.stopped() ; } } Added: incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/listener/DefaultListenerManagerTest.java ============================================================================== --- (empty file) +++ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/listener/DefaultListenerManagerTest.java Mon Mar 22 16:15:23 2004 @@ -0,0 +1,92 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.eve.listener ; + + +import org.apache.eve.event.EventRouter ; +import org.apache.eve.event.DefaultEventRouter ; + +import junit.framework.TestCase ; + + +/** + * Tests the default ListenerManager's operations. + * + * @author + * Apache Directory Project + * @version $Rev$ + */ +public class DefaultListenerManagerTest extends TestCase +{ + /** An event router to use for testing */ + private EventRouter router = null ; + /** the defualt ListenerManager to test */ + private DefaultListenerManager listener = null ; + + + + /* + * @see TestCase#setUp() + */ + protected void setUp() throws Exception + { + super.setUp() ; + + router = new DefaultEventRouter() ; + listener = new DefaultListenerManager( router ) ; + listener.start() ; + } + + + /* + * @see TestCase#tearDown() + */ + protected void tearDown() throws Exception + { + super.tearDown() ; + + router = null ; + listener.stop() ; + listener = null ; + } + + + /** + * Constructor for DefaultListenerManagerTest. + * @param arg0 + */ + public DefaultListenerManagerTest( String arg0 ) + { + super( arg0 ) ; + } + + + public void testBind() throws Exception + { + listener.bind( new + LdapServerListener( "localhost", 10389, 100, false ) ) ; + } + + + public void testUnbind() throws Exception + { + ServerListener sl = new + LdapServerListener( "localhost", 10389, 100, false ) ; + listener.bind( sl ) ; + listener.unbind( sl ) ; + } +}