directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: r261915 - in /directory/network: branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
Date Sun, 28 Aug 2005 02:02:37 GMT
Author: trustin
Date: Sat Aug 27 19:02:30 2005
New Revision: 261915

URL: http://svn.apache.org/viewcvs?rev=261915&view=rev
Log:
* Fixed deadlock while shutting down.
* Fixed DIRMINA-79 - The number of ThreadPoolFilter threads becomes 0 sometimes.
** Fixed problems in giveUpLead() and fetchBuffer()


Modified:
    directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java
    directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java

Modified: directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java?rev=261915&r1=261914&r2=261915&view=diff
==============================================================================
--- directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java (original)
+++ directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java Sat Aug
27 19:02:30 2005
@@ -288,6 +288,7 @@
     {
         private final int id;
         private final Object promotionLock = new Object();
+        private boolean dead;
 
         private Worker()
         {
@@ -297,14 +298,21 @@
             increasePoolSize();
         }
 
-        public void lead()
+        public boolean lead()
         {
             final Object promotionLock = this.promotionLock;
             synchronized( promotionLock )
             {
+                if( dead )
+                {
+                    return false;
+                }
+
                 leader = this;
                 promotionLock.notify();
             }
+            
+            return true;
         }
 
         public void run()
@@ -319,14 +327,7 @@
 
                 if( buf == null )
                 {
-                    if( shuttingDown )
-                    {
-                        break;
-                    }
-                    else
-                    {
-                        continue;
-                    }
+                    break;
                 }
 
                 processEvents( buf );
@@ -340,42 +341,40 @@
 
         private SessionBuffer fetchBuffer()
         {
-            SessionBuffer buf = null;
-            BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
+            SessionBuffer buf;
+            BlockingSet readySessionBuffers = ThreadPoolFilter.this.readySessionBuffers;
             synchronized( readySessionBuffers )
             {
-                do
+                for( ;; )
                 {
-                    buf = null;
                     try
                     {
                         readySessionBuffers.waitForNewItem();
                     }
                     catch( InterruptedException e )
                     {
-                        break;
+                        if( shuttingDown )
+                        {
+                            return null;
+                        }
+                        else
+                        {
+                            continue;
+                        }
                     }
 
                     Iterator it = readySessionBuffers.iterator();
-                    if( !it.hasNext() )
+                    while( it.hasNext() )
                     {
-                        // exceeded keepAliveTime
-                        break;
-                    }
-
-                    do
-                    {
-                        buf = null;
                         buf = ( SessionBuffer ) it.next();
                         it.remove();
+                        if( buf != null && !buf.eventQueue.isEmpty() )
+                        {
+                            return buf;
+                        }
                     }
-                    while( buf != null && buf.eventQueue.isEmpty()
-                           && it.hasNext() );
                 }
-                while( buf != null && buf.eventQueue.isEmpty() );
             }
-
-            return buf;
         }
 
         private void processEvents( SessionBuffer buf )
@@ -482,6 +481,9 @@
                     {
                         followers.remove( this );
                     }
+
+                    // Mark as dead explicitly when we've got promotionLock.
+                    dead = true;
                 }
 
                 return timeToLead;
@@ -490,33 +492,34 @@
 
         private void giveUpLead()
         {
-            final Stack followers = BaseThreadPool.this.followers;
+            final Stack followers = ThreadPoolFilter.this.followers;
             Worker worker;
-            synchronized( followers )
+            do
             {
-                worker = ( Worker ) followers.pop();
-            }
+                synchronized( followers )
+                {
+                    worker = ( Worker ) followers.pop();
+                }
 
-            if( worker != null )
-            {
-                worker.lead();
-            }
-            else
-            {
-                if( !shuttingDown )
+                if( worker == null )
                 {
-                    synchronized( BaseThreadPool.this )
+                    // Increase the number of threads if we
+                    // are not shutting down and we can increase the number.
+                    if( !shuttingDown
+                        && getPoolSize() < getMaximumPoolSize() )
                     {
-                        if( !shuttingDown
-                            && getPoolSize() < getMaximumPoolSize() )
-                        {
-                            worker = new Worker();
-                            worker.start();
-                            worker.lead();
-                        }
+                        worker = new Worker();
+                        worker.lead();
+                        worker.start();
                     }
+
+                    // This loop should end because:
+                    // 1) lead() is called already,
+                    // 2) or it is shutting down and there's no more threads left.
+                    break;
                 }
             }
+            while( !worker.lead() );
         }
     }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=261915&r1=261914&r2=261915&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Sat Aug
27 19:02:30 2005
@@ -254,6 +254,7 @@
     {
         private final int id;
         private final Object promotionLock = new Object();
+        private boolean dead;
 
         private Worker()
         {
@@ -263,14 +264,21 @@
             increasePoolSize();
         }
 
-        public void lead()
+        public boolean lead()
         {
             final Object promotionLock = this.promotionLock;
             synchronized( promotionLock )
             {
+                if( dead )
+                {
+                    return false;
+                }
+
                 leader = this;
                 promotionLock.notify();
             }
+            
+            return true;
         }
 
         public void run()
@@ -279,20 +287,12 @@
             {
                 if( !waitForPromotion() )
                     break;
-                
+
                 SessionBuffer buf = fetchBuffer();
                 giveUpLead();
-
                 if( buf == null )
                 {
-                    if( shuttingDown )
-                    {
-                        break;
-                    }
-                    else
-                    {
-                        continue;
-                    }
+                    break;
                 }
 
                 processEvents( buf );
@@ -306,42 +306,40 @@
 
         private SessionBuffer fetchBuffer()
         {
-            SessionBuffer buf = null;
+            SessionBuffer buf;
             BlockingSet readySessionBuffers = ThreadPoolFilter.this.readySessionBuffers;
             synchronized( readySessionBuffers )
             {
-                do
+                for( ;; )
                 {
-                    buf = null;
                     try
                     {
                         readySessionBuffers.waitForNewItem();
                     }
                     catch( InterruptedException e )
                     {
-                        break;
+                        if( shuttingDown )
+                        {
+                            return null;
+                        }
+                        else
+                        {
+                            continue;
+                        }
                     }
 
                     Iterator it = readySessionBuffers.iterator();
-                    if( !it.hasNext() )
-                    {
-                        // exceeded keepAliveTime
-                        break;
-                    }
-
-                    do
+                    while( it.hasNext() )
                     {
-                        buf = null;
                         buf = ( SessionBuffer ) it.next();
                         it.remove();
+                        if( buf != null && !buf.eventQueue.isEmpty() )
+                        {
+                            return buf;
+                        }
                     }
-                    while( buf != null && buf.eventQueue.isEmpty()
-                           && it.hasNext() );
                 }
-                while( buf != null && buf.eventQueue.isEmpty() );
             }
-
-            return buf;
         }
 
         private void processEvents( SessionBuffer buf )
@@ -448,6 +446,9 @@
                     {
                         followers.remove( this );
                     }
+
+                    // Mark as dead explicitly when we've got promotionLock.
+                    dead = true;
                 }
 
                 return timeToLead;
@@ -458,31 +459,32 @@
         {
             final Stack followers = ThreadPoolFilter.this.followers;
             Worker worker;
-            synchronized( followers )
+            do
             {
-                worker = ( Worker ) followers.pop();
-            }
+                synchronized( followers )
+                {
+                    worker = ( Worker ) followers.pop();
+                }
 
-            if( worker != null )
-            {
-                worker.lead();
-            }
-            else
-            {
-                if( !shuttingDown )
+                if( worker == null )
                 {
-                    synchronized( ThreadPoolFilter.this )
-                    {
-                        if( !shuttingDown
-                            && getPoolSize() < getMaximumPoolSize() )
-                        {
-                            worker = new Worker();
-                            worker.start();
-                            worker.lead();
-                        }
+                    // Increase the number of threads if we
+                    // are not shutting down and we can increase the number.
+                    if( !shuttingDown
+                        && getPoolSize() < getMaximumPoolSize() )
+                    {
+                        worker = new Worker();
+                        worker.lead();
+                        worker.start();
                     }
+
+                    // This loop should end because:
+                    // 1) lead() is called already,
+                    // 2) or it is shutting down and there's no more threads left.
+                    break;
                 }
             }
+            while( !worker.lead() );
         }
     }
 
@@ -633,7 +635,6 @@
         parents.put( parent, Boolean.TRUE );
 
         shuttingDown = false;
-
         leader = new Worker();
         leader.start();
         leader.lead();



Mime
View raw message