activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5564
Date Thu, 05 Feb 2015 22:50:52 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 9f0ab46e2 -> f91abd3d4


https://issues.apache.org/jira/browse/AMQ-5564

Fixed sessions in the pool losing their reference to the anonymous
producer created when useAnonymousProducers is true.  The anonymous
producer stays live for the life of the pooled session.

Also added some synchronization safety to some methods that could
otherwise hit a NullPointerException (NPE).

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f91abd3d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f91abd3d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f91abd3d

Branch: refs/heads/trunk
Commit: f91abd3d46ac1f7ded399ce3bbc36fdfb70a91cb
Parents: 9f0ab46
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Feb 5 17:50:43 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Feb 5 17:50:43 2015 -0500

----------------------------------------------------------------------
 .../activemq/jms/pool/ConnectionPool.java       | 25 +++--
 .../activemq/jms/pool/PooledConnection.java     |  6 +-
 .../apache/activemq/jms/pool/PooledSession.java | 81 ++++++----------
 .../apache/activemq/jms/pool/SessionHolder.java | 98 ++++++++++++++++++++
 .../activemq/jms/pool/PooledSessionTest.java    | 53 +++++++++--
 5 files changed, 193 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index c802a17..6c3fdc3 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -21,8 +21,13 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
@@ -51,7 +56,7 @@ public class ConnectionPool implements ExceptionListener {
     private boolean useAnonymousProducers = true;
 
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
+    private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
     private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
     private boolean reconnectOnException;
     private ExceptionListener parentExceptionListener;
@@ -61,29 +66,29 @@ public class ConnectionPool implements ExceptionListener {
         this.connection = wrap(connection);
 
         // Create our internal Pool of session instances.
-        this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>(
-            new KeyedPoolableObjectFactory<SessionKey, Session>() {
+        this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
+            new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() {
 
                 @Override
-                public void activateObject(SessionKey key, Session session) throws Exception {
+                public void activateObject(SessionKey key, SessionHolder session) throws Exception {
                 }
 
                 @Override
-                public void destroyObject(SessionKey key, Session session) throws Exception {
+                public void destroyObject(SessionKey key, SessionHolder session) throws Exception {
                     session.close();
                 }
 
                 @Override
-                public Session makeObject(SessionKey key) throws Exception {
-                    return makeSession(key);
+                public SessionHolder makeObject(SessionKey key) throws Exception {
+                    return new SessionHolder(makeSession(key));
                 }
 
                 @Override
-                public void passivateObject(SessionKey key, Session session) throws Exception {
+                public void passivateObject(SessionKey key, SessionHolder session) throws Exception {
                 }
 
                 @Override
-                public boolean validateObject(SessionKey key, Session session) {
+                public boolean validateObject(SessionKey key, SessionHolder session) {
                     return true;
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
index b268862..b7b56ba 100755
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
@@ -24,6 +24,7 @@ import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.QueueConnection;
@@ -35,7 +36,7 @@ import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
-import javax.jms.IllegalStateException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,8 +164,7 @@ public class PooledConnection implements TopicConnection, QueueConnection,
Poole
 
     @Override
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        PooledSession result;
-        result = (PooledSession) pool.createSession(transacted, ackMode);
+        PooledSession result = (PooledSession) pool.createSession(transacted, ackMode);
 
         // Store the session so we can close the sessions that this PooledConnection
         // created in order to ensure that consumers etc are closed per the JMS contract.

http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
index 3a2e698..cbfec29 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
@@ -55,25 +55,21 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
     private final SessionKey key;
-    private final KeyedObjectPool<SessionKey, Session> sessionPool;
+    private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool;
     private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
     private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
     private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
     private final AtomicBoolean closed = new AtomicBoolean();
 
-    private MessageProducer producer;
-    private TopicPublisher publisher;
-    private QueueSender sender;
-
-    private Session session;
+    private SessionHolder sessionHolder;
     private boolean transactional = true;
     private boolean ignoreClose;
     private boolean isXa;
     private boolean useAnonymousProducers = true;
 
-    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, Session> sessionPool, boolean transactional, boolean anonymous) {
+    public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey, SessionHolder> sessionPool, boolean transactional, boolean anonymous) {
         this.key = key;
-        this.session = session;
+        this.sessionHolder = sessionHolder;
         this.sessionPool = sessionPool;
         this.transactional = transactional;
         this.useAnonymousProducers = anonymous;
@@ -140,21 +136,21 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
             if (invalidate) {
                 // lets close the session and not put the session back into the pool
                 // instead invalidate it so the pool can create a new one on demand.
-                if (session != null) {
+                if (sessionHolder != null) {
                     try {
-                        session.close();
+                        sessionHolder.close();
                     } catch (JMSException e1) {
                         LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
                     }
                 }
                 try {
-                    sessionPool.invalidateObject(key, session);
+                    sessionPool.invalidateObject(key, sessionHolder);
                 } catch (Exception e) {
                     LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
                 }
             } else {
                 try {
-                    sessionPool.returnObject(key, session);
+                    sessionPool.returnObject(key, sessionHolder);
                 } catch (Exception e) {
                     javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
                     illegalStateException.initCause(e);
@@ -162,7 +158,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
                 }
             }
 
-            session = null;
+            sessionHolder = null;
         }
     }
 
@@ -276,9 +272,12 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
 
     @Override
     public XAResource getXAResource() {
-        if (session instanceof XASession) {
-            return ((XASession) session).getXAResource();
+        SessionHolder session = safeGetSessionHolder();
+
+        if (session.getSession() instanceof XASession) {
+            return ((XASession) session.getSession()).getXAResource();
         }
+
         return null;
     }
 
@@ -289,8 +288,9 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
 
     @Override
     public void run() {
+        SessionHolder session = safeGetSessionHolder();
         if (session != null) {
-            session.run();
+            session.getSession().run();
         }
     }
 
@@ -379,10 +379,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
     }
 
     public Session getInternalSession() throws IllegalStateException {
-        if (session == null) {
-            throw new IllegalStateException("The session has already been closed");
-        }
-        return session;
+        return safeGetSessionHolder().getSession();
     }
 
     public MessageProducer getMessageProducer() throws JMSException {
@@ -393,16 +390,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
         MessageProducer result = null;
 
         if (useAnonymousProducers) {
-            if (producer == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (producer == null) {
-                        producer = getInternalSession().createProducer(null);
-                    }
-                }
-            }
-
-            result = producer;
+            result = safeGetSessionHolder().getOrCreateProducer();
         } else {
             result = getInternalSession().createProducer(destination);
         }
@@ -418,16 +406,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
         QueueSender result = null;
 
         if (useAnonymousProducers) {
-            if (sender == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (sender == null) {
-                        sender = ((QueueSession) getInternalSession()).createSender(null);
-                    }
-                }
-            }
-
-            result = sender;
+            result = safeGetSessionHolder().getOrCreateSender();
         } else {
             result = ((QueueSession) getInternalSession()).createSender(destination);
         }
@@ -443,16 +422,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
         TopicPublisher result = null;
 
         if (useAnonymousProducers) {
-            if (publisher == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (publisher == null) {
-                        publisher = ((TopicSession) getInternalSession()).createPublisher(null);
-                    }
-                }
-            }
-
-            result = publisher;
+            result = safeGetSessionHolder().getOrCreatePublisher();
         } else {
             result = ((TopicSession) getInternalSession()).createPublisher(destination);
         }
@@ -489,7 +459,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
 
     @Override
     public String toString() {
-        return "PooledSession { " + session + " }";
+        return "PooledSession { " + safeGetSessionHolder() + " }";
     }
 
     /**
@@ -505,4 +475,13 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
     protected void onConsumerClose(MessageConsumer consumer) {
         consumers.remove(consumer);
     }
+
+    private SessionHolder safeGetSessionHolder() {
+        SessionHolder sessionHolder = this.sessionHolder;
+        if (sessionHolder == null) {
+            throw new IllegalStateException("The session has already been closed");
+        }
+
+        return sessionHolder;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
new file mode 100644
index 0000000..afa75d6
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+/**
+ * Used to store a pooled session instance and any resources that can
+ * be left open and carried along with the pooled instance such as the
+ * anonymous producer used for all MessageProducer instances created
+ * from this pooled session when enabled.
+ */
+public class SessionHolder {
+
+    private final Session session;
+    private MessageProducer producer;
+    private TopicPublisher publisher;
+    private QueueSender sender;
+
+    public SessionHolder(Session session) {
+        this.session = session;
+    }
+
+    public void close() throws JMSException {
+        try {
+            session.close();
+        } finally {
+            producer = null;
+            publisher = null;
+            sender = null;
+        }
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    public MessageProducer getOrCreateProducer() throws JMSException {
+        if (producer == null) {
+            synchronized (this) {
+                if (producer == null) {
+                    producer = session.createProducer(null);
+                }
+            }
+        }
+
+        return producer;
+    }
+
+    public TopicPublisher getOrCreatePublisher() throws JMSException {
+        if (publisher == null) {
+            synchronized (this) {
+                if (publisher == null) {
+                    publisher = ((TopicSession) session).createPublisher(null);
+                }
+            }
+        }
+
+        return publisher;
+    }
+
+    public QueueSender getOrCreateSender() throws JMSException {
+        if (sender == null) {
+            synchronized (this) {
+                if (sender == null) {
+                    sender = ((QueueSession) session).createSender(null);
+                }
+            }
+        }
+
+        return sender;
+    }
+
+    @Override
+    public String toString() {
+        return session.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
index 7483e6b..9432add 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
@@ -17,9 +17,13 @@
 package org.apache.activemq.jms.pool;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueSession;
 import javax.jms.Session;
@@ -44,7 +48,8 @@ public class PooledSessionTest {
     public void setUp() throws Exception {
         broker = new BrokerService();
         broker.setPersistent(false);
-        broker.setUseJmx(false);
+        broker.setUseJmx(true);
+        broker.getManagementContext().setCreateMBeanServer(false);
         TransportConnector connector = broker.addConnector("tcp://localhost:0");
         broker.start();
         connectionUri = connector.getPublishableConnectString();
@@ -62,7 +67,7 @@ public class PooledSessionTest {
         broker = null;
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testPooledSessionStats() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
 
@@ -73,9 +78,11 @@ public class PooledSessionTest {
         assertEquals(0, connection.getNumActiveSessions());
         assertEquals(1, connection.getNumtIdleSessions());
         assertEquals(1, connection.getNumSessions());
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testMessageProducersAreAllTheSame() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -87,9 +94,11 @@ public class PooledSessionTest {
         PooledProducer producer2 = (PooledProducer) session.createProducer(queue2);
 
         assertSame(producer1.getMessageProducer(), producer2.getMessageProducer());
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testThrowsWhenDifferentDestinationGiven() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -110,9 +119,11 @@ public class PooledSessionTest {
             fail("Should only be able to send to queue 1");
         } catch (Exception ex) {
         }
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testCreateTopicPublisher() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -124,9 +135,10 @@ public class PooledSessionTest {
         PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2);
 
         assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testQueueSender() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -138,5 +150,34 @@ public class PooledSessionTest {
         PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2);
 
         assertSame(sender1.getMessageProducer(), sender2.getMessageProducer());
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testRepeatedCreateSessionProducerResultsInSame() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+
+        assertTrue(pooledFactory.isUseAnonymousProducers());
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic("test-topic");
+        PooledProducer producer = (PooledProducer) session.createProducer(destination);
+        MessageProducer original = producer.getMessageProducer();
+        assertNotNull(original);
+        session.close();
+
+        assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
+
+        for (int i = 0; i < 20; ++i) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = (PooledProducer) session.createProducer(destination);
+            assertSame(original, producer.getMessageProducer());
+            session.close();
+        }
+
+        assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
+
+        connection.close();
+        pooledFactory.clear();
     }
 }


Mime
View raw message