directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akaras...@apache.org
Subject svn commit: rev 9690 - in incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src: java/org/apache/eve/listener test test/org test/org/apache test/org/apache/eve test/org/apache/eve/listener
Date Tue, 23 Mar 2004 00:15:24 GMT
Author: akarasulu
Date: Mon Mar 22 16:15:23 2004
New Revision: 9690

Added:
   incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/
   incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/
   incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/
   incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/
   incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/listener/
   incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/listener/DefaultListenerManagerTest.java
Modified:
   incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java
Log:
Added test case to weed out that locking issue on selection keys that
we also had with the input manager.  Now that all works fine, it's time to test it.


Modified: incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java
(original)
+++ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java
Mon Mar 22 16:15:23 2004
@@ -23,9 +23,10 @@
 import java.util.EventObject ;
 
 import java.io.IOException ;
+
+import java.net.Socket ;
 import java.net.InetAddress ;
 import java.net.InetSocketAddress ;
-import java.net.Socket;
 
 import java.nio.channels.Selector ;
 import java.nio.channels.SelectionKey ;
@@ -53,35 +54,44 @@
     Runnable
 {
     /** event manager used to decouple source to sink relationships */
-    private final EventRouter m_router ;
+    private final EventRouter router ;
     /** selector used to select a acceptable socket channel */
-    private final Selector m_selector ;
+    private final Selector selector ;
     /** the client keys for accepted connections */
-    private final Set m_clients ; 
+    private final Set clients ; 
+    /** the set of listeners managed */
+    private final Set listeners ;
+    /** new listeners waiting to be bound */
+    private final Set bindListeners ;
+    /** old listeners waiting to be unbound */
+    private final Set unbindListeners ;
     
     /** the thread driving this Runnable */ 
-    private Thread m_thread = null ;
+    private Thread thread = null ;
     /** parameter used to politely stop running thread */
-    private Boolean m_hasStarted = null ;
+    private Boolean hasStarted = null ;
     /** the listner manager's monitor */
-    private ListenerManagerMonitor m_monitor = 
-        new ListenerManagerMonitorAdapter() ;
-
+    private ListenerManagerMonitor monitor = null ;
+    
     
     /**
      * Creates a default listener manager using an event router.
      * 
-     * @param a_router the router to publish events to
+     * @param router the router to publish events to
      * @throws IOException
      */
-    public DefaultListenerManager( EventRouter a_router ) throws IOException
+    public DefaultListenerManager( EventRouter router ) throws IOException
     {
-        m_clients = new HashSet() ;
-        m_selector = Selector.open() ;
-        m_hasStarted = new Boolean( false ) ;
-
-        m_router = a_router ;
-        m_router.subscribe( DisconnectEvent.class, null, this ) ;
+        this.router = router ;
+        this.clients = new HashSet() ;
+        this.selector = Selector.open() ;
+        this.listeners = new HashSet() ;
+        this.hasStarted = new Boolean( false ) ;
+        this.bindListeners = new HashSet() ;
+        this.unbindListeners = new HashSet() ;
+        
+        this.router.subscribe( DisconnectEvent.class, null, this ) ;
+        this.monitor = new ListenerManagerMonitorAdapter() ; 
     }
     
     
@@ -92,18 +102,18 @@
      */
     public ListenerManagerMonitor getMonitor()
     {
-        return m_monitor ;
+        return monitor ;
     }
 
     
     /**
      * Sets the monitor.
      * 
-     * @param a_monitor The monitor to set.
+     * @param monitor The monitor to set.
      */
-    public void setMonitor( ListenerManagerMonitor a_monitor )
+    public void setMonitor( ListenerManagerMonitor monitor )
     {
-        m_monitor = a_monitor ;
+        this.monitor = monitor ;
     }
 
 
@@ -111,27 +121,14 @@
      * @see org.apache.eve.listener.ListenerManager#register(org.apache.eve.
      * listener.ServerListener)
      */
