Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 70AFA11D70 for ; Thu, 12 Jun 2014 23:13:12 +0000 (UTC) Received: (qmail 63140 invoked by uid 500); 12 Jun 2014 23:13:12 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 63100 invoked by uid 500); 12 Jun 2014 23:13:12 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 63093 invoked by uid 99); 12 Jun 2014 23:13:12 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Jun 2014 23:13:12 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0418A8C3DD1; Thu, 12 Jun 2014 23:13:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <1e580f481d324fc89de283dd4f673d35@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: https://issues.apache.org/jira/browse/AMQ-5015 Date: Thu, 12 Jun 2014 23:13:11 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/trunk 91a004191 -> f395c7060 https://issues.apache.org/jira/browse/AMQ-5015 Refactor the way sessions are pooled. We don't need to keep the PooledSession instances around since the state is unique to the session it wraps we only need to keep the Session instances in the SessionPool and create a new PooledSession on borrow to manage that session. This allows the PooledSession to have a real closed state that protects against multiple close calls placing duplicate PooledSession instances into the SessionPool. This also simplifies the code in the XaConnectionPool since it doesn't need to try and reset state in PouledSessions before placing them back as it gets a fresh wrapper each time with the correct state. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f395c706 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f395c706 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f395c706 Branch: refs/heads/trunk Commit: f395c706089bc0e38b2af133fb7bcfbc8d3010f0 Parents: 91a0041 Author: Timothy Bish Authored: Thu Jun 12 19:12:56 2014 -0400 Committer: Timothy Bish Committed: Thu Jun 12 19:12:56 2014 -0400 ---------------------------------------------------------------------- .../activemq/jms/pool/ConnectionPool.java | 48 +++++---- .../apache/activemq/jms/pool/PooledSession.java | 19 ++-- .../activemq/jms/pool/XaConnectionPool.java | 18 ---- .../jms/pool/XaPooledConnectionFactory.java | 18 ++-- .../jms/pool/PooledConnectionTempQueueTest.java | 102 +++++++++++++++++++ 5 files changed, 153 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f395c706/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index eced588..26995ea 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -25,12 +25,12 @@ import javax.jms.Connection; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Holds a real JMS connection along with the session pools associated with it. @@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory; */ public class ConnectionPool { - private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class); - protected Connection connection; private int referenceCount; private long lastUsed = System.currentTimeMillis(); @@ -54,7 +52,7 @@ public class ConnectionPool { private boolean useAnonymousProducers = true; private final AtomicBoolean started = new AtomicBoolean(false); - private final GenericKeyedObjectPool sessionPool; + private final GenericKeyedObjectPool sessionPool; private final List loanedSessions = new CopyOnWriteArrayList(); public ConnectionPool(Connection connection) { @@ -62,33 +60,29 @@ public class ConnectionPool { this.connection = wrap(connection); // Create our internal Pool of session instances. - this.sessionPool = new GenericKeyedObjectPool( - new KeyedPoolableObjectFactory() { + this.sessionPool = new GenericKeyedObjectPool( + new KeyedPoolableObjectFactory() { @Override - public void activateObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.add(session); + public void activateObject(SessionKey key, Session session) throws Exception { } @Override - public void destroyObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.remove(session); - session.getInternalSession().close(); + public void destroyObject(SessionKey key, Session session) throws Exception { + session.close(); } @Override - public PooledSession makeObject(SessionKey key) throws Exception { - Session session = makeSession(key); - return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers); + public Session makeObject(SessionKey key) throws Exception { + return makeSession(key); } @Override - public void passivateObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.remove(session); + public void passivateObject(SessionKey key, Session session) throws Exception { } @Override - public boolean validateObject(SessionKey key, PooledSession session) { + public boolean validateObject(SessionKey key, Session session) { return true; } } @@ -130,7 +124,23 @@ public class ConnectionPool { SessionKey key = new SessionKey(transacted, ackMode); PooledSession session; try { - session = sessionPool.borrowObject(key); + session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers); + session.addSessionEventListener(new PooledSessionEventListener() { + + @Override + public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { + } + + @Override + public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { + } + + @Override + public void onSessionClosed(PooledSession session) { + ConnectionPool.this.loanedSessions.remove(session); + } + }); + this.loanedSessions.add(session); } catch (Exception e) { IllegalStateException illegalStateException = new IllegalStateException(e.toString()); illegalStateException.initCause(e); http://git-wip-us.apache.org/repos/asf/activemq/blob/f395c706/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java index 1d3fc2f..3a2e698 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java @@ -19,6 +19,7 @@ package org.apache.activemq.jms.pool; import java.io.Serializable; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Destination; @@ -54,10 +55,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); private final SessionKey key; - private final KeyedObjectPool sessionPool; + private final KeyedObjectPool sessionPool; private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList sessionEventListeners = new CopyOnWriteArrayList(); + private final AtomicBoolean closed = new AtomicBoolean(); private MessageProducer producer; private TopicPublisher publisher; @@ -69,7 +71,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private boolean isXa; private boolean useAnonymousProducers = true; - public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional, boolean anonymous) { + public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional, boolean anonymous) { this.key = key; this.session = session; this.sessionPool = sessionPool; @@ -94,7 +96,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public void close() throws JMSException { - if (!ignoreClose) { + if (ignoreClose) { + return; + } + + if (closed.compareAndSet(false, true)) { boolean invalidate = false; try { // lets reset the session @@ -140,22 +146,23 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes } catch (JMSException e1) { LOG.trace("Ignoring exception on close as discarding session: " + e1, e1); } - session = null; } try { - sessionPool.invalidateObject(key, this); + sessionPool.invalidateObject(key, session); } catch (Exception e) { LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e); } } else { try { - sessionPool.returnObject(key, this); + sessionPool.returnObject(key, session); } catch (Exception e) { javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString()); illegalStateException.initCause(e); throw illegalStateException; } } + + session = null; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f395c706/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java index 4f87153..0f86b55 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java @@ -19,8 +19,6 @@ package org.apache.activemq.jms.pool; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; import javax.jms.XAConnection; import javax.transaction.RollbackException; import javax.transaction.Status; @@ -65,22 +63,6 @@ public class XaConnectionPool extends ConnectionPool { } PooledSession session = (PooledSession) super.createSession(transacted, ackMode); if (isXa) { - session.addSessionEventListener(new PooledSessionEventListener() { - - @Override - public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { - } - - @Override - public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { - } - - @Override - public void onSessionClosed(PooledSession session) { - session.setIgnoreClose(true); - session.setIsXa(false); - } - }); session.setIgnoreClose(true); session.setIsXa(true); transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); http://git-wip-us.apache.org/repos/asf/activemq/blob/f395c706/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java index 0567509..5e44be2 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java @@ -18,14 +18,13 @@ package org.apache.activemq.jms.pool; import java.io.Serializable; import java.util.Hashtable; + import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; -import javax.jms.XAConnectionFactory; import javax.naming.Binding; import javax.naming.Context; import javax.naming.InitialContext; @@ -38,13 +37,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A pooled connection factory that automatically enlists - * sessions in the current active XA transaction if any. + * A pooled connection factory that automatically enlists sessions in the + * current active XA transaction if any. */ -public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, - Serializable, QueueConnectionFactory, TopicConnectionFactory { +public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, Serializable, QueueConnectionFactory, TopicConnectionFactory { private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class); + private static final long serialVersionUID = -6545688026350913005L; + private TransactionManager transactionManager; private boolean tmFromJndi = false; private String tmJndiName = "java:/TransactionManager"; @@ -87,10 +87,10 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/')); try { InitialContext ctx = new InitialContext(); - NamingEnumeration bindings = ctx.listBindings(name); + NamingEnumeration bindings = ctx.listBindings(name); while (bindings.hasMore()) { - Binding bd = (Binding)bindings.next(); + Binding bd = bindings.next(); IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject()); } @@ -116,6 +116,7 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement /** * Allow transaction manager resolution from JNDI (ee deployment) + * * @param tmFromJndi */ public void setTmFromJndi(boolean tmFromJndi) { @@ -141,5 +142,4 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement public TopicConnection createTopicConnection(String userName, String password) throws JMSException { return (TopicConnection) createConnection(userName, password); } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/f395c706/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java new file mode 100644 index 0000000..fc0fbf4 --- /dev/null +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java @@ -0,0 +1,102 @@ +package org.apache.activemq.jms.pool; + +import java.util.concurrent.Executors; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PooledConnectionTempQueueTest { + + private final Logger LOG = LoggerFactory.getLogger(PooledConnectionTempQueueTest.class); + + protected static final String SERVICE_QUEUE = "queue1"; + + @Test + public void testTempQueueIssue() throws JMSException, InterruptedException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + final PooledConnectionFactory cf = new PooledConnectionFactory(); + cf.setConnectionFactory(factory); + + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOG.info("First connection was {}", connection); + + // This order seems to matter to reproduce the issue + connection.close(); + session.close(); + + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + sendWithReplyToTemp(cf, SERVICE_QUEUE); + } + + private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, + InterruptedException { + Connection con = cf.createConnection(); + con.start(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + TextMessage msg = session.createTextMessage("Request"); + msg.setJMSReplyTo(tempQueue); + MessageProducer producer = session.createProducer(session.createQueue(serviceQueue)); + producer.send(msg); + + // This sleep also seems to matter + Thread.sleep(5000); + + MessageConsumer consumer = session.createConsumer(tempQueue); + Message replyMsg = consumer.receive(); + System.out.println(replyMsg.getJMSCorrelationID()); + + consumer.close(); + + producer.close(); + session.close(); + con.close(); + } + + public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, + String queueName) throws JMSException { + Connection con = connectionFactory.createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + final javax.jms.Message inMessage = consumer.receive(); + + String requestMessageId = inMessage.getJMSMessageID(); + System.out.println("Received message " + requestMessageId); + final TextMessage replyMessage = session.createTextMessage("Result"); + replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID()); + final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo()); + System.out.println("Sending reply to " + inMessage.getJMSReplyTo()); + producer.send(replyMessage); + + producer.close(); + consumer.close(); + session.close(); + con.close(); + } + +}