activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1173648 - in /activemq/trunk/activemq-pool/src: main/java/org/apache/activemq/pool/ConnectionPool.java test/java/org/apache/activemq/pool/PooledConnectionFactoryMaximumActiveTest.java
Date Wed, 21 Sep 2011 13:56:12 GMT
Author: tabish
Date: Wed Sep 21 13:56:11 2011
New Revision: 1173648

URL: http://svn.apache.org/viewvc?rev=1173648&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3506

Added:
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryMaximumActiveTest.java
  (with props)
Modified:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=1173648&r1=1173647&r2=1173648&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
(original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
Wed Sep 21 13:56:11 2011
@@ -18,9 +18,8 @@
 package org.apache.activemq.pool;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
@@ -32,13 +31,13 @@ import org.apache.commons.pool.ObjectPoo
 
 /**
  * Holds a real JMS connection along with the session pools associated with it.
- * 
- * 
+ *
+ *
  */
 public class ConnectionPool {
 
     private ActiveMQConnection connection;
-    private Map<SessionKey, SessionPool> cache;
+    private ConcurrentHashMap<SessionKey, SessionPool> cache;
     private AtomicBoolean started = new AtomicBoolean(false);
     private int referenceCount;
     private ObjectPoolFactory poolFactory;
@@ -50,7 +49,7 @@ public class ConnectionPool {
     private long expiryTimeout = 0l;
 
     public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
-        this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory);
+        this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
         // Add a transport Listener so that we can notice if this connection
         // should be expired due to
         // a connection failure.
@@ -69,7 +68,7 @@ public class ConnectionPool {
 
             public void transportResumed() {
             }
-        });       
+        });
         //
         // make sure that we set the hasFailed flag, in case the transport already failed
         // prior to the addition of our new TransportListener
@@ -79,7 +78,7 @@ public class ConnectionPool {
         }
     }
 
-    public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool>
cache, ObjectPoolFactory poolFactory) {
+    public ConnectionPool(ActiveMQConnection connection, ConcurrentHashMap<SessionKey,
SessionPool> cache, ObjectPoolFactory poolFactory) {
         this.connection = connection;
         this.cache = cache;
         this.poolFactory = poolFactory;
@@ -87,12 +86,12 @@ public class ConnectionPool {
 
     public void start() throws JMSException {
         if (started.compareAndSet(false, true)) {
-        	try {
-        		connection.start();
-        	} catch (JMSException e) {
-        		started.set(false);
-        		throw(e);
-        	}
+            try {
+                connection.start();
+            } catch (JMSException e) {
+                started.set(false);
+                throw(e);
+            }
         }
     }
 
@@ -102,10 +101,21 @@ public class ConnectionPool {
 
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
         SessionKey key = new SessionKey(transacted, ackMode);
-        SessionPool pool = cache.get(key);
+        SessionPool pool = null;
+        pool = cache.get(key);
         if (pool == null) {
-            pool = createSessionPool(key);
-            cache.put(key, pool);
+            SessionPool newPool = createSessionPool(key);
+            SessionPool prevPool = cache.putIfAbsent(key, newPool);
+            if (prevPool != null && prevPool != newPool) {
+                // newPool was not the first one to be associated with this
+                // key... close created session pool
+                try {
+                    newPool.close();
+                } catch (Exception e) {
+                    throw new JMSException(e.getMessage());
+                }
+            }
+            pool = cache.get(key); // this will return a non-null value...
         }
         PooledSession session = pool.borrowSession();
         return session;
@@ -144,8 +154,8 @@ public class ConnectionPool {
         lastUsed = System.currentTimeMillis();
         if (referenceCount == 0) {
             expiredCheck();
-            
-            // only clean up temp destinations when all users 
+
+            // only clean up temp destinations when all users
             // of this connection have called close
             if (getConnection() != null) {
                 getConnection().cleanUpTempDestinations();
@@ -166,7 +176,7 @@ public class ConnectionPool {
             }
             return true;
         }
-        if (hasFailed 
+        if (hasFailed
                 || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed
+ idleTimeout)
                 || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed
+ expiryTimeout) {
             hasExpired = true;
@@ -193,7 +203,7 @@ public class ConnectionPool {
     public void setExpiryTimeout(long expiryTimeout) {
         this.expiryTimeout  = expiryTimeout;
     }
-    
+
     public long getExpiryTimeout() {
         return expiryTimeout;
     }

Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryMaximumActiveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryMaximumActiveTest.java?rev=1173648&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryMaximumActiveTest.java
(added)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryMaximumActiveTest.java
Wed Sep 21 13:56:11 2011
@@ -0,0 +1,151 @@
+package org.apache.activemq.pool;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * Checks the behavior of the PooledConnectionFactory when the maximum number
+ * of sessions (maximumActive) has been reached.
+ * When setBlockIfSessionPoolIsFull(true) is set on the ConnectionFactory,
+ * further requests for sessions should block.
+ * If such a request does not block, it's a bug.
+ *
+ * @author: tmielke
+ */
+public class PooledConnectionFactoryMaximumActiveTest extends TestCase
+{
+    public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryMaximumActiveTest.class);
+
+    /** Connection shared with the TestRunner2 tasks; assigned in testApp(). */
+    public static Connection conn = null;
+
+    /** Time in milliseconds the test waits before inspecting the runner tasks. */
+    public static int sleepTimeout = 5000;
+
+    /** Sessions reported by the runner tasks via addSession(), keyed by their hash code. */
+    private static ConcurrentHashMap<Integer, Session> sessions = new ConcurrentHashMap<Integer,Session>();
+
+
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public PooledConnectionFactoryMaximumActiveTest( String testName )
+    {
+        super( testName );
+    }
+
+    /**
+     * Records a session that a runner task managed to create.
+     *
+     * @param s the session to record
+     */
+    public static void addSession(Session s) {
+        sessions.put(s.hashCode(), s);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite()
+    {
+        return new TestSuite( PooledConnectionFactoryMaximumActiveTest.class );
+    }
+
+    /**
+     * Tests the behavior of the sessionPool of the PooledConnectionFactory
+     * when the maximum number of sessions is reached. This test uses
+     * maximumActive=1.
+     * When creating two threads that both
+     * try to create a JMS session from the same JMS connection,
+     * the thread that is second to call createSession()
+     * should block (as only 1 session is allowed) until the
+     * session is returned to the pool.
+     * If it does not block, it's a bug.
+     *
+     * @throws Exception if the connection cannot be set up or the test
+     *                   thread is interrupted while waiting
+     */
+    public void testApp() throws Exception
+    {
+        // The registry is static, so forget anything recorded by an earlier
+        // run in the same JVM before counting sessions again.
+        sessions.clear();
+
+        // Initialize JMS connection
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory(amq);
+        cf.setMaxConnections(3);
+        cf.setMaximumActive(1);
+        cf.setBlockIfSessionPoolIsFull(true);
+        conn = cf.createConnection();
+
+        // start test runner threads. It is expected that the second thread
+        // blocks on the call to createSession()
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            executor.submit(new TestRunner2());
+            // submit(Callable<Boolean>) already yields a Future<Boolean>, no cast needed
+            Future<Boolean> result2 = executor.submit(new TestRunner2());
+
+            // sleep to allow threads to run
+            Thread.sleep(sleepTimeout);
+
+            // second task should not have finished, instead wait on getting a
+            // JMS Session
+            Assert.assertFalse("second task should still be running", result2.isDone());
+
+            // Only 1 session should have been created
+            Assert.assertEquals("number of sessions created", 1, sessions.size());
+        } finally {
+            // Take all threads down, also when one of the assertions above
+            // failed, so that no worker thread outlives the test.
+            executor.shutdownNow();
+            // Close the connection obtained above so it is not left open
+            // after the test.
+            conn.close();
+        }
+    }
+}
+
+class TestRunner2 implements Callable<Boolean> {
+
+    public final static Logger LOG = Logger.getLogger(TestRunner2.class);
+
+    /**
+     * @return true if test succeeded, false otherwise
+     */
+    public Boolean call() {
+
+        Session one = null;
+
+        // wait at most 5 seconds for the call to createSession
+        try {
+
+            if (PooledConnectionFactoryMaximumActiveTest.conn == null) {
+                LOG.error("Connection not yet initialized. Aborting test.");
+                return new Boolean(false);
+            }
+
+            one = PooledConnectionFactoryMaximumActiveTest.conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            LOG.info("Created new Session with id" + one);
+            PooledConnectionFactoryMaximumActiveTest.addSession(one);
+            Thread.sleep(2 * PooledConnectionFactoryMaximumActiveTest.sleepTimeout);
+
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage());
+            return new Boolean(false);
+
+        } finally {
+            if (one != null)
+                try {
+                    one.close();
+                } catch (JMSException e) {
+                    LOG.error(e.getMessage());
+                }
+        }
+
+        // all good, test succeeded
+        return new Boolean(true);
+    }
+}

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryMaximumActiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message