hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rola...@apache.org
Subject svn commit: r607287 - in /httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm: AbstractConnPool.java ConnPoolByRoute.java WaitingThread.java
Date Fri, 28 Dec 2007 16:59:18 GMT
Author: rolandw
Date: Fri Dec 28 08:59:17 2007
New Revision: 607287

URL: http://svn.apache.org/viewvc?rev=607287&view=rev
Log:
HTTPCLIENT-677: replacing synchronized with Lock/Condition

Added:
    httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java   (with props)
Modified:
    httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java
    httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java

Modified: httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java?rev=607287&r1=607286&r2=607287&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java	(original)
+++ httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java	Fri Dec 28 08:59:17 2007
@@ -37,6 +37,8 @@
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,6 +55,9 @@
 /**
  * An abstract connection pool.
  * It is used by the {@link ThreadSafeClientConnManager}.
+ * The abstract pool includes a {@link #poolLock}, which is used to
+ * synchronize access to the internal pool datastructures.
+ * Don't use <code>synchronized</code> for that purpose!
  */
 public abstract class AbstractConnPool implements RefQueueHandler {
 
@@ -60,6 +65,12 @@
     private final Log LOG = LogFactory.getLog(AbstractConnPool.class);
 
     /**
+     * The global lock for this pool.
+     */
+    protected final Lock poolLock;
+
+
+    /**
      * References to issued connections.
      * Objects in this set are of class
      * {@link BasicPoolEntryRef BasicPoolEntryRef},
@@ -135,6 +146,9 @@
         issuedConnections = new HashSet<BasicPoolEntryRef>();
         idleConnHandler = new IdleConnectionHandler();
 
+        boolean fair = false; //@@@ check parameters to decide
+        poolLock = new ReentrantLock(fair);
+
         boolean conngc = true; //@@@ check parameters to decide
         if (conngc) {
             refQueue = new ReferenceQueue<Object>();
@@ -185,25 +199,33 @@
 
 
     // non-javadoc, see interface RefQueueHandler
-    public synchronized void handleReference(Reference<?> ref) {
+    public void handleReference(Reference<?> ref) {
+
+        try {
+            poolLock.lock();
 
-        if (ref instanceof BasicPoolEntryRef) {
-            // check if the GCed pool entry was still in use
-            //@@@ find a way to detect this without lookup
-            //@@@ flag in the BasicPoolEntryRef, to be reset when freed?
-            final boolean lost = issuedConnections.remove(ref);
-            if (lost) {
-                final HttpRoute route = ((BasicPoolEntryRef)ref).getRoute();
+            if (ref instanceof BasicPoolEntryRef) {
+                // check if the GCed pool entry was still in use
+                //@@@ find a way to detect this without lookup
+                //@@@ flag in the BasicPoolEntryRef, to be reset when freed?
+                final boolean lost = issuedConnections.remove(ref);
+                if (lost) {
+                    final HttpRoute route =
+                        ((BasicPoolEntryRef)ref).getRoute();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Connection garbage collected. " + route);
+                    }
+                    handleLostEntry(route);
+                }
+            } else if (ref instanceof ConnMgrRef) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Connection garbage collected. " + route);
+                    LOG.debug("Connection manager garbage collected.");
                 }
-                handleLostEntry(route);
-            }
-        } else if (ref instanceof ConnMgrRef) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Connection manager garbage collected. ");
+                shutdown();
             }
-            shutdown();
+
+        } finally {
+            poolLock.unlock();
         }
     }
 
@@ -225,8 +247,14 @@
      * @param idletime  the time the connections should have been idle
      *                  in order to be closed now
      */
-    public synchronized void closeIdleConnections(long idletime) {
-        idleConnHandler.closeIdleConnections(idletime);
+    public void closeIdleConnections(long idletime) {
+
+        try {
+            poolLock.lock();
+            idleConnHandler.closeIdleConnections(idletime);
+        } finally {
+            poolLock.unlock();
+        }
     }
         
     //@@@ revise this cleanup stuff (closeIdle+deleteClosed), it's not good
@@ -242,31 +270,38 @@
      * Shuts down this pool and all associated resources.
      * Overriding methods MUST call the implementation here!
      */
-    public synchronized void shutdown() {
+    public void shutdown() {
+
+        try {
+            poolLock.lock();
 
-        if (isShutDown)
-            return;
+            if (isShutDown)
+                return;
 
-        // no point in monitoring GC anymore
-        if (refWorker != null)
-            refWorker.shutdown();
-
-        // close all connections that are issued to an application
-        Iterator<BasicPoolEntryRef> iter = issuedConnections.iterator();
-        while (iter.hasNext()) {
-            BasicPoolEntryRef per = iter.next();
-            iter.remove();
-            BasicPoolEntry entry = per.get();
-            if (entry != null) {
-                closeConnection(entry.getConnection());
+            // no point in monitoring GC anymore
+            if (refWorker != null)
+                refWorker.shutdown();
+
+            // close all connections that are issued to an application
+            Iterator<BasicPoolEntryRef> iter = issuedConnections.iterator();
+            while (iter.hasNext()) {
+                BasicPoolEntryRef per = iter.next();
+                iter.remove();
+                BasicPoolEntry entry = per.get();
+                if (entry != null) {
+                    closeConnection(entry.getConnection());
+                }
             }
-        }
 
-        // remove all references to connections
-        //@@@ use this for shutting them down instead?
-        idleConnHandler.removeAll();
+            // remove all references to connections
+            //@@@ use this for shutting them down instead?
+            idleConnHandler.removeAll();
 
-        isShutDown = true;
+            isShutDown = true;
+
+        } finally {
+            poolLock.unlock();
+        }
     }
 
 

Modified: httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java?rev=607287&r1=607286&r2=607287&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java	(original)
+++ httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java	Fri Dec 28 08:59:17 2007
@@ -35,6 +35,8 @@
 import java.util.Queue;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,6 +57,9 @@
  * <li>connections are re-used only for the exact same route</li>
  * <li>connection limits are enforced per route rather than per host</li>
  * </ul>
+ * Note that access to the pool datastructures is synchronized via the
+ * {@link AbstractConnPool#poolLock poolLock} in the base class,
+ * not via <code>synchronized</code> methods.
  *
  * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
  * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
@@ -66,6 +71,10 @@
     private final Log LOG = LogFactory.getLog(ConnPoolByRoute.class);
 
 
+    /** Temporary hack: @@@ a global condition that goes with the lock. */
+    protected final Condition poolCondition;
+
+
     /** The list of free connections */
     private Queue<BasicPoolEntry> freeConnections;
 
@@ -114,6 +123,8 @@
     public ConnPoolByRoute(ClientConnectionManager mgr) {
         super(mgr);
 
+        poolCondition = poolLock.newCondition(); //@@@ temporary hack
+
         //@@@ use factory method, at least for waitingThreads
         freeConnections = new LinkedList<BasicPoolEntry>();
         waitingThreads = new LinkedList<WaitingThread>();
@@ -130,14 +141,22 @@
      * @return  the pool for the argument route,
      *     never <code>null</code> if <code>create</code> is <code>true</code>
      */
-    protected synchronized RouteSpecificPool getRoutePool(HttpRoute route,
-                                                          boolean create) {
+    protected RouteSpecificPool getRoutePool(HttpRoute route,
+                                             boolean create) {
+        RouteSpecificPool rospl = null;
+
+        try {
+            poolLock.lock();
+
+            rospl = routeToPool.get(route);
+            if ((rospl == null) && create) {
+                // no pool for this route yet (or anymore)
+                rospl = newRouteSpecificPool(route);
+                routeToPool.put(route, rospl);
+            }
 
-        RouteSpecificPool rospl = routeToPool.get(route);
-        if ((rospl == null) && create) {
-            // no pool for this route yet (or anymore)
-            rospl = newRouteSpecificPool(route);
-            routeToPool.put(route, rospl);
+        } finally {
+            poolLock.unlock();
         }
 
         return rospl;
@@ -158,119 +177,133 @@
 
 
     //@@@ consider alternatives for gathering statistics
-    public synchronized int getConnectionsInPool(HttpRoute route) {
-        //@@@ don't allow a pool to be created here!
-        RouteSpecificPool rospl = getRoutePool(route, false);
-        return (rospl != null) ? rospl.getEntryCount() : 0;
+    public int getConnectionsInPool(HttpRoute route) {
+
+        try {
+            poolLock.lock();
+
+            // don't allow a pool to be created here!
+            RouteSpecificPool rospl = getRoutePool(route, false);
+            return (rospl != null) ? rospl.getEntryCount() : 0;
+
+        } finally {
+            poolLock.unlock();
+        }
     }
 
 
     // non-javadoc, see base class AbstractConnPool
-    public synchronized
-        BasicPoolEntry getEntry(HttpRoute route, long timeout,
-                                ClientConnectionOperator operator)
+    public BasicPoolEntry getEntry(HttpRoute route, long timeout,
+                                   ClientConnectionOperator operator)
         throws ConnectionPoolTimeoutException, InterruptedException {
 
-        BasicPoolEntry entry = null;
-
         int maxHostConnections = HttpConnectionManagerParams
             .getMaxConnectionsPerHost(this.params, route);
         int maxTotalConnections = HttpConnectionManagerParams
             .getMaxTotalConnections(this.params);
         
-        RouteSpecificPool rospl = getRoutePool(route, true);
-        WaitingThread waitingThread = null;
-
-        boolean useTimeout = (timeout > 0);
-        long timeToWait = timeout;
-        long startWait = 0;
-        long endWait = 0;
-
-        while (entry == null) {
-
-            if (isShutDown) {
-                throw new IllegalStateException
-                    ("Connection pool shut down.");
-            }
-
-            // the cases to check for:
-            // - have a free connection for that route
-            // - allowed to create a free connection for that route
-            // - can delete and replace a free connection for another route
-            // - need to wait for one of the things above to come true
-
-            entry = getFreeEntry(rospl);
-            if (entry != null) {
-                // we're fine
-                //@@@ yeah this is ugly, but historical... will be revised
-            } else if ((rospl.getEntryCount() < maxHostConnections) &&
-                       (numConnections < maxTotalConnections)) {
-
-                entry = createEntry(rospl, operator);
+        BasicPoolEntry entry = null;
 
-            } else if ((rospl.getEntryCount() < maxHostConnections) &&
-                       (freeConnections.size() > 0)) {
+        try {
+            poolLock.lock();
 
-                deleteLeastUsedEntry();
-                entry = createEntry(rospl, operator);
+            RouteSpecificPool rospl = getRoutePool(route, true);
+            WaitingThread waitingThread = null;
 
-            } else {
-                // TODO: keep track of which routes have waiting threads,
-                // so they avoid being sacrificed before necessary
+            boolean useTimeout = (timeout > 0);
+            long timeToWait = timeout;
+            long startWait = 0;
+            long endWait = 0;
+
+            while (entry == null) {
+
+                if (isShutDown) {
+                    throw new IllegalStateException
+                        ("Connection pool shut down.");
+                }
 
-                try {
-                    if (useTimeout && timeToWait <= 0) {
-                        throw new ConnectionPoolTimeoutException
-                            ("Timeout waiting for connection");
-                    }
+                // the cases to check for:
+                // - have a free connection for that route
+                // - allowed to create a free connection for that route
+                // - can delete and replace a free connection for another route
+                // - need to wait for one of the things above to come true
+
+                entry = getFreeEntry(rospl);
+                if (entry != null) {
+                    // we're fine
+                    //@@@ yeah this is ugly, but historical... will be revised
+                } else if ((rospl.getEntryCount() < maxHostConnections) &&
+                           (numConnections < maxTotalConnections)) {
+
+                    entry = createEntry(rospl, operator);
+
+                } else if ((rospl.getEntryCount() < maxHostConnections) &&
+                           (freeConnections.size() > 0)) {
+
+                    deleteLeastUsedEntry();
+                    entry = createEntry(rospl, operator);
+
+                } else {
+                    // TODO: keep track of which routes have waiting threads,
+                    // so they avoid being sacrificed before necessary
+
+                    try {
+                        if (useTimeout && timeToWait <= 0) {
+                            throw new ConnectionPoolTimeoutException
+                                ("Timeout waiting for connection");
+                        }
    
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Need to wait for connection. " + route);
-                    }
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Need to wait for connection. " + route);
+                        }
    
-                    if (waitingThread == null) {
-                        waitingThread = new WaitingThread();
-                        waitingThread.pool = rospl;
-                        waitingThread.thread = Thread.currentThread();
-                    } else {
-                        waitingThread.interruptedByConnectionPool = false;
-                    }
-
-                    if (useTimeout) {
-                        startWait = System.currentTimeMillis();
-                    }
-
-                    rospl.queueThread(waitingThread);
-                    waitingThreads.add(waitingThread);
-                    wait(timeToWait);
-
-                } catch (InterruptedException e) {
-                    if (!waitingThread.interruptedByConnectionPool) {
-                        LOG.debug("Interrupted while waiting for connection.", e);
-                        throw e;
-                    }
-                    // Else, do nothing, we were interrupted by the
-                    // connection pool and should now have a connection
-                    // waiting for us. Continue in the loop and get it.
-                    // Or else we are shutting down, which is also
-                    // detected in the loop.
-                } finally {
-                    if (!waitingThread.interruptedByConnectionPool) {
-                        // Either we timed out, experienced a
-                        // "spurious wakeup", or were interrupted by an
-                        // external thread.  Regardless we need to 
-                        // cleanup for ourselves in the wait queue.
-                        rospl.removeThread(waitingThread);
-                        waitingThreads.remove(waitingThread);
-                    }
-
-                    if (useTimeout) {
-                        endWait = System.currentTimeMillis();
-                        timeToWait -= (endWait - startWait);
+                        if (waitingThread == null) {
+                            waitingThread = new WaitingThread();
+                            waitingThread.pool = rospl;
+                            waitingThread.thread = Thread.currentThread();
+                        } else {
+                            waitingThread.interruptedByConnectionPool = false;
+                        }
+
+                        if (useTimeout) {
+                            startWait = System.currentTimeMillis();
+                        }
+
+                        rospl.queueThread(waitingThread);
+                        waitingThreads.add(waitingThread);
+                        poolCondition.await(timeToWait, TimeUnit.MILLISECONDS);
+
+                    } catch (InterruptedException e) {
+                        if (!waitingThread.interruptedByConnectionPool) {
+                            LOG.debug("Interrupted while waiting for connection.", e);
+                            throw e;
+                        }
+                        // Else, do nothing, we were interrupted by the
+                        // connection pool and should now have a connection
+                        // waiting for us. Continue in the loop and get it.
+                        // Or else we are shutting down, which is also
+                        // detected in the loop.
+                    } finally {
+                        if (!waitingThread.interruptedByConnectionPool) {
+                            // Either we timed out, experienced a
+                            // "spurious wakeup", or were interrupted by an
+                            // external thread.  Regardless we need to 
+                            // cleanup for ourselves in the wait queue.
+                            rospl.removeThread(waitingThread);
+                            waitingThreads.remove(waitingThread);
+                        }
+
+                        if (useTimeout) {
+                            endWait = System.currentTimeMillis();
+                            timeToWait -= (endWait - startWait);
+                        }
                     }
                 }
-            }
-        } // while no entry
+            } // while no entry
+
+        } finally {
+            poolLock.unlock();
+        }
 
         return entry;
 
@@ -278,37 +311,44 @@
 
 
     // non-javadoc, see base class AbstractConnPool
-    public synchronized void freeEntry(BasicPoolEntry entry) {
+    public void freeEntry(BasicPoolEntry entry) {
 
         HttpRoute route = entry.getPlannedRoute();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Freeing connection. " + route);
         }
 
-        if (isShutDown) {
-            // the pool is shut down, release the
-            // connection's resources and get out of here
-            closeConnection(entry.getConnection());
-            return;
-        }
+        try {
+            poolLock.lock();
+
+            if (isShutDown) {
+                // the pool is shut down, release the
+                // connection's resources and get out of here
+                closeConnection(entry.getConnection());
+                return;
+            }
 
-        // no longer issued, we keep a hard reference now
-        issuedConnections.remove(entry.getWeakRef());
+            // no longer issued, we keep a hard reference now
+            issuedConnections.remove(entry.getWeakRef());
 
-        RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
+            RouteSpecificPool rospl = getRoutePool(route, true);
 
-        rospl.freeEntry(entry);
-        freeConnections.add(entry);
+            rospl.freeEntry(entry);
+            freeConnections.add(entry);
 
-        if (numConnections == 0) {
-            // for some reason this pool didn't already exist
-            LOG.error("Master connection pool not found. " + route);
-            numConnections = 1;
-        }
+            if (numConnections == 0) {
+                // for some reason this pool didn't already exist
+                LOG.error("Master connection pool not found. " + route);
+                numConnections = 1;
+            }
 
-        idleConnHandler.add(entry.getConnection());
+            idleConnHandler.add(entry.getConnection());
 
-        notifyWaitingThread(rospl);
+            notifyWaitingThread(rospl);
+
+        } finally {
+            poolLock.unlock();
+        }
 
     } // freeEntry
 
@@ -322,25 +362,33 @@
      * @return  an available pool entry for the given route, or
      *          <code>null</code> if none is available
      */
-    protected synchronized
-        BasicPoolEntry getFreeEntry(RouteSpecificPool rospl) {
+    protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl) {
 
-        BasicPoolEntry entry = rospl.allocEntry();
+        BasicPoolEntry entry = null;
+        try {
+            poolLock.lock();
 
-        if (entry != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Getting free connection. " + rospl.getRoute());
-            }
-            freeConnections.remove(entry);
-            idleConnHandler.remove(entry.getConnection()); // no longer idle
+            entry = rospl.allocEntry();
 
-            issuedConnections.add(entry.getWeakRef());
+            if (entry != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Getting free connection. " + rospl.getRoute());
+                }
+                freeConnections.remove(entry);
+                idleConnHandler.remove(entry.getConnection());// no longer idle
+
+                issuedConnections.add(entry.getWeakRef());
 
-        } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("No free connections. " + rospl.getRoute());
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No free connections. " + rospl.getRoute());
+                }
             }
+
+        } finally {
+            poolLock.unlock();
         }
+
         return entry;
     }
 
@@ -355,20 +403,27 @@
      *
      * @return  the new pool entry for a new connection
      */
-    protected synchronized
-        BasicPoolEntry createEntry(RouteSpecificPool rospl,
-                                   ClientConnectionOperator op) {
+    protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
+                                         ClientConnectionOperator op) {
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating new connection. " + rospl.getRoute());
         }
-        // the entry will create the connection when needed
-        BasicPoolEntry entry =
-            new BasicPoolEntry(op, rospl.getRoute(), refQueue);
-        rospl.createdEntry(entry);
-        numConnections++;
+
+        BasicPoolEntry entry = null;
+        try {
+            poolLock.lock();
+
+            // the entry will create the connection when needed
+            entry = new BasicPoolEntry(op, rospl.getRoute(), refQueue);
+            rospl.createdEntry(entry);
+            numConnections++;
     
-        issuedConnections.add(entry.getWeakRef());
+            issuedConnections.add(entry.getWeakRef());
+
+        } finally {
+            poolLock.unlock();
+        }
 
         return entry;
     }
@@ -385,7 +440,7 @@
      * 
      * @param entry         the pool entry for the connection to delete
      */
-    protected synchronized void deleteEntry(BasicPoolEntry entry) {
+    protected void deleteEntry(BasicPoolEntry entry) {
 
         HttpRoute route = entry.getPlannedRoute();
 
@@ -393,16 +448,23 @@
             LOG.debug("Deleting connection. " + route);
         }
 
-        closeConnection(entry.getConnection());
+        try {
+            poolLock.lock();
 
-        RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
-        rospl.deleteEntry(entry);
-        numConnections--;
-        if (rospl.isUnused()) {
-            routeToPool.remove(route);
-        }
+            closeConnection(entry.getConnection());
+
+            RouteSpecificPool rospl = getRoutePool(route, true);
+            rospl.deleteEntry(entry);
+            numConnections--;
+            if (rospl.isUnused()) {
+                routeToPool.remove(route);
+            }
+
+            idleConnHandler.remove(entry.getConnection());// not idle, but dead
 
-        idleConnHandler.remove(entry.getConnection()); // not idle, but dead
+        } finally {
+            poolLock.unlock();
+        }
     }
 
 
@@ -410,31 +472,45 @@
      * Delete an old, free pool entry to make room for a new one.
      * Used to replace pool entries with ones for a different route.
      */
-    protected synchronized void deleteLeastUsedEntry() {
+    protected void deleteLeastUsedEntry() {
+
+        try {
+            poolLock.lock();
+
+            //@@@ with get() instead of remove, we could
+            //@@@ leave the removing to deleteEntry()
+            BasicPoolEntry entry = freeConnections.remove();
+
+            if (entry != null) {
+                deleteEntry(entry);
+            } else if (LOG.isDebugEnabled()) {
+                LOG.debug("No free connection to delete.");
+            }
 
-        //@@@ with get() instead of remove, we could
-        //@@@ leave the removing to deleteEntry()
-        BasicPoolEntry entry = freeConnections.remove();
-
-        if (entry != null) {
-            deleteEntry(entry);
-        } else if (LOG.isDebugEnabled()) {
-            LOG.debug("No free connection to delete.");
+        } finally {
+            poolLock.unlock();
         }
     }
 
 
     // non-javadoc, see base class AbstractConnPool
-    protected synchronized void handleLostEntry(HttpRoute route) {
+    protected void handleLostEntry(HttpRoute route) {
 
-        RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
-        rospl.dropEntry();
-        if (rospl.isUnused()) {
-            routeToPool.remove(route);
-        }
+        try {
+            poolLock.lock();
+
+            RouteSpecificPool rospl = getRoutePool(route, true);
+            rospl.dropEntry();
+            if (rospl.isUnused()) {
+                routeToPool.remove(route);
+            }
 
-        numConnections--;
-        notifyWaitingThread(rospl);
+            numConnections--;
+            notifyWaitingThread(rospl);
+
+        } finally {
+            poolLock.unlock();
+        }
     }
 
 
@@ -446,7 +522,7 @@
      * 
      * @param rospl     the pool in which to notify, or <code>null</code>
      */
-    protected synchronized void notifyWaitingThread(RouteSpecificPool rospl) {
+    protected void notifyWaitingThread(RouteSpecificPool rospl) {
 
         //@@@ while this strategy provides for best connection re-use,
         //@@@ is it fair? only do this if the connection is open?
@@ -455,28 +531,35 @@
         // it from all wait queues before interrupting.
         WaitingThread waitingThread = null;
 
-        if ((rospl != null) && rospl.hasThread()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Notifying thread waiting on pool. "
-                          + rospl.getRoute());
-            }
-            waitingThread = rospl.dequeueThread();
-            waitingThreads.remove(waitingThread);
+        try {
+            poolLock.lock();
 
-        } else if (!waitingThreads.isEmpty()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Notifying thread waiting on any pool.");
+            if ((rospl != null) && rospl.hasThread()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Notifying thread waiting on pool. "
+                              + rospl.getRoute());
+                }
+                waitingThread = rospl.dequeueThread();
+                waitingThreads.remove(waitingThread);
+
+            } else if (!waitingThreads.isEmpty()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Notifying thread waiting on any pool.");
+                }
+                waitingThread = waitingThreads.remove();
+                waitingThread.pool.removeThread(waitingThread);
+
+            } else if (LOG.isDebugEnabled()) {
+                LOG.debug("Notifying no-one, there are no waiting threads");
             }
-            waitingThread = waitingThreads.remove();
-            waitingThread.pool.removeThread(waitingThread);
 
-        } else if (LOG.isDebugEnabled()) {
-            LOG.debug("Notifying no-one, there are no waiting threads");
-        }
+            if (waitingThread != null) {
+                waitingThread.interruptedByConnectionPool = true;
+                waitingThread.thread.interrupt();
+            }
 
-        if (waitingThread != null) {
-            waitingThread.interruptedByConnectionPool = true;
-            waitingThread.thread.interrupt();
+        } finally {
+            poolLock.unlock();
         }
     }
 
@@ -484,44 +567,57 @@
     //@@@ revise this cleanup stuff
     //@@@ move method to base class when deleteEntry() is fixed
     // non-javadoc, see base class AbstractConnPool
-    public synchronized void deleteClosedConnections() {
+    public void deleteClosedConnections() {
 
-        Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
-        while (iter.hasNext()) {
-            BasicPoolEntry entry = iter.next();
-            if (!entry.getConnection().isOpen()) {
-                iter.remove();
-                deleteEntry(entry);
+        // lock before try, so finally never unlocks a lock that is not held
+        poolLock.lock();
+        try {
+            Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
+            while (iter.hasNext()) {
+                BasicPoolEntry entry = iter.next();
+                if (!entry.getConnection().isOpen()) {
+                    iter.remove();
+                    deleteEntry(entry);
+                }
             }
+
+        } finally {
+            poolLock.unlock();
         }
     }
 
 
     // non-javadoc, see base class AbstractConnPool
-    public synchronized void shutdown() {
+    public void shutdown() {
 
-        super.shutdown();
+        poolLock.lock(); // outside try: finally must only unlock a held lock
+        try {
 
-        // close all free connections
-        //@@@ move this to base class?
-        Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
-        while (ibpe.hasNext()) {
-            BasicPoolEntry entry = ibpe.next();
-            ibpe.remove();
-            closeConnection(entry.getConnection());
-        }
+            super.shutdown();
 
-            
-        // interrupt all waiting threads
-        Iterator<WaitingThread> iwth = waitingThreads.iterator();
-        while (iwth.hasNext()) {
-            WaitingThread waiter = iwth.next();
-            iwth.remove();
-            waiter.interruptedByConnectionPool = true;
-            waiter.thread.interrupt();
-        }
+            // close all free connections
+            //@@@ move this to base class?
+            Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
+            while (ibpe.hasNext()) {
+                BasicPoolEntry entry = ibpe.next();
+                ibpe.remove();
+                closeConnection(entry.getConnection());
+            }
+
+            // interrupt all waiting threads, flagging the pool as the source
+            Iterator<WaitingThread> iwth = waitingThreads.iterator();
+            while (iwth.hasNext()) {
+                WaitingThread waiter = iwth.next();
+                iwth.remove();
+                waiter.interruptedByConnectionPool = true;
+                waiter.thread.interrupt();
+            }
+
+            routeToPool.clear();
 
-        routeToPool.clear();
+        } finally {
+            poolLock.unlock();
+        }
     }
 
 

Added: httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java?rev=607287&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java
(added)
+++ httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java
Fri Dec 28 08:59:17 2007
@@ -0,0 +1,183 @@
+/*
+ * $HeadURL$
+ * $Revision$
+ * $Date$
+ *
+ * ====================================================================
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.impl.conn.tsccm;
+
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+
+
+/**
+ * Represents a thread waiting for a connection.
+ * This class implements throwaway objects. It is instantiated whenever
+ * a thread needs to wait. Instances are not re-used, except if the
+ * waiting thread experiences a spurious wakeup and continues to wait.
+ * <br/>
+ * All methods assume external synchronization on the condition
+ * passed to the constructor.
+ * Instances of this class do <i>not</i> synchronize access!
+ *
+ * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
+ */
+public class WaitingThread {
+
+    /** The condition on which the thread is waiting. */
+    private final Condition cond;
+
+    /** The route specific pool on which the thread is waiting. */
+    //@@@ replace with generic pool interface
+    private final RouteSpecificPool pool;
+
+    /** The thread that is waiting for an entry. */
+    private Thread waiter;
+
+
+    /**
+     * Indicates the source of an interruption.
+     * Meant to be set to <code>true</code> by the connection pool,
+     * in <code>notifyWaitingThread(RouteSpecificPool)</code>
+     * and <code>shutdown()</code>, before the thread is interrupted.
+     * If not set, the thread was interrupted from the outside.
+     * This class itself neither reads nor writes the flag.
+     */
+    private boolean interruptedByConnectionPool;
+
+
+    /**
+     * Creates a new entry for a waiting thread.
+     *
+     * @param cond      the condition for which to wait
+     * @param pool      the pool on which the thread will be waiting,
+     *                  or <code>null</code>
+     */
+    public WaitingThread(Condition cond, RouteSpecificPool pool) {
+
+        if (cond == null) {
+            throw new IllegalArgumentException("Condition must not be null.");
+        }
+
+        this.cond = cond;
+        this.pool = pool;
+    }
+
+
+    /**
+     * Blocks the calling thread until it is notified or interrupted,
+     * until a timeout occurs, or until there is a spurious wakeup.
+     * <br/>
+     * This method assumes external synchronization.
+     *
+     * @param timeout   the timeout in milliseconds, or 0 for no timeout
+     * @throws InterruptedException  if interrupted while waiting
+     * @throws IllegalStateException if a thread is already waiting here
+     * @see #wakeup
+     */
+    public void await(int timeout)
+        throws InterruptedException {
+
+        // This is only a sanity check. We cannot synchronize here,
+        // the lock would not be released on calling cond.await() below.
+        if (this.waiter != null) {
+            throw new IllegalStateException
+                ("A thread is already waiting on this object." +
+                 "\ncaller: " + Thread.currentThread() +
+                 "\nwaiter: " + this.waiter);
+        }
+
+        this.waiter = Thread.currentThread();
+        try {
+            if (timeout == 0) {
+                this.cond.await(); // a timed wait of 0 would elapse at once
+            } else {
+                //@@@ negative values are passed on as is, check them?
+                this.cond.await(timeout, TimeUnit.MILLISECONDS);
+            }
+        } finally {
+            this.waiter = null;
+        }
+    } // await
+
+
+    /**
+     * Wakes up the waiting thread.
+     * <br/>
+     * This method assumes external synchronization.
+     */
+    public void wakeup() {
+
+        // If external synchronization and pooling works properly,
+        // this cannot happen. Just a sanity check.
+        if (this.waiter == null) {
+            throw new IllegalStateException
+                ("Nobody waiting on this object.");
+        }
+
+        // One condition might be shared by several WaitingThread instances.
+        // It probably isn't, but just in case: wake all, not just one.
+        this.cond.signalAll();
+    }
+
+
+    /**
+     * Obtains the condition.
+     *
+     * @return  the condition on which to wait, never <code>null</code>
+     */
+    public final Condition getCondition() {
+        // not synchronized
+        return this.cond;
+    }
+
+
+    /**
+     * Obtains the pool, if there is one.
+     *
+     * @return  the pool on which a thread is or was waiting,
+     *          or <code>null</code>
+     */
+    public final RouteSpecificPool getPool() {
+        // not synchronized
+        return this.pool;
+    }
+
+
+    /**
+     * Obtains the thread, if there is one.
+     *
+     * @return  the thread which is waiting, or <code>null</code>
+     */
+    public final Thread getThread() {
+        // not synchronized
+        return this.waiter;
+    }
+
+
+} // class WaitingThread

Propchange: httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message