commons-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbe...@apache.org
Subject cvs commit: jakarta-commons/httpclient/src/java/org/apache/commons/httpclient MultiThreadedHttpConnectionManager.java
Date Sun, 13 Apr 2003 03:51:39 GMT
mbecke      2003/04/12 20:51:39

  Modified:    httpclient/src/test/org/apache/commons/httpclient
                        TestHttpConnectionManager.java
               httpclient/src/java/org/apache/commons/httpclient
                        MultiThreadedHttpConnectionManager.java
  Log:
  Adds support for a maximum number of connections.  Unused connections are now reclaimed.
  
  PR: 18596
  Submitted by: Carl Dunham and Michael Becke
  Reviewed by: Oleg Kalnichevski and Jeff Dever
  
  Revision  Changes    Path
  1.7       +153 -4    jakarta-commons/httpclient/src/test/org/apache/commons/httpclient/TestHttpConnectionManager.java
  
  Index: TestHttpConnectionManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons/httpclient/src/test/org/apache/commons/httpclient/TestHttpConnectionManager.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- TestHttpConnectionManager.java	5 Mar 2003 04:02:56 -0000	1.6
  +++ TestHttpConnectionManager.java	13 Apr 2003 03:51:39 -0000	1.7
  @@ -221,6 +221,124 @@
   
       }
       
  +    /**
  +     * Tests the MultiThreadedHttpConnectionManager's ability to reclaim unused 
  +     * connections.
  +     */
  +    public void testConnectionReclaiming() {
  +        
  +        MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
  +        connectionManager.setMaxConnectionsPerHost(1);
  +        connectionManager.setMaxTotalConnections(1);
  +
  +        HostConfiguration host1 = new HostConfiguration();
  +        host1.setHost("host1", -1, "http");
  +
  +        HostConfiguration host2 = new HostConfiguration();
  +        host2.setHost("host2", -1, "http");
  +
  +        HttpConnection connection = connectionManager.getConnection(host1);
  +        // now release this connection
  +        connection.releaseConnection();
  +        connection = null;
  +        
  +        try {
  +            // the connection from host1 should be reclaimed
  +            connection = connectionManager.getConnection(host2, 100);
  +        } catch (HttpException e) {
  +            e.printStackTrace();
  +            fail("a httpConnection should have been available: " + e);
  +        }        
  +    }
  +    
  +    /**
  +     * Tests the MultiThreadedHttpConnectionManager's ability to restrict the maximum number
  +     * of connections.
  +     */    
  +    public void testMaxConnections() {
  +        
  +        MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
  +        connectionManager.setMaxConnectionsPerHost(1);
  +        connectionManager.setMaxTotalConnections(2);
  +
  +        HostConfiguration host1 = new HostConfiguration();
  +        host1.setHost("host1", -1, "http");
  +
  +        HostConfiguration host2 = new HostConfiguration();
  +        host2.setHost("host2", -1, "http");
  +
  +        HttpConnection connection1 = connectionManager.getConnection(host1);
  +        HttpConnection connection2 = connectionManager.getConnection(host2);
  +    
  +        try {
  +            // this should fail quickly since the connection has not been released
  +            connectionManager.getConnection(host2, 100);
  +            fail("a httpConnection should not be available");
  +        } catch (HttpException e) {
  +            // this should throw an exception
  +        }
  +        
  +        // release one of the connections
  +        connection2.releaseConnection();
  +        connection2 = null;
  +        
  +        try {
  +            // there should be a connection available now
  +            connection2 = connectionManager.getConnection(host2, 100);
  +        } catch (HttpException e) {
  +            e.printStackTrace();
  +            fail("a httpConnection should have been available: " + e);
  +        }
  +    }    
  +
  +    public void testHostReusePreference() {
  +        
  +        final MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
  +        connectionManager.setMaxConnectionsPerHost(1);
  +        connectionManager.setMaxTotalConnections(1);
  +
  +        final HostConfiguration host1 = new HostConfiguration();
  +        host1.setHost("host1", -1, "http");
  +
  +        final HostConfiguration host2 = new HostConfiguration();
  +        host2.setHost("host2", -1, "http");
  +
  +        HttpConnection connection = connectionManager.getConnection(host1);
  +
  +        GetConnectionThread getHost1 = new GetConnectionThread(host1, connectionManager, 200);
  +        GetConnectionThread getHost2 = new GetConnectionThread(host2, connectionManager, 200);
  +        
  +        getHost2.start();
  +        getHost1.start();
  +        
  +        // give the threads some time to startup
  +        try {
  +            Thread.sleep(100);
  +        } catch (InterruptedException e1) {
  +            e1.printStackTrace();
  +        }
  +            
  +        // after the connection to host1 is released it should be given to getHost1
  +        connection.releaseConnection();
  +        connection = null;
  +
  +        try {
  +            getHost1.join();
  +            getHost2.join();
  +        } catch (InterruptedException e) {
  +            e.printStackTrace();
  +        }
  +
  +        assertNotSame(
  +            "Connection should have been given to someone", 
  +            getHost1.getConnection(),
  +            getHost2.getConnection()
  +        );        
  +        assertNotNull("Connection should have been given to host1", getHost1.getConnection());
  +        assertNull("Connection should NOT have been given to host2", getHost2.getConnection());
  +        
  +    } 
  +    
       public void testMaxConnectionsPerServer() {
        
           MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
  @@ -324,12 +442,43 @@
               
               HttpConnection conn1 = mgr.getConnection(hostConfig);
               HttpConnection conn2 = mgr.getConnection(hostConfig);
  +            
               HttpConnection conn3 = mgr.getConnection(hostConfig, 1000);
               fail("Expected an HttpException.");
               
           }catch(HttpException e){
               //Expected result
           }
  +    }
  +    
  +    static class GetConnectionThread extends Thread {
  +        
  +        private HostConfiguration hostConfiguration;
  +        private MultiThreadedHttpConnectionManager connectionManager;
  +        private HttpConnection connection;
  +        private long timeout;
  +        
  +        public GetConnectionThread(
  +            HostConfiguration hostConfiguration, 
  +            MultiThreadedHttpConnectionManager connectionManager,
  +            long timeout
  +        ) {
  +            this.hostConfiguration = hostConfiguration;
  +            this.connectionManager = connectionManager; 
  +            this.timeout = timeout;
  +        }
  +        
  +        public void run() {
  +            try {
  +                connection = connectionManager.getConnection(hostConfiguration, timeout);
  +            } catch (HttpException e) {
  +            }            
  +        }
  +        
  +        public HttpConnection getConnection() {
  +            return connection;
  +        }
  +
       }
       
   }
  
  
  
  1.14      +377 -180  jakarta-commons/httpclient/src/java/org/apache/commons/httpclient/MultiThreadedHttpConnectionManager.java
  
  Index: MultiThreadedHttpConnectionManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons/httpclient/src/java/org/apache/commons/httpclient/MultiThreadedHttpConnectionManager.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- MultiThreadedHttpConnectionManager.java	9 Apr 2003 18:37:59 -0000	1.13
  +++ MultiThreadedHttpConnectionManager.java	13 Apr 2003 03:51:39 -0000	1.14
  @@ -72,6 +72,7 @@
   import java.net.SocketException;
   import java.util.Collections;
   import java.util.HashMap;
  +import java.util.Iterator;
   import java.util.LinkedList;
   import java.util.Map;
   
  @@ -85,6 +86,7 @@
    * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
    * @author Eric Johnson
    * @author <a href="mailto:mbowler@GargoyleSoftware.com">Mike Bowler</a>
  + * @author Carl A. Dunham
    *
    * @since 2.0
    */
  @@ -95,14 +97,15 @@
       private static final Log LOG = LogFactory.getLog(MultiThreadedHttpConnectionManager.class);
   
       // ----------------------------------------------------- Instance Variables
  -    /**
  -     * Map where keys are {@link HostConfiguration}s and values are {@link
  -     * HostConnectionPool}s
  -     */
  -    private final Map mapHosts = new HashMap();
   
  -    /** Maximum number of connections allowed */
  -    private int maxConnections = 2;   // Per RFC 2616 sec 8.1.4
  +    /** Maximum number of connections allowed per host */
  +    private int maxHostConnections = 2;   // Per RFC 2616 sec 8.1.4
  +
  +    /** Maximum number of connections allowed overall */
  +    private int maxTotalConnections = 20;
  +
  +    /** Connection Pool */
  +    private ConnectionPool connectionPool;
   
       /** mapping from reference to hostConfiguration */
       private Map referenceToHostConfig;
  @@ -119,6 +122,8 @@
       public MultiThreadedHttpConnectionManager() {
   
           this.referenceToHostConfig = Collections.synchronizedMap(new HashMap());
  +        this.connectionPool = new ConnectionPool();
  +        
           this.referenceQueue = new ReferenceQueue();
   
           new ReferenceQueueThread().start();
  @@ -129,11 +134,11 @@
        * Sets the maximum number of connections allowed for a given
        * HostConfiguration. Per RFC 2616 section 8.1.4, this value defaults to 2.
        *
  -     * @param maxConnections the number of connections allowed for each
  +     * @param maxHostConnections the number of connections allowed for each
        * hostConfiguration
        */
  -    public void setMaxConnectionsPerHost(int maxConnections) {
  -        this.maxConnections = maxConnections;
  +    public void setMaxConnectionsPerHost(int maxHostConnections) {
  +        this.maxHostConnections = maxHostConnections;
       }
   
       /**
  @@ -144,7 +149,25 @@
        * hostConfiguration.
        */
       public int getMaxConnectionsPerHost() {
  -        return maxConnections;
  +        return maxHostConnections;
  +    }
  +
  +    /**
  +     * Sets the maximum number of connections allowed in the system.
  +     *
  +     * @param maxTotalConnections the maximum number of connections allowed
  +     */
  +    public void setMaxTotalConnections(int maxTotalConnections) {
  +        this.maxTotalConnections = maxTotalConnections;
  +    }
  +
  +    /**
  +     * Gets the maximum number of connections allowed in the system.
  +     *
  +     * @return The maximum number of connections allowed
  +     */
  +    public int getMaxTotalConnections() {
  +        return maxTotalConnections;
       }
   
       /**
  @@ -156,13 +179,15 @@
               try {
                   return getConnection(hostConfiguration, 0);
               } catch (HttpException e) {
  +                // we'll go ahead and log this, but it should never happen. HttpExceptions
  +                // are only thrown when the timeout occurs and since we have no timeout
  +                // it should never happen.
                   LOG.debug(
                       "Unexpected exception while waiting for connection",
                       e
                   );
               };
           }
  -
       }
   
       /**
  @@ -182,12 +207,7 @@
                   + hostConfiguration + ", timeout = " + timeout);
           }
   
  -        // we get the connection pool with a clone of the hostConfiguration
  -        // so that it cannot be changed once the connecton has been retrieved
  -        final HttpConnection conn 
  -            = getConnection(getConnectionPool(new HostConfiguration(hostConfiguration)),
  -            hostConfiguration, timeout
  -        );
  +        final HttpConnection conn = doGetConnection(hostConfiguration, timeout);
   
           // wrap the connection in an adapter so we can ensure it is used 
           // only once
  @@ -197,9 +217,9 @@
       /**
        * Gets a connection or waits if one is not available.  A connection is
        * available if one exists that is not being used or if fewer than
  -     * maxConnections have been created in the connectionPool.
  +     * maxHostConnections have been created in the connectionPool, and fewer
  +     * than maxTotalConnections have been created in all connectionPools.
        *
  -     * @param connectionPool The connection pool to use.
        * @param hostConfiguration The host configuration.
        * @param timeout the number of milliseconds to wait for a connection, 0 to
        * wait indefinitely
  @@ -209,97 +229,118 @@
        * @throws HttpException if a connection does not become available in
        * 'timeout' milliseconds
        */
  -    private HttpConnection getConnection(HostConnectionPool connectionPool,
  -        HostConfiguration hostConfiguration, long timeout) throws HttpException {
  +    private HttpConnection doGetConnection(HostConfiguration hostConfiguration, 
  +        long timeout) throws HttpException {
   
           HttpConnection connection = null;
   
           synchronized (connectionPool) {
   
  -            // keep trying until a connection is available, should happen at
  -            // most twice
  +            // we clone the hostConfiguration
  +            // so that it cannot be changed once the connection has been retrieved
  +            hostConfiguration = new HostConfiguration(hostConfiguration);
  +            HostConnectionPool hostPool = connectionPool.getHostPool(hostConfiguration);
  +            WaitingThread waitingThread = null;
  +
  +            boolean useTimeout = (timeout > 0);
  +            long timeToWait = timeout;
  +            long startWait = 0;
  +            long endWait = 0;
  +
               while (connection == null) {
   
  -                if (connectionPool.freeConnections.size() > 0) {
  -                    connection = (HttpConnection) connectionPool
  -                        .freeConnections.removeFirst();
  +                // happen to have a free connection with the right specs
  +                //
  +                if (hostPool.freeConnections.size() > 0) {
  +                    connection = connectionPool.getFreeConnection(hostConfiguration);
  +
  +                // have room to make more
  +                //
  +                } else if ((hostPool.numConnections < maxHostConnections) 
  +                    && (connectionPool.numConnections < maxTotalConnections)) {
  +
  +                    connection = connectionPool.createConnection(hostConfiguration);
  +
  +                // have room to add host connection, and there is at least one free
  +                // connection that can be liberated to make overall room
  +                //
  +                } else if ((hostPool.numConnections < maxHostConnections) 
  +                    && (connectionPool.freeConnections.size() > 0)) {
  +
  +                    connectionPool.deleteLeastUsedConnection();
  +                    connection = connectionPool.createConnection(hostConfiguration);
  +
  +                // otherwise, we have to wait for one of the above conditions to
  +                // become true
  +                //
                   } else {
  -                    // get number of connections hostConfig
  -                    if (connectionPool.numConnections < maxConnections) {
  -                        // Create a new connection
  -                        connection = new HttpConnection(hostConfiguration);
  -                        connection.setHttpConnectionManager(this);
  -                        connectionPool.numConnections++;
  -
  -                        // add a weak reference to this connection
  -                        referenceToHostConfig.put(new WeakReference(connection, referenceQueue),
  -                            hostConfiguration);
  -
  -                    } else {
  -
  -                        TimeoutThread threadTimeout = new TimeoutThread();
  -                        threadTimeout.setTimeout(timeout);
  -                        threadTimeout.setWakeupThread(Thread.currentThread());
  -                        threadTimeout.start();
  -
  -                        try {
  -                            LOG.debug(
  -                                "HttpConnectionManager.getConnection:  waiting for "
  -                                + "connection from " + connectionPool
  -                            );
  -                            connectionPool.wait();
  -                            // we were woken up before the timeout occurred, so
  -                            // there should be a connection available
  -                            threadTimeout.interrupt();
  -                        } catch (InterruptedException e) {
  -                            throw new HttpException("Timeout waiting for connection.");
  -                        }
  +                    // todo: keep track of which hostConfigurations have waiting
  +                    // threads, so they avoid being sacrificed before necessary
   
  +                    try {
  +                        
  +                        if (useTimeout && timeToWait <= 0) {
  +                            throw new HttpException("Timeout waiting for connection");
  +                        }
  +                        
  +                        if (LOG.isDebugEnabled()) {
  +                            LOG.debug("Waiting for a connection ");
  +                        }
  +                        
  +                        if (waitingThread == null) {
  +                            waitingThread = new WaitingThread();
  +                            waitingThread.hostConnectionPool = hostPool;
  +                            waitingThread.thread = Thread.currentThread();
  +                        }
  +                                    
  +                        if (useTimeout) {
  +                            startWait = System.currentTimeMillis();
  +                        }
  +                        
  +                        hostPool.waitingThreads.addLast(waitingThread);
  +                        connectionPool.waitingThreads.addLast(waitingThread);
  +                        connectionPool.wait(timeToWait);
  +                        
  +                        // we have not been interrupted so we need to remove ourselves from the 
  +                        // wait queue                      
  +                        hostPool.waitingThreads.remove(waitingThread);            
  +                        connectionPool.waitingThreads.remove(waitingThread);          
  +                    } catch (InterruptedException e) {
  +                        // do nothing
  +                    } finally {
  +                        if (useTimeout) {
  +                            endWait = System.currentTimeMillis();
  +                            timeToWait -= (endWait - startWait);
  +                        }
                       }
                   }
               }
  -
           }
  -
           return connection;
       }
   
       /**
  -     * Get the pool (list) of connections available for the given hostConfig.
  +     * Gets the number of connections in use for this configuration.
        *
  -     * @param hostConfiguration the configuraton for the connection pool
  -     * @return a pool (list) of connections available for the given config
  +     * @param hostConfiguration the key that connections are tracked on
  +     * @return the number of connections in use
        */
  -    private HostConnectionPool getConnectionPool(HostConfiguration hostConfiguration) {
  -        LOG.trace("enter HttpConnectionManager.getConnections(String)");
  -
  -        // Look for a list of connections for the given config
  -        HostConnectionPool listConnections = null;
  -        synchronized (mapHosts) {
  -            listConnections = (HostConnectionPool) mapHosts.get(hostConfiguration);
  -            if (listConnections == null) {
  -                // First time for this config
  -                listConnections = new HostConnectionPool();
  -                mapHosts.put(hostConfiguration, listConnections);
  -            }
  +    public int getConnectionsInUse(HostConfiguration hostConfiguration) {
  +        synchronized (connectionPool) {
  +            HostConnectionPool hostPool = connectionPool.getHostPool(hostConfiguration);
  +            return hostPool.numConnections;
           }
  -        return listConnections;
       }
   
       /**
  -     * Get the number of connections in use for this configuration.
  -     *
  -     * @param hostConfiguration the key that connections are tracked on
  -     * @return the number of connections in use
  +     * Gets the total number of connections in use.
  +     * 
  +     * @return the total number of connections in use
        */
  -    public int getConnectionsInUse(HostConfiguration hostConfiguration) {
  -        LOG.trace("enter HttpConnectionManager.getConnectionsInUse(String)");
  -
  -        HostConnectionPool connectionPool = getConnectionPool(hostConfiguration);
  +    public int getConnectionsInUse() {
           synchronized (connectionPool) {
               return connectionPool.numConnections;
           }
  -
       }
   
       /**
  @@ -323,6 +364,16 @@
           // make sure that the response has been read.
           SimpleHttpConnectionManager.finishLastResponse(conn);
   
  +        connectionPool.freeConnection(conn);
  +    }
  +
  +    /**
  +     * Gets the host configuration for a connection.
  +     * @param conn the connection to get the configuration of
  +     * @return a new HostConfiguration
  +     */
  +    private HostConfiguration configurationForConnection(HttpConnection conn) {
  +
           HostConfiguration connectionConfiguration = new HostConfiguration();
           connectionConfiguration.setHost(
               conn.getHost(), 
  @@ -333,148 +384,294 @@
               connectionConfiguration.setProxy(conn.getProxyHost(), conn.getProxyPort());
           }
   
  -        if (LOG.isDebugEnabled()) {
  -            LOG.debug("HttpConnectionManager.releaseConnection:  Release connection for " 
  -                + connectionConfiguration);
  -        }
  -
  -        final HostConnectionPool listConnections = getConnectionPool(connectionConfiguration);
  -        synchronized (listConnections) {
  -            // Put the connect back in the available list and notify a waiter
  -            listConnections.freeConnections.addFirst(conn);
  -            if (listConnections.numConnections == 0) {
  -                // for some reason this connection pool didn't already exist
  -                LOG.error("connection pool not found for: " 
  -                    + connectionConfiguration);
  -                listConnections.numConnections = 1;
  -            }
  -            listConnections.notify();
  -        }
  +        return connectionConfiguration;
       }
  +    
   
       /**
  -     * A simple struct-link class to combine the connection list and the count
  -     * of created connections.
  +     * Global Connection Pool, including per-host pools
        */
  -    private class HostConnectionPool {
  +    private class ConnectionPool {
  +        
           /** The list of free connections */
           private LinkedList freeConnections = new LinkedList();
   
  +        /** The list of WaitingThreads waiting for a connection */
  +        private LinkedList waitingThreads = new LinkedList();
  +
  +        /**
  +         * Map where keys are {@link HostConfiguration}s and values are {@link
  +         * HostConnectionPool}s
  +         */
  +        private final Map mapHosts = new HashMap();
  +
           /** The number of created connections */
           private int numConnections = 0;
  -    }
   
  -    /**
  -     * A thread for listening for HttpConnections reclaimed by the garbage
  -     * collector.
  -     */
  -    private class ReferenceQueueThread extends Thread {
  +        /**
  +         * Creates a new connection and returns is for use of the calling method.
  +         *
  +         * @param hostConfiguration the configuration for the connection
  +         * @return a new connection or <code>null</code> if none are available
  +         */
  +        public synchronized HttpConnection createConnection(HostConfiguration hostConfiguration) {
  +            HttpConnection connection = null;
   
  +            HostConnectionPool hostPool = getHostPool(hostConfiguration);
  +
  +            if ((hostPool.numConnections < getMaxConnectionsPerHost()) 
  +                && (numConnections < getMaxTotalConnections())) {
  +
  +                connection = new HttpConnection(hostConfiguration);
  +                connection.setHttpConnectionManager(MultiThreadedHttpConnectionManager.this);
  +                numConnections++;
  +                hostPool.numConnections++;
  +        
  +                // add a weak reference to this connection
  +                referenceToHostConfig.put(new WeakReference(connection, referenceQueue),
  +                                          hostConfiguration);
  +            }
  +            return connection;
  +        }
  +    
           /**
  -         * Create an instance and make this a daemon thread.
  +         * Get the pool (list) of connections available for the given hostConfig.
  +         *
  +         * @param hostConfiguration the configuraton for the connection pool
  +         * @return a pool (list) of connections available for the given config
            */
  -        public ReferenceQueueThread() {
  -            setDaemon(true);
  +        public synchronized HostConnectionPool getHostPool(HostConfiguration hostConfiguration) {
  +            LOG.trace("enter HttpConnectionManager.ConnectionPool.getHostPool(HostConfiguration)");
  +
  +            // Look for a list of connections for the given config
  +            HostConnectionPool listConnections = (HostConnectionPool) 
  +                mapHosts.get(hostConfiguration);
  +            if (listConnections == null) {
  +                // First time for this config
  +                listConnections = new HostConnectionPool();
  +                mapHosts.put(hostConfiguration, listConnections);
  +            }
  +            
  +            return listConnections;
           }
   
           /**
  -         * Start execution.
  +         * If available, get a free connection for this host
  +         *
  +         * @param hostConfiguration the configuraton for the connection pool
  +         * @return an available connection for the given config
            */
  -        public void run() {
  +        public synchronized HttpConnection getFreeConnection(HostConfiguration hostConfiguration) {
   
  -            while (true) {
  +            HttpConnection connection = null;
  +            
  +            HostConnectionPool hostPool = getHostPool(hostConfiguration);
  +
  +            if (hostPool.freeConnections.size() > 0) {
  +                connection = (HttpConnection) hostPool.freeConnections.removeFirst();
  +                freeConnections.remove(connection);
  +            }
  +            return connection;
  +        }
   
  -                try {
  -                    Reference ref = referenceQueue.remove();
  +        /**
  +         * Close and delete an old, unused connection to make room for a new one.
  +         */
  +        public synchronized void deleteLeastUsedConnection() {
   
  -                    if (ref != null) {
  -                        HostConfiguration config = (HostConfiguration)
  -                            referenceToHostConfig.get(ref);
  -                        referenceToHostConfig.remove(ref);
  -                        HostConnectionPool connectionPool = getConnectionPool(config);
  -                        synchronized (connectionPool) {
  -                            connectionPool.numConnections--;
  -                            connectionPool.notify();
  -                        }
  -                    }
  -                } catch (InterruptedException e) {
  -                    LOG.debug("ReferenceQueueThread interrupted", e);
  +            HttpConnection connection = (HttpConnection) freeConnections.removeFirst();
  +
  +            if (connection != null) {
  +                HostConfiguration connectionConfiguration = configurationForConnection(connection);
  +
  +                if (LOG.isDebugEnabled()) {
  +                    LOG.debug("Reclaiming unused connection for hostConfig: " 
  +                        + connectionConfiguration);
                   }
   
  +                connection.close();
  +
  +                // make sure this connection will not be cleaned up again when garbage
  +                // collected
  +                for (Iterator iter = referenceToHostConfig.keySet().iterator(); iter.hasNext();) {
  +                    WeakReference connectionRef = (WeakReference) iter.next();
  +                    if (connectionRef.get() == connection) {
  +                        iter.remove();
  +                        connectionRef.enqueue();
  +                        break;
  +                    }
  +                }
  +                
  +                HostConnectionPool hostPool = getHostPool(connectionConfiguration);
  +                
  +                hostPool.freeConnections.remove(connection);
  +                hostPool.numConnections--;
  +                numConnections--;
               }
  +        }
   
  +        /**
  +         * Notifies a waiting thread that a connection for the given configuration is 
  +         * available.
  +         * @param configuration the host config to use for notifying
  +         * @see #notifyWaitingThread(HostConnectionPool)
  +         */
  +        public synchronized void notifyWaitingThread(HostConfiguration configuration) {
  +            notifyWaitingThread(getHostPool(configuration));
  +        }
  +
  +        /**
  +         * Notifies a waiting thread that a connection for the given configuration is 
  +         * available.  This will wake a thread witing in tis hostPool or if there is not
  +         * one a thread in the ConnectionPool will be notified.
  +         * 
  +         * @param hostPool the host pool to use for notifying
  +         */
  +        public synchronized void notifyWaitingThread(HostConnectionPool hostPool) {
  +
  +            // find the thread we are going to notify, we want to ensure that each
  +            // waiting thread is only interrupted once so we will remove it from 
  +            // all wait queues before interrupting it
  +            WaitingThread waitingThread = null;
  +                
  +            if (hostPool.waitingThreads.size() > 0) {
  +                if (LOG.isDebugEnabled()) {
  +                    LOG.debug("Notifying thread waiting on hostPool");
  +                }                
  +                waitingThread = (WaitingThread) hostPool.waitingThreads.removeFirst();
  +                waitingThreads.remove(waitingThread);
  +            } else if (waitingThreads.size() > 0) {
  +                if (LOG.isDebugEnabled()) {
  +                    LOG.debug("Notifying next waiting thread");
  +                }
  +                waitingThread = (WaitingThread) waitingThreads.removeFirst();
  +                waitingThread.hostConnectionPool.waitingThreads.remove(waitingThread);
  +            } else if (LOG.isDebugEnabled()) {
  +                LOG.debug("Notifying no-one, there are no waiting threads");
  +            }
  +                
  +            if (waitingThread != null) {
  +                waitingThread.thread.interrupt();
  +            }
           }
   
  +        /**
  +         * Marks the given connection as free.
  +         * @param conn a connection that is no longer being used
  +         */
  +        public void freeConnection(HttpConnection conn) {
  +
  +            HostConfiguration connectionConfiguration = configurationForConnection(conn);
  +
  +            if (LOG.isDebugEnabled()) {
  +                LOG.debug("Freeing connection: " + conn);
  +            }
  +
  +            synchronized (this) {
  +                HostConnectionPool hostPool = getHostPool(connectionConfiguration);
  +
  +                // Put the connection back in the available list and notify a waiter
  +                hostPool.freeConnections.add(conn);
  +                if (hostPool.numConnections == 0) {
  +                    // for some reason this connection pool didn't already exist
  +                    LOG.error("host connection pool not found for: " 
  +                              + connectionConfiguration);
  +                    hostPool.numConnections = 1;
  +                }
  +
  +                freeConnections.add(conn);
  +                if (numConnections == 0) {
  +                    // for some reason this connection pool didn't already exist
  +                    LOG.error("connection pool not found for: " 
  +                              + connectionConfiguration);
  +                    numConnections = 1;
  +                }
  +                
  +                notifyWaitingThread(hostPool);
  +            }
  +
  +        }
       }
   
       /**
  -     * In getConnection, if the maximum number of connections has already been
  -     * reached the call will block.  This class is used to help provide a
  -     * timeout facility for this wait.  Because Java does not provide a way to
  -     * determine if wait() returned due to a notify() or a timeout, we need an
  -     * outside mechanism to interrupt the waiting thread after the specified
  -     * timeout interval.
  +     * A simple struct-like class to combine the connection list and the count
  +     * of created connections.
        */
  -    private static class TimeoutThread extends Thread {
  +    private class HostConnectionPool {
  +        /** The list of free connections */
  +        public LinkedList freeConnections = new LinkedList();
  +        
  +        /** The list of WaitingThreads for this host */
  +        public LinkedList waitingThreads = new LinkedList();
   
  -        /** The timeout in milliseconds. */
  -        private long timeout = 0;
  +        /** The number of created connections */
  +        public int numConnections = 0;
  +    }
  +    
  +    /**
  +     * A simple struct-like class to combine the waiting thread and the connection 
  +     * pool it is waiting on.
  +     */
  +    private class WaitingThread {
  +        /** The thread that is waiting for a connection */
  +        public Thread thread;
  +        
  +        /** The connection pool the thread is waiting for */
  +        public HostConnectionPool hostConnectionPool;
  +    }
   
  -        /** The thread that will be woken up after the specified timeout. */
  -        private Thread wakeupThread = null;
  +    /**
  +     * A thread for listening for HttpConnections reclaimed by the garbage
  +     * collector.
  +     */
  +    private class ReferenceQueueThread extends Thread {
   
           /**
  -         * Set the timeout
  -         * @param timeout The timeout in milliseconds.
  +         * Create an instance and make this a daemon thread.
            */
  -        public void setTimeout(long timeout) {
  -            this.timeout = timeout;
  +        public ReferenceQueueThread() {
  +            setDaemon(true);
           }
   
           /**
  -         * Return the timeout value in milliseconds.
  -         * @return long The timeout.
  +         * Handles cleaning up for the given reference.  Decrements any connection counts
  +         * and notifies waiting threads, if appropriate.
  +         * 
  +         * @param ref the reference to clean up
            */
  -        public long getTimeout() {
  -            return timeout;
  -        }
  +        private void handleReference(Reference ref) {
  +            synchronized (connectionPool) {
  +                // only clean up for this reference if it is still associated with 
  +                // a HostConfiguration
  +                if (referenceToHostConfig.containsKey(ref)) {
  +                    HostConfiguration config = (HostConfiguration) referenceToHostConfig.get(ref);
  +                    referenceToHostConfig.remove(ref);
   
  -        /**
  -         * Set the thread that will be woken up after the specified timeout.
  -         * @param newWakeupThread The thread to be woken.
  -         */
  -        public void setWakeupThread(Thread newWakeupThread) {
  -            this.wakeupThread = newWakeupThread;
  -        }
  +                    HostConnectionPool hostPool = connectionPool.getHostPool(config);
  +                    hostPool.numConnections--;
   
  -        /**
  -         * Return the thread that will be woken up after the specified timeout.
  -         * @return Thread The thread to be woken.
  -         */
  -        public Thread getWakeupThread() {
  -            return wakeupThread;
  +                    connectionPool.numConnections--;
  +                    connectionPool.notifyWaitingThread(config);
  +                }
  +            }            
           }
   
           /**
            * Start execution.
            */
           public void run() {
  -            LOG.trace("TimeoutThread.run()");
  -            if (timeout == 0) {
  -                return;
  -            }
  -            if (wakeupThread == null) {
  -                return;
  -            }
  -
  -            try {
  -                sleep(timeout);
  -                wakeupThread.interrupt();
  -            } catch (InterruptedException e) {
  -            LOG.debug("InterruptedException caught as expected");
  -                // This is expected
  +            while (true) {
  +                try {
  +                    Reference ref = referenceQueue.remove();
  +                    if (ref != null) {
  +                        handleReference(ref);
  +                    }
  +                } catch (InterruptedException e) {
  +                    LOG.debug("ReferenceQueueThread interrupted", e);
  +                }
               }
           }
  +
       }
   
       /**
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org


Mime
View raw message