-    public void bind( ServerListener a_listener ) throws IOException
+    public void bind( ServerListener listener ) throws IOException
     {
-        try
+        synchronized ( bindListeners )
         {
-            ServerSocketChannel l_channel = ServerSocketChannel.open() ;
-            InetSocketAddress l_address = new InetSocketAddress( 
-                    InetAddress.getByAddress( a_listener.getAddress() ), 
-                    a_listener.getPort() ) ;
-            l_channel.socket().bind( l_address, a_listener.getBacklog() ) ;
-            l_channel.configureBlocking( false ) ;
-            m_selector.wakeup() ;
-            l_channel.register( m_selector, SelectionKey.OP_ACCEPT, 
-                    a_listener ) ;
-        }
-        catch ( IOException e )
-        {
-            m_monitor.failedToBind( a_listener, e ) ;
-            throw e ;
+            bindListeners.add( listener ) ;
         }
         
-        m_monitor.bindOccured( a_listener ) ;
+        selector.wakeup() ;
     }
     
     
@@ -139,35 +136,103 @@
      * @see org.apache.eve.listener.ListenerManager#unregister(org.apache.eve.
      * listener.ServerListener)
      */
-    public void unbind( ServerListener a_listener ) throws IOException
+    public void unbind( ServerListener listener ) throws IOException
     {
-        SelectionKey l_key = null ;
-        Iterator l_keys = m_selector.keys().iterator() ;
-        
-        while ( l_keys.hasNext() )
+        synchronized ( unbindListeners )
         {
-            l_key = ( SelectionKey ) l_keys.next() ;
-            if ( l_key.attachment().equals( a_listener ) )
-            {
-                break ;
-            }
+            unbindListeners.add( listener ) ;
         }
 
-        try
-        {
-            l_key.channel().close() ;
-        }
-        catch ( IOException e )
+        selector.wakeup() ;
+    }
+    
+
+    /**
+     * Binds all the listeners that have been collecting up waiting to be bound.
+     * This is not fail fast - meaning it will try all the connections in the
+     * ready to bind set even if one fails.
+     */
+    private void bind()
+    {
+        synchronized ( bindListeners )
         {
-            m_monitor.failedToUnbind( a_listener, e ) ;
-            throw e ;
+            Iterator list = bindListeners.iterator() ;
+            while ( list.hasNext() )
+            {
+                ServerListener listener = ( ServerListener ) list.next() ;
+                    
+                try
+                {
+                    ServerSocketChannel channel = ServerSocketChannel.open() ;
+                    InetSocketAddress address = new InetSocketAddress( 
+                            InetAddress.getByAddress( listener.getAddress() ), 
+                            listener.getPort() ) ;
+                    channel.socket().bind( address, listener.getBacklog() ) ;
+                    channel.configureBlocking( false ) ;
+                    channel.register( selector, SelectionKey.OP_ACCEPT, 
+                            listener ) ;
+                    
+                    synchronized ( listeners )
+                    {
+                        listeners.add( listener ) ;
+                    }
+                    
+                    bindListeners.remove( listener ) ;
+                }
+                catch ( IOException e )
+                {
+                    monitor.failedToBind( listener, e ) ;
+                }
+            
+                monitor.bindOccured( listener ) ;
+            }
         }
-        
-        l_key.cancel() ;
-        m_monitor.unbindOccured( a_listener ) ;
     }
     
     
+    /**
+     * Unbinds listeners that have been collecting up waiting to be unbound.
+     * This is not fail fast - meaning it will try all the connections in the
+     * ready to unbind set even if one fails.
+     */
+    private void unbind()
+    {
+        SelectionKey key = null ;
+        
+        synchronized ( unbindListeners ) 
+        {
+            Iterator keys = selector.keys().iterator() ;
+            while ( keys.hasNext() )
+            {
+                key = ( SelectionKey ) keys.next() ;
+                ServerListener listener = ( ServerListener ) key.attachment() ;
+    
+                if ( unbindListeners.contains( listener ) )
+                {    
+                    try
+                    {
+                        key.channel().close() ;
+                    }
+                    catch ( IOException e )
+                    {
+                        monitor.failedToUnbind( listener, e ) ;
+                    }
+                
+                    key.cancel() ;
+                    
+                    synchronized ( listeners )
+                    {
+                        listeners.remove( listener ) ;
+                    }
+                    
+                    unbindListeners.remove( listener ) ;
+                    monitor.unbindOccured( listener ) ;
+                }
+            }
+        }
+    }
+
+
     // ------------------------------------------------------------------------
     // DisconnectSubscriber Implementation
     // ------------------------------------------------------------------------
@@ -176,19 +241,19 @@
     /**
      * Disconnects a client by removing the clientKey from the listener.
      * 
-     * @param an_event the disconnect event
+     * @param event the disconnect event
      */
-    public void inform( DisconnectEvent an_event )
+    public void inform( DisconnectEvent event )
     {
-        m_clients.remove( an_event.getClientKey() ) ;
+        clients.remove( event.getClientKey() ) ;
         
         try
         {
-            an_event.getClientKey().expire() ;
+            event.getClientKey().expire() ;
         }
         catch ( IOException e ) 
         {
-            m_monitor.failedToExpire( an_event.getClientKey(), e ) ;
+            monitor.failedToExpire( event.getClientKey(), e ) ;
         }
     }
     
@@ -197,9 +262,9 @@
      *  (non-Javadoc)
      * @see org.apache.eve.event.Subscriber#inform(java.util.EventObject)
      */
-    public void inform( EventObject an_event )
+    public void inform( EventObject event )
     {
-        inform( ( DisconnectEvent ) an_event ) ;
+        inform( ( DisconnectEvent ) event ) ;
     }
     
     
@@ -213,60 +278,62 @@
      */
     public void run()
     {
-        while ( m_hasStarted.booleanValue() ) 
+        while ( hasStarted.booleanValue() ) 
         {
-            int l_count = 0 ;
+            int count = 0 ;
             
             try
             {
-                m_monitor.enteringSelect( m_selector ) ;
-                if ( 0 == ( l_count = m_selector.select() ) )
+                monitor.enteringSelect( selector ) ;
+                
+                bind() ;
+                unbind() ;
+                
+                if ( 0 == ( count = selector.select() ) )
                 {
-                    m_monitor.selectTimedOut( m_selector ) ;
+                    monitor.selectTimedOut( selector ) ;
                     continue ;
                 }
             } 
             catch( IOException e )
             {
-                m_monitor.failedToSelect( m_selector, e ) ;
+                monitor.failedToSelect( selector, e ) ;
                 continue ;
             }
             
             
-            Iterator l_list = m_selector.selectedKeys().iterator() ;
-            while ( l_list.hasNext() )
+            Iterator list = selector.selectedKeys().iterator() ;
+            while ( list.hasNext() )
             {
-                SelectionKey l_key = ( SelectionKey ) l_list.next() ;
+                SelectionKey key = ( SelectionKey ) list.next() ;
                 
-                if ( l_key.isAcceptable() )
+                if ( key.isAcceptable() )
                 {
-                    SocketChannel l_channel = null ;
-                    ServerSocketChannel l_server = ( ServerSocketChannel )
-                        l_key.channel() ;
+                    SocketChannel channel = null ;
+                    ServerSocketChannel server = ( ServerSocketChannel )
+                        key.channel() ;
                     
                     try
                     {
-                        l_channel = l_server.accept() ;
-                        l_list.remove() ;
-                        m_monitor.acceptOccured( l_key ) ;
+                        channel = server.accept() ;
+                        list.remove() ;
+                        monitor.acceptOccured( key ) ;
                     }
                     catch ( IOException e )
                     {
-                        m_monitor.failedToAccept( l_key, e ) ;
+                        monitor.failedToAccept( key, e ) ;
                         continue ;
                     }
                     
-                    ClientKey l_clientKey = 
-                        new ClientKey( l_channel.socket() ) ;
-                    ConnectEvent l_event = 
-                        new ConnectEvent( this, l_clientKey ) ;
-                    m_router.publish( l_event ) ;
+                    ClientKey clientKey = new ClientKey( channel.socket() ) ;
+                    ConnectEvent event = new ConnectEvent( this, clientKey ) ;
+                    router.publish( event ) ;
                 }
             }
         }
     }
-
-
+    
+    
     /**
      * Starts up this ListnerManager service.
      * 
@@ -275,15 +342,15 @@
      */
     public void start() throws InterruptedException
     {
-        if ( m_hasStarted.booleanValue() )
+        if ( hasStarted.booleanValue() )
         {
             throw new IllegalStateException( "Already started!" ) ;
         }
         
-        m_hasStarted = new Boolean( true ) ;
-        m_thread = new Thread( this ) ;
-        m_thread.start() ;
-        m_monitor.started() ;
+        hasStarted = new Boolean( true ) ;
+        thread = new Thread( this ) ;
+        thread.start() ;
+        monitor.started() ;
     }
     
     
@@ -295,39 +362,78 @@
      */
     public void stop() throws InterruptedException
     {
-        m_hasStarted = new Boolean( false ) ;
-        m_selector.wakeup() ;
+        hasStarted = new Boolean( false ) ;
+        selector.wakeup() ;
+
+        /*
+         * First lets shutdown the listeners so we're not open to having new
+         * connections created while we are trying to shutdown.  Plus we want 
+         * to make the thread for this component do the work to prevent locking
+         * issues with the selector.
+         */
+        if ( ! listeners.isEmpty() )
+        {
+            Iterator list = listeners.iterator() ;
+            while( list.hasNext() )
+            {
+                ServerListener listener = ( ServerListener ) list.next() ;
+                    
+                try
+                {
+                    /*
+                     * put the listening in the set ready to be unbound by
+                     * the runnable's thread of execution
+                     */
+                    unbind( listener ) ;
+                }
+                catch( IOException e )
+                {
+                    // monitor.doSomthing( e ) ;
+                    e.printStackTrace() ;
+                }
+            }
+        }
         
-        while ( m_thread.isAlive() || ! m_clients.isEmpty() )
+        /*
+         * Now we gracefully disconnect the clients that are already connected 
+         * so they can complete their current requests and recieve a 
+         * notification of disconnect.  At this point we don't know how we're 
+         * going to do that so we just do it abruptly for the time being.  This
+         * will need to be changed in the future. 
+         */
+        if ( ! clients.isEmpty() )
         {
-            if ( ! m_clients.isEmpty() )
+            synchronized( clients )
             {
-                synchronized( m_clients )
+                Iterator list = clients.iterator() ;
+                while ( list.hasNext() )
                 {
-                    Iterator list = m_clients.iterator() ;
-                    while ( list.hasNext() )
-                    {
-                        ClientKey key = ( ClientKey ) list.next() ;
+                    ClientKey key = ( ClientKey ) list.next() ;
                         
-                        try
-                        {
-                            Socket socket = key.getSocket() ;
-                            socket.close() ;
-                            list.remove() ;
-                        }
-                        catch( IOException e )
-                        {
-                            // monitor.doSomthing( e ) ;
-                            e.printStackTrace() ;
-                        }
+                    try
+                    {
+                        Socket socket = key.getSocket() ;
+                        socket.close() ;
+                        list.remove() ;
+                    }
+                    catch( IOException e )
+                    {
+                        // monitor.doSomthing( e ) ;
+                        e.printStackTrace() ;
                     }
                 }
             }
-            
+        }
+
+        /*
+         * now wait until the thread of execution for this runnable dies
+         */
+        if ( this.thread.isAlive() )
+        {
             Thread.sleep( 100 ) ;
-            m_selector.wakeup() ;
+            selector.wakeup() ;
         }
-        
-        m_monitor.stopped() ;
+
+        monitor.stopped() ;
     }
 }

Added: incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/listener/DefaultListenerManagerTest.java
==============================================================================
--- (empty file)
+++ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/test/org/apache/eve/listener/DefaultListenerManagerTest.java
Mon Mar 22 16:15:23 2004
@@ -0,0 +1,92 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.eve.listener ;
+
+
+import org.apache.eve.event.EventRouter ;
+import org.apache.eve.event.DefaultEventRouter ;
+
+import junit.framework.TestCase ;
+
+
+/**
+ * Tests the default ListenerManager's operations.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">
+ * Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class DefaultListenerManagerTest extends TestCase
+{
+    /** An event router to use for testing */
+    private EventRouter router = null ;
+    /** the defualt ListenerManager to test */
+    private DefaultListenerManager listener = null ;
+    
+    
+
+    /*
+     * @see TestCase#setUp()
+     */
+    protected void setUp() throws Exception
+    {
+        super.setUp() ;
+        
+        router = new DefaultEventRouter() ;
+        listener = new DefaultListenerManager( router ) ;
+        listener.start() ;
+    }
+
+    
+    /*
+     * @see TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception
+    {
+        super.tearDown() ;
+        
+        router = null ;
+        listener.stop() ;
+        listener = null ;
+    }
+    
+
+    /**
+     * Constructor for DefaultListenerManagerTest.
+     * @param arg0
+     */
+    public DefaultListenerManagerTest( String arg0 )
+    {
+        super( arg0 ) ;
+    }
+    
+
+    public void testBind() throws Exception
+    {
+        listener.bind( new 
+                LdapServerListener( "localhost", 10389, 100, false ) ) ;
+    }
+
+    
+    public void testUnbind() throws Exception
+    {
+        ServerListener sl = new 
+            LdapServerListener( "localhost", 10389, 100, false ) ;
+        listener.bind( sl ) ;
+        listener.unbind( sl ) ;
+    }
+}

Mime
View raw message