activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1465723 - in /activemq/trunk/activemq-pool/src: main/java/org/apache/activemq/pool/ test/java/org/apache/activemq/pool/bugs/
Date Mon, 08 Apr 2013 18:58:11 GMT
Author: tabish
Date: Mon Apr  8 18:58:11 2013
New Revision: 1465723

URL: http://svn.apache.org/r1465723
Log:
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4441

Added:
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java
  (with props)
Modified:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=1465723&r1=1465722&r2=1465723&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java Mon Apr  8 18:58:11 2013
@@ -32,6 +32,8 @@ import org.apache.activemq.util.JMSExcep
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Holds a real JMS connection along with the session pools associated with it.
@@ -43,6 +45,8 @@ import org.apache.commons.pool.impl.Gene
  */
 public class ConnectionPool {
 
+    private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
+
     private ActiveMQConnection connection;
     private int referenceCount;
     private long lastUsed = System.currentTimeMillis();
@@ -207,6 +211,9 @@ public class ConnectionPool {
      * @return true if this connection has expired.
      */
     public synchronized boolean expiredCheck() {
+
+        boolean expired = false;
+
         if (connection == null) {
             return true;
         }
@@ -214,25 +221,27 @@ public class ConnectionPool {
         if (hasExpired || hasFailed) {
             if (referenceCount == 0) {
                 close();
+                expired = true;
             }
-            return true;
         }
 
         if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
             hasExpired = true;
             if (referenceCount == 0) {
                 close();
+                expired = true;
             }
-            return true;
         }
 
+        // Only set hasExpired here is no references, as a Connection with references is by
+        // definition not idle at this time.
         if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
             hasExpired = true;
             close();
-            return true;
+            expired = true;
         }
 
-        return false;
+        return expired;
     }
 
     public int getIdleTimeout() {

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1465723&r1=1465722&r2=1465723&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java Mon Apr  8 18:58:11 2013
@@ -74,7 +74,6 @@ public class PooledConnection implements
      */
     public PooledConnection(ConnectionPool pool) {
         this.pool = pool;
-        this.pool.incrementReferenceCount();
     }
 
     /**

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?rev=1465723&r1=1465722&r2=1465723&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java Mon Apr  8 18:58:11 2013
@@ -220,7 +220,25 @@ public class PooledConnectionFactory imp
         }
 
         try {
-            connection = connectionsPool.borrowObject(key);
+
+            // We can race against other threads returning the connection when there is an
+            // expiration or idle timeout.  We keep pulling out ConnectionPool instances until
+            // we win and get a non-closed instance and then increment the reference count
+            // under lock to prevent another thread from triggering an expiration check and
+            // pulling the rug out from under us.
+            while (connection == null) {
+                connection = connectionsPool.borrowObject(key);
+                synchronized (connection) {
+                    if (connection.getConnection() != null) {
+                        connection.incrementReferenceCount();
+                        break;
+                    }
+
+                    // Return the bad one to the pool and let if get destroyed as normal.
+                    connectionsPool.returnObject(key, connection);
+                    connection = null;
+                }
+            }
         } catch (Exception e) {
             throw JMSExceptionSupport.create("Error while attempting to retrieve a connection from the pool", e);
         }

Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java?rev=1465723&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java (added)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java Mon Apr  8 18:58:11 2013
@@ -0,0 +1,84 @@
+package org.apache.activemq.pool.bugs;
+
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.pool.PooledConnection;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4441Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4441Test.class);
+    private BrokerService broker;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test(timeout=120000)
+    public void demo() throws JMSException, InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final PooledConnectionFactory pooled = new PooledConnectionFactory("vm://localhost?create=false");
+
+        pooled.setMaxConnections(2);
+        pooled.setExpiryTimeout(10L);
+        pooled.start();
+        Thread[] threads = new Thread[10];
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    while (!done.get() && latch.getCount() > 0) {
+                        try {
+                            final PooledConnection pooledConnection = (PooledConnection) pooled.createConnection();
+                            if (pooledConnection.getConnection() == null) {
+                                LOG.info("Found broken connection.");
+                                latch.countDown();
+                            }
+                            pooledConnection.close();
+                        } catch (JMSException e) {
+                            LOG.warn("Caught Exception", e);
+                        }
+                    }
+                }
+            });
+        }
+        for (Thread thread : threads) {
+            thread.start();
+        }
+
+        if (latch.await(1, TimeUnit.MINUTES)) {
+            fail("A thread obtained broken connection");
+        }
+
+        done.set(true);
+        for (Thread thread : threads) {
+            thread.join();
+        }
+    }
+
+}

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/bugs/AMQ4441Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message