activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5564
Date Fri, 06 Feb 2015 07:13:05 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.10.x ede604eaf -> f62f47b92


https://issues.apache.org/jira/browse/AMQ-5564

Fixed sessions in the pool losing their reference to the anonymous
producer created when useAnonymousProducers is true.  The anonymous
producer now stays live for the life of the pooled session.

Also added some synchronization safety to methods that could otherwise
throw a NullPointerException.
Conflicts:
	activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f62f47b9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f62f47b9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f62f47b9

Branch: refs/heads/activemq-5.10.x
Commit: f62f47b92d8c8d60fc9cd0f796b80eacc50c4b4a
Parents: ede604e
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Feb 5 17:50:43 2015 -0500
Committer: Claus Ibsen <claus.ibsen@gmail.com>
Committed: Fri Feb 6 08:05:17 2015 +0100

----------------------------------------------------------------------
 .../activemq/jms/pool/ConnectionPool.java       | 18 ++--
 .../activemq/jms/pool/PooledConnection.java     |  6 +-
 .../apache/activemq/jms/pool/PooledSession.java | 81 ++++++----------
 .../apache/activemq/jms/pool/SessionHolder.java | 98 ++++++++++++++++++++
 .../activemq/jms/pool/PooledSessionTest.java    | 53 +++++++++--
 5 files changed, 187 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index 26995ea..9b773d6 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -52,7 +52,7 @@ public class ConnectionPool {
     private boolean useAnonymousProducers = true;
 
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
+    private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
     private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
 
     public ConnectionPool(Connection connection) {
@@ -60,29 +60,29 @@ public class ConnectionPool {
         this.connection = wrap(connection);
 
         // Create our internal Pool of session instances.
-        this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>(
-            new KeyedPoolableObjectFactory<SessionKey, Session>() {
+        this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
+            new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() {
 
                 @Override
-                public void activateObject(SessionKey key, Session session) throws Exception
{
+                public void activateObject(SessionKey key, SessionHolder session) throws
Exception {
                 }
 
                 @Override
-                public void destroyObject(SessionKey key, Session session) throws Exception
{
+                public void destroyObject(SessionKey key, SessionHolder session) throws Exception
{
                     session.close();
                 }
 
                 @Override
-                public Session makeObject(SessionKey key) throws Exception {
-                    return makeSession(key);
+                public SessionHolder makeObject(SessionKey key) throws Exception {
+                    return new SessionHolder(makeSession(key));
                 }
 
                 @Override
-                public void passivateObject(SessionKey key, Session session) throws Exception
{
+                public void passivateObject(SessionKey key, SessionHolder session) throws
Exception {
                 }
 
                 @Override
-                public boolean validateObject(SessionKey key, Session session) {
+                public boolean validateObject(SessionKey key, SessionHolder session) {
                     return true;
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
index b268862..b7b56ba 100755
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
@@ -24,6 +24,7 @@ import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.QueueConnection;
@@ -35,7 +36,7 @@ import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
-import javax.jms.IllegalStateException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,8 +164,7 @@ public class PooledConnection implements TopicConnection, QueueConnection,
Poole
 
     @Override
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        PooledSession result;
-        result = (PooledSession) pool.createSession(transacted, ackMode);
+        PooledSession result = (PooledSession) pool.createSession(transacted, ackMode);
 
         // Store the session so we can close the sessions that this PooledConnection
         // created in order to ensure that consumers etc are closed per the JMS contract.

http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
index 3a2e698..cbfec29 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
@@ -55,25 +55,21 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
     private final SessionKey key;
-    private final KeyedObjectPool<SessionKey, Session> sessionPool;
+    private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool;
     private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
     private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
     private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners
= new CopyOnWriteArrayList<PooledSessionEventListener>();
     private final AtomicBoolean closed = new AtomicBoolean();
 
-    private MessageProducer producer;
-    private TopicPublisher publisher;
-    private QueueSender sender;
-
-    private Session session;
+    private SessionHolder sessionHolder;
     private boolean transactional = true;
     private boolean ignoreClose;
     private boolean isXa;
     private boolean useAnonymousProducers = true;
 
-    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey,
Session> sessionPool, boolean transactional, boolean anonymous) {
+    public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey,
SessionHolder> sessionPool, boolean transactional, boolean anonymous) {
         this.key = key;
-        this.session = session;
+        this.sessionHolder = sessionHolder;
         this.sessionPool = sessionPool;
         this.transactional = transactional;
         this.useAnonymousProducers = anonymous;
@@ -140,21 +136,21 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
             if (invalidate) {
                 // lets close the session and not put the session back into the pool
                 // instead invalidate it so the pool can create a new one on demand.
-                if (session != null) {
+                if (sessionHolder != null) {
                     try {
-                        session.close();
+                        sessionHolder.close();
                     } catch (JMSException e1) {
                         LOG.trace("Ignoring exception on close as discarding session: " +
e1, e1);
                     }
                 }
                 try {
-                    sessionPool.invalidateObject(key, session);
+                    sessionPool.invalidateObject(key, sessionHolder);
                 } catch (Exception e) {
                     LOG.trace("Ignoring exception on invalidateObject as discarding session:
" + e, e);
                 }
             } else {
                 try {
-                    sessionPool.returnObject(key, session);
+                    sessionPool.returnObject(key, sessionHolder);
                 } catch (Exception e) {
                     javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
                     illegalStateException.initCause(e);
@@ -162,7 +158,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
                 }
             }
 
-            session = null;
+            sessionHolder = null;
         }
     }
 
@@ -276,9 +272,12 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
 
     @Override
     public XAResource getXAResource() {
-        if (session instanceof XASession) {
-            return ((XASession) session).getXAResource();
+        SessionHolder session = safeGetSessionHolder();
+
+        if (session.getSession() instanceof XASession) {
+            return ((XASession) session.getSession()).getXAResource();
         }
+
         return null;
     }
 
@@ -289,8 +288,9 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
 
     @Override
     public void run() {
+        SessionHolder session = safeGetSessionHolder();
         if (session != null) {
-            session.run();
+            session.getSession().run();
         }
     }
 
@@ -379,10 +379,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
     }
 
     public Session getInternalSession() throws IllegalStateException {
-        if (session == null) {
-            throw new IllegalStateException("The session has already been closed");
-        }
-        return session;
+        return safeGetSessionHolder().getSession();
     }
 
     public MessageProducer getMessageProducer() throws JMSException {
@@ -393,16 +390,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
         MessageProducer result = null;
 
         if (useAnonymousProducers) {
-            if (producer == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (producer == null) {
-                        producer = getInternalSession().createProducer(null);
-                    }
-                }
-            }
-
-            result = producer;
+            result = safeGetSessionHolder().getOrCreateProducer();
         } else {
             result = getInternalSession().createProducer(destination);
         }
@@ -418,16 +406,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
         QueueSender result = null;
 
         if (useAnonymousProducers) {
-            if (sender == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (sender == null) {
-                        sender = ((QueueSession) getInternalSession()).createSender(null);
-                    }
-                }
-            }
-
-            result = sender;
+            result = safeGetSessionHolder().getOrCreateSender();
         } else {
             result = ((QueueSession) getInternalSession()).createSender(destination);
         }
@@ -443,16 +422,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
         TopicPublisher result = null;
 
         if (useAnonymousProducers) {
-            if (publisher == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (publisher == null) {
-                        publisher = ((TopicSession) getInternalSession()).createPublisher(null);
-                    }
-                }
-            }
-
-            result = publisher;
+            result = safeGetSessionHolder().getOrCreatePublisher();
         } else {
             result = ((TopicSession) getInternalSession()).createPublisher(destination);
         }
@@ -489,7 +459,7 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
 
     @Override
     public String toString() {
-        return "PooledSession { " + session + " }";
+        return "PooledSession { " + safeGetSessionHolder() + " }";
     }
 
     /**
@@ -505,4 +475,13 @@ public class PooledSession implements Session, TopicSession, QueueSession,
XASes
     protected void onConsumerClose(MessageConsumer consumer) {
         consumers.remove(consumer);
     }
+
+    private SessionHolder safeGetSessionHolder() {
+        SessionHolder sessionHolder = this.sessionHolder;
+        if (sessionHolder == null) {
+            throw new IllegalStateException("The session has already been closed");
+        }
+
+        return sessionHolder;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
new file mode 100644
index 0000000..afa75d6
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+/**
+ * Used to store a pooled session instance and any resources that can
+ * be left open and carried along with the pooled instance such as the
+ * anonymous producer used for all MessageProducer instances created
+ * from this pooled session when enabled.
+ */
+public class SessionHolder {
+
+    private final Session session;
+    private MessageProducer producer;
+    private TopicPublisher publisher;
+    private QueueSender sender;
+
+    public SessionHolder(Session session) {
+        this.session = session;
+    }
+
+    public void close() throws JMSException {
+        try {
+            session.close();
+        } finally {
+            producer = null;
+            publisher = null;
+            sender = null;
+        }
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    public MessageProducer getOrCreateProducer() throws JMSException {
+        if (producer == null) {
+            synchronized (this) {
+                if (producer == null) {
+                    producer = session.createProducer(null);
+                }
+            }
+        }
+
+        return producer;
+    }
+
+    public TopicPublisher getOrCreatePublisher() throws JMSException {
+        if (publisher == null) {
+            synchronized (this) {
+                if (publisher == null) {
+                    publisher = ((TopicSession) session).createPublisher(null);
+                }
+            }
+        }
+
+        return publisher;
+    }
+
+    public QueueSender getOrCreateSender() throws JMSException {
+        if (sender == null) {
+            synchronized (this) {
+                if (sender == null) {
+                    sender = ((QueueSession) session).createSender(null);
+                }
+            }
+        }
+
+        return sender;
+    }
+
+    @Override
+    public String toString() {
+        return session.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
index 7483e6b..9432add 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
@@ -17,9 +17,13 @@
 package org.apache.activemq.jms.pool;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueSession;
 import javax.jms.Session;
@@ -44,7 +48,8 @@ public class PooledSessionTest {
     public void setUp() throws Exception {
         broker = new BrokerService();
         broker.setPersistent(false);
-        broker.setUseJmx(false);
+        broker.setUseJmx(true);
+        broker.getManagementContext().setCreateMBeanServer(false);
         TransportConnector connector = broker.addConnector("tcp://localhost:0");
         broker.start();
         connectionUri = connector.getPublishableConnectString();
@@ -62,7 +67,7 @@ public class PooledSessionTest {
         broker = null;
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testPooledSessionStats() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
 
@@ -73,9 +78,11 @@ public class PooledSessionTest {
         assertEquals(0, connection.getNumActiveSessions());
         assertEquals(1, connection.getNumtIdleSessions());
         assertEquals(1, connection.getNumSessions());
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testMessageProducersAreAllTheSame() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -87,9 +94,11 @@ public class PooledSessionTest {
         PooledProducer producer2 = (PooledProducer) session.createProducer(queue2);
 
         assertSame(producer1.getMessageProducer(), producer2.getMessageProducer());
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testThrowsWhenDifferentDestinationGiven() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -110,9 +119,11 @@ public class PooledSessionTest {
             fail("Should only be able to send to queue 1");
         } catch (Exception ex) {
         }
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testCreateTopicPublisher() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -124,9 +135,10 @@ public class PooledSessionTest {
         PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2);
 
         assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testQueueSender() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -138,5 +150,34 @@ public class PooledSessionTest {
         PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2);
 
         assertSame(sender1.getMessageProducer(), sender2.getMessageProducer());
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testRepeatedCreateSessionProducerResultsInSame() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+
+        assertTrue(pooledFactory.isUseAnonymousProducers());
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic("test-topic");
+        PooledProducer producer = (PooledProducer) session.createProducer(destination);
+        MessageProducer original = producer.getMessageProducer();
+        assertNotNull(original);
+        session.close();
+
+        assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
+
+        for (int i = 0; i < 20; ++i) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = (PooledProducer) session.createProducer(destination);
+            assertSame(original, producer.getMessageProducer());
+            session.close();
+        }
+
+        assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
+
+        connection.close();
+        pooledFactory.clear();
     }
 }


Mime
View raw message