commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r1102868 - /commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java
Date Fri, 13 May 2011 19:05:20 GMT
Author: markt
Date: Fri May 13 19:05:20 2011
New Revision: 1102868

URL: http://svn.apache.org/viewvc?rev=1102868&view=rev
Log:
Add the ability to register an interest in a key to prevent the associated ObjectDeque being
removed from under methods that expect it to remain available for the life of the method.
Fixes the issues found by commons-performance

Modified:
    commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java

Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java
URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java?rev=1102868&r1=1102867&r2=1102868&view=diff
==============================================================================
--- commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java (original)
+++ commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java Fri May 13 19:05:20 2011
@@ -28,6 +28,10 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.pool2.BaseKeyedObjectPool;
 import org.apache.commons.pool2.KeyedPoolableObjectFactory;
@@ -1025,103 +1029,106 @@ public class GenericKeyedObjectPool<K,T>
         long maxWait = _maxWait;
 
         boolean create;
-        ObjectDeque<T> objectDeque = poolMap.get(key);
+        ObjectDeque<T> objectDeque = register(key);
         
-        while (p == null) {
-            create = false;
-            if (whenExhaustedAction == WhenExhaustedAction.FAIL) {
-                if (objectDeque != null) {
-                    p = objectDeque.getIdleObjects().pollFirst();
-                }
-                if (p == null) {
-                    create = true;
-                    p = create(key, false);
-                }
-                if (p == null) {
-                    throw new NoSuchElementException("Pool exhausted");
-                }
-                if (!p.allocate()) {
-                    p = null;
-                }
-            } else if (whenExhaustedAction == WhenExhaustedAction.BLOCK) {
-                if (objectDeque != null) {
-                    p = objectDeque.getIdleObjects().pollFirst();
-                }
-                if (p == null) {
-                    create = true;
-                    p = create(key, false);
-                }
-                if (p == null && objectDeque != null) {
-                    if (maxWait < 1) {
-                        p = objectDeque.getIdleObjects().takeFirst();
-                    } else {
-                        p = objectDeque.getIdleObjects().pollFirst(maxWait,
-                                TimeUnit.MILLISECONDS);
+        try {
+            while (p == null) {
+                create = false;
+                if (whenExhaustedAction == WhenExhaustedAction.FAIL) {
+                    if (objectDeque != null) {
+                        p = objectDeque.getIdleObjects().pollFirst();
                     }
-                }
-                if (p == null) {
-                    throw new NoSuchElementException(
-                            "Timeout waiting for idle object");
-                }
-                if (!p.allocate()) {
-                    p = null;
-                }
-            } else if (whenExhaustedAction == WhenExhaustedAction.GROW) {
-                if (objectDeque != null) {
-                    p = objectDeque.getIdleObjects().pollFirst();
-                }
-                if (p == null) {
-                    create = true;
-                    p = create(key, true);
-                }
-                if (p != null && !p.allocate()) {
-                    p = null;
-                }
-            }
-
-            if (p != null) {
-                try {
-                    _factory.activateObject(key, p.getObject());
-                } catch (Exception e) {
-                    try {
-                        destroy(key, p);
-                    } catch (Exception e1) {
-                        // Ignore - activation failure is more important
+                    if (p == null) {
+                        create = true;
+                        p = create(key, false);
+                    }
+                    if (p == null) {
+                        throw new NoSuchElementException("Pool exhausted");
+                    }
+                    if (!p.allocate()) {
+                        p = null;
+                    }
+                } else if (whenExhaustedAction == WhenExhaustedAction.BLOCK) {
+                    if (objectDeque != null) {
+                        p = objectDeque.getIdleObjects().pollFirst();
+                    }
+                    if (p == null) {
+                        create = true;
+                        p = create(key, false);
+                    }
+                    if (p == null && objectDeque != null) {
+                        if (maxWait < 1) {
+                            p = objectDeque.getIdleObjects().takeFirst();
+                        } else {
+                            p = objectDeque.getIdleObjects().pollFirst(maxWait,
+                                    TimeUnit.MILLISECONDS);
+                        }
+                    }
+                    if (p == null) {
+                        throw new NoSuchElementException(
+                                "Timeout waiting for idle object");
+                    }
+                    if (!p.allocate()) {
+                        p = null;
+                    }
+                } else if (whenExhaustedAction == WhenExhaustedAction.GROW) {
+                    if (objectDeque != null) {
+                        p = objectDeque.getIdleObjects().pollFirst();
+                    }
+                    if (p == null) {
+                        create = true;
+                        p = create(key, true);
                     }
-                    p = null;
-                    if (create) {
-                        NoSuchElementException nsee = new NoSuchElementException(
-                                "Unable to activate object");
-                        nsee.initCause(e);
-                        throw nsee;
+                    if (p != null && !p.allocate()) {
+                        p = null;
                     }
                 }
-                if (p != null && getTestOnBorrow()) {
-                    boolean validate = false;
-                    Throwable validationThrowable = null;
+    
+                if (p != null) {
                     try {
-                        validate = _factory.validateObject(key, p.getObject());
-                    } catch (Throwable t) {
-                        PoolUtils.checkRethrow(t);
-                    }
-                    if (!validate) {
+                        _factory.activateObject(key, p.getObject());
+                    } catch (Exception e) {
                         try {
                             destroy(key, p);
-                        } catch (Exception e) {
-                            // Ignore - validation failure is more important
+                        } catch (Exception e1) {
+                            // Ignore - activation failure is more important
                         }
                         p = null;
                         if (create) {
                             NoSuchElementException nsee = new NoSuchElementException(
-                                    "Unable to validate object");
-                            nsee.initCause(validationThrowable);
+                                    "Unable to activate object");
+                            nsee.initCause(e);
                             throw nsee;
                         }
                     }
+                    if (p != null && getTestOnBorrow()) {
+                        boolean validate = false;
+                        Throwable validationThrowable = null;
+                        try {
+                            validate = _factory.validateObject(key, p.getObject());
+                        } catch (Throwable t) {
+                            PoolUtils.checkRethrow(t);
+                        }
+                        if (!validate) {
+                            try {
+                                destroy(key, p);
+                            } catch (Exception e) {
+                                // Ignore - validation failure is more important
+                            }
+                            p = null;
+                            if (create) {
+                                NoSuchElementException nsee = new NoSuchElementException(
+                                        "Unable to validate object");
+                                nsee.initCause(validationThrowable);
+                                throw nsee;
+                            }
+                        }
+                    }
                 }
             }
+        } finally {
+            deregister(key);
         }
-
         return p.getObject();
     }
 
@@ -1258,22 +1265,28 @@ public class GenericKeyedObjectPool<K,T>
      @Override
      public void clear(K key) {
          
-         ObjectDeque<T> objectDeque = poolMap.get(key);
-         if (objectDeque == null) {
-             return;
-         }
-         LinkedBlockingDeque<PooledObject<T>> idleObjects =
-                 objectDeque.getIdleObjects();
+         register(key);
          
-         PooledObject<T> p = idleObjects.poll();
-
-         while (p != null) {
-             try {
-                 destroy(key, p);
-             } catch (Exception e) {
-                 // TODO - Ignore?
+         try {
+             ObjectDeque<T> objectDeque = poolMap.get(key);
+             if (objectDeque == null) {
+                 return;
+             }
+             LinkedBlockingDeque<PooledObject<T>> idleObjects =
+                     objectDeque.getIdleObjects();
+             
+             PooledObject<T> p = idleObjects.poll();
+    
+             while (p != null) {
+                 try {
+                     destroy(key, p);
+                 } catch (Exception e) {
+                     // TODO - Ignore?
+                 }
+                 p = idleObjects.poll();
              }
-             p = idleObjects.poll();
+         } finally {
+             deregister(key);
          }
      }
 
@@ -1563,26 +1576,11 @@ public class GenericKeyedObjectPool<K,T>
             }
         }
          
-        // Make sure the key exists in the poolMap
-        ObjectDeque<T> objectDeque;
-        int newNumActive;
-        synchronized (poolMap) {
-            // This all has to be in the sync block to ensure that the key is
-            // not removed by destroy
-            objectDeque = poolMap.get(key);
-            if (objectDeque == null) {
-                objectDeque = new ObjectDeque<T>();
-                newNumActive = objectDeque.getNumActive().incrementAndGet();
-                poolMap.put(key, objectDeque);
-                poolKeyList.add(key);
-            } else {
-                newNumActive = objectDeque.getNumActive().incrementAndGet();
-            }
-        }
+        ObjectDeque<T> objectDeque = poolMap.get(key);
+        int newNumActive = objectDeque.getNumActive().incrementAndGet();
 
         // Check against the per key limit
         if (!force && maxActive > -1 && newNumActive > maxActive) {
-            cleanObjectDeque(key, objectDeque);
             numTotal.decrementAndGet();
             return null;
         }
@@ -1592,7 +1590,6 @@ public class GenericKeyedObjectPool<K,T>
         try {
             t = _factory.makeObject(key);
         } catch (Exception e) {
-            cleanObjectDeque(key, objectDeque);
             numTotal.decrementAndGet();
             throw e;
         }
@@ -1603,33 +1600,76 @@ public class GenericKeyedObjectPool<K,T>
     }
 
     private void destroy(K key, PooledObject<T> toDestory) throws Exception {
-         
-        ObjectDeque<T> objectDeque = poolMap.get(key);
-        objectDeque.getIdleObjects().remove(toDestory);
-        objectDeque.getAllObjects().remove(toDestory.getObject());
+        
+        register(key);
+        
+        try {
+            ObjectDeque<T> objectDeque = poolMap.get(key);
+            objectDeque.getIdleObjects().remove(toDestory);
+            objectDeque.getAllObjects().remove(toDestory.getObject());
+    
+            try {
+                _factory.destroyObject(key, toDestory.getObject());
+            } finally {
+                objectDeque.getNumActive().decrementAndGet();
+                numTotal.decrementAndGet();
+            }
+        } finally {
+            deregister(key);
+        }
+    }
 
+    private ObjectDeque<T> register(K k) {
+        Lock lock = keyLock.readLock();
+        ObjectDeque<T> objectDeque = null;
         try {
-            _factory.destroyObject(key, toDestory.getObject());
+            lock.lock();
+            objectDeque = poolMap.get(k);
+            if (objectDeque == null) {
+                // Upgrade to write lock
+                lock.unlock();
+                lock = keyLock.writeLock();
+                lock.lock();
+                objectDeque = poolMap.get(k);
+                if (objectDeque == null) {
+                    objectDeque = new ObjectDeque<T>();
+                    objectDeque.getNumInterested().incrementAndGet();
+                    poolMap.put(k, objectDeque);
+                    poolKeyList.add(k);
+                } else {
+                    objectDeque.getNumInterested().incrementAndGet();
+                }
+            } else {
+                objectDeque.getNumInterested().incrementAndGet();
+            }
         } finally {
-            cleanObjectDeque(key, objectDeque);
-            numTotal.decrementAndGet();
+            lock.unlock();
         }
+        return objectDeque;
     }
+    
+    private void deregister(K k) {
+        ObjectDeque<T> objectDeque;
 
-    private void cleanObjectDeque(K key, ObjectDeque<T> objectDeque) {
-        int newNumActive = objectDeque.getNumActive().decrementAndGet();
-        if (newNumActive == 0) {
-            synchronized (poolMap) {
-                newNumActive = objectDeque.getNumActive().get();
-                if (newNumActive == 0) {
-                    poolMap.remove(key);
-                    poolKeyList.remove(key);
+        // TODO Think carefully about when a read lock is required
+        objectDeque = poolMap.get(k);
+        long numInterested = objectDeque.getNumInterested().decrementAndGet();
+        if (numInterested == 0 && objectDeque.getNumActive().get() == 0) {
+            // Potential to remove key
+            Lock lock = keyLock.writeLock();
+            lock.lock();
+            try {
+                if (objectDeque.getNumActive().get() == 0 &&
+                        objectDeque.getNumInterested().get() == 0) {
+                    poolMap.remove(k);
+                    poolKeyList.remove(k);
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
 
-    
     /**
      * Iterates through all the known keys and creates any necessary objects to maintain
      * the minimum level of pooled objects.
@@ -1726,8 +1766,13 @@ public class GenericKeyedObjectPool<K,T>
         if (_factory == null) {
             throw new IllegalStateException("Cannot add objects without a factory.");
         }
-        PooledObject<T> p = create(key, false);
-        addIdleObject(key, p);
+        register(key);
+        try {
+            PooledObject<T> p = create(key, false);
+            addIdleObject(key, p);
+        } finally {
+            deregister(key);
+        }
     }
 
     /**
@@ -1843,6 +1888,8 @@ public class GenericKeyedObjectPool<K,T>
         private Map<S, PooledObject<S>> allObjects =
                 new ConcurrentHashMap<S, PooledObject<S>>();
 
+        private AtomicLong numInterested = new AtomicLong(0);
+        
         public LinkedBlockingDeque<PooledObject<S>> getIdleObjects() {
             return idleObjects;
         }
@@ -1851,6 +1898,10 @@ public class GenericKeyedObjectPool<K,T>
             return numActive;
         }
         
+        public AtomicLong getNumInterested() {
+            return numInterested;
+        }
+        
         public Map<S, PooledObject<S>> getAllObjects() {
             return allObjects;
         }
@@ -2104,6 +2155,9 @@ public class GenericKeyedObjectPool<K,T>
     /** List of pool keys - used to control eviction order */
     private List<K> poolKeyList = new ArrayList<K>();
 
+    /** Lock used to manage adding/removing of keys */
+    private ReadWriteLock keyLock = new ReentrantReadWriteLock(true);
+
     /**
      * The combined count of the currently active objects for all keys and those
      * in the process of being created. Under load, it may exceed



Mime
View raw message