activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1425162 - in /activemq/trunk/activemq-pool/src: main/java/org/apache/activemq/pool/ test/java/org/apache/activemq/pool/
Date Fri, 21 Dec 2012 22:05:49 GMT
Author: tabish
Date: Fri Dec 21 22:05:48 2012
New Revision: 1425162

URL: http://svn.apache.org/viewvc?rev=1425162&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4225

Added:
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
  (with props)
Modified:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1425162&r1=1425161&r2=1425162&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
(original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
Fri Dec 21 22:05:48 2012
@@ -62,6 +62,7 @@ public class PooledConnection implements
     private volatile boolean stopped;
     private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
     private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
+    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
 
     /**
      * Creates a new PooledConnection instance that uses the given ConnectionPool to create
@@ -86,6 +87,7 @@ public class PooledConnection implements
     @Override
     public void close() throws JMSException {
         this.cleanupConnectionTemporaryDestinations();
+        this.cleanupAllLoanedSessions();
         if (this.pool != null) {
             this.pool.decrementReferenceCount();
             this.pool = null;
@@ -104,8 +106,7 @@ public class PooledConnection implements
     }
 
     @Override
-    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector,
ServerSessionPool serverSessionPool, int maxMessages)
-            throws JMSException {
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector,
ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
         return getConnection().createConnectionConsumer(destination, selector, serverSessionPool,
maxMessages);
     }
 
@@ -115,8 +116,7 @@ public class PooledConnection implements
     }
 
     @Override
-    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector,
String s1, ServerSessionPool serverSessionPool, int i)
-            throws JMSException {
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector,
String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
         return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool,
i);
     }
 
@@ -173,10 +173,14 @@ public class PooledConnection implements
         PooledSession result;
         result = (PooledSession) pool.createSession(transacted, ackMode);
 
-        // Add a temporary destination event listener to the session that notifies us when
-        // the session creates temporary destinations.
-        result.addTempDestEventListener(this);
-        return (Session) result;
+        // Store the session so we can close the sessions that this PooledConnection
+        // created in order to ensure that consumers etc are closed per the JMS contract.
+        loanedSessions.add(result);
+
+        // Add an event listener to the session that notifies us when the session
+        // creates / destroys temporary destinations and closes etc.
+        result.addSessionEventListener(this);
+        return result;
     }
 
     // EnhancedCollection API
@@ -190,14 +194,23 @@ public class PooledConnection implements
     // Implementation methods
     // -------------------------------------------------------------------------
 
+    @Override
     public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
         connTempQueues.add(tempQueue);
     }
 
+    @Override
     public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
         connTempTopics.add(tempTopic);
     }
 
+    @Override
+    public void onSessionClosed(PooledSession session) {
+        if (session != null) {
+            this.loanedSessions.remove(session);
+        }
+    }
+
     public ActiveMQConnection getConnection() throws JMSException {
         assertNotClosed();
         return pool.getConnection();
@@ -213,6 +226,7 @@ public class PooledConnection implements
         return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
     }
 
+    @Override
     public String toString() {
         return "PooledConnection { " + pool + " }";
     }
@@ -247,6 +261,23 @@ public class PooledConnection implements
     }
 
     /**
+     * The PooledConnection tracks all Sessions that it created and now we close them.  Closing the
+     * PooledSession will return the internal Session to the pool of Sessions after cleaning up
+     * all the resources that the Session had allocated for this PooledConnection.
+     */
+    protected void cleanupAllLoanedSessions() {
+
+        for (PooledSession session : loanedSessions) {
+            try {
+                session.close();
+            } catch (JMSException ex) {
+                LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled
connection: " + ex.getMessage());
+            }
+        }
+        loanedSessions.clear();
+    }
+
+    /**
      * @return the total number of Pooled session including idle sessions that are not
      *          currently loaned out to any client.
      */

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=1425162&r1=1425161&r2=1425162&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
(original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
Fri Dec 21 22:05:48 2012
@@ -63,7 +63,7 @@ public class PooledSession implements Se
     private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
     private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
     private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
-    private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners
=
+    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners
=
         new CopyOnWriteArrayList<PooledSessionEventListener>();
 
     private ActiveMQSession session;
@@ -81,10 +81,10 @@ public class PooledSession implements Se
         this.transactional = session.isTransacted();
     }
 
-    public void addTempDestEventListener(PooledSessionEventListener listener) {
+    public void addSessionEventListener(PooledSessionEventListener listener) {
         // only add if really needed
-        if (!tempDestEventListeners.contains(listener)) {
-            this.tempDestEventListeners.add(listener);
+        if (!sessionEventListeners.contains(listener)) {
+            this.sessionEventListeners.add(listener);
         }
     }
 
@@ -129,7 +129,10 @@ public class PooledSession implements Se
             } finally {
                 consumers.clear();
                 browsers.clear();
-                tempDestEventListeners.clear();
+                for (PooledSessionEventListener listener : this.sessionEventListeners) {
+                    listener.onSessionClosed(this);
+                }
+                sessionEventListeners.clear();
             }
 
             if (invalidate) {
@@ -205,7 +208,7 @@ public class PooledSession implements Se
         result = getInternalSession().createTemporaryQueue();
 
         // Notify all of the listeners of the created temporary Queue.
-        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+        for (PooledSessionEventListener listener : this.sessionEventListeners) {
             listener.onTemporaryQueueCreate(result);
         }
 
@@ -219,7 +222,7 @@ public class PooledSession implements Se
         result = getInternalSession().createTemporaryTopic();
 
         // Notify all of the listeners of the created temporary Topic.
-        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+        for (PooledSessionEventListener listener : this.sessionEventListeners) {
             listener.onTemporaryTopicCreate(result);
         }
 

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java?rev=1425162&r1=1425161&r2=1425162&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
(original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
Fri Dec 21 22:05:48 2012
@@ -26,7 +26,7 @@ interface PooledSessionEventListener {
      * Called on successful creation of a new TemporaryQueue.
      *
      * @param tempQueue
-     *            The TemporaryQueue just created.
+     *      The TemporaryQueue just created.
      */
     void onTemporaryQueueCreate(TemporaryQueue tempQueue);
 
@@ -34,8 +34,16 @@ interface PooledSessionEventListener {
      * Called on successful creation of a new TemporaryTopic.
      *
      * @param tempTopic
-     *            The TemporaryTopic just created.
+     *      The TemporaryTopic just created.
      */
     void onTemporaryTopicCreate(TemporaryTopic tempTopic);
 
+    /**
+     * Called when the PooledSession is closed.
+     *
+     * @param session
+     *      The PooledSession that has been closed.
+     */
+    void onSessionClosed(PooledSession session);
+
 }

Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java?rev=1425162&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
(added)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
Fri Dec 21 22:05:48 2012
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.pool;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PooledConnectionSessionCleanupTest {
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionSessionCleanupTest.class);
+
+    protected BrokerService service;
+
+    protected ActiveMQConnectionFactory directConnFact;
+    protected Connection directConn1;
+    protected Connection directConn2;
+
+    protected PooledConnectionFactory pooledConnFact;
+    protected Connection pooledConn1;
+    protected Connection pooledConn2;
+
+    private final ActiveMQQueue queue = new ActiveMQQueue("ContendedQueue");
+    private final int MESSAGE_COUNT = 50;
+
+    /**
+     * Prepare to run a test case: create, configure, and start the embedded
+     * broker, as well as creating the client connections to the broker.
+     */
+    @Before
+    public void prepTest() throws java.lang.Exception {
+        service = new BrokerService();
+        service.setUseJmx(true);
+        service.setPersistent(false);
+        service.setSchedulerSupport(false);
+        service.start();
+        service.waitUntilStarted();
+
+        // Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
+        // Set a long idle timeout on the pooled connections to better show the
+        // problem of holding onto created resources on close.
+        directConnFact = new ActiveMQConnectionFactory(service.getVmConnectorURI());
+        pooledConnFact = new PooledConnectionFactory(directConnFact);
+        pooledConnFact.setIdleTimeout((int)TimeUnit.MINUTES.toMillis(60));
+        pooledConnFact.setMaxConnections(1);
+
+        // Prepare the connections
+        directConn1 = directConnFact.createConnection();
+        directConn1.start();
+        directConn2 = directConnFact.createConnection();
+        directConn2.start();
+
+        // The pooled Connections should have the same underlying connection
+        pooledConn1 = pooledConnFact.createConnection();
+        pooledConn1.start();
+        pooledConn2 = pooledConnFact.createConnection();
+        pooledConn2.start();
+    }
+
+    @After
+    public void cleanupTest() throws java.lang.Exception {
+        try {
+            pooledConn1.close();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            pooledConn2.close();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            directConn1.close();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            directConn2.close();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            service.stop();
+        } catch (JMSException jms_exc) {
+        }
+    }
+
+    private void produceMessages() throws Exception {
+
+        Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            producer.send(session.createTextMessage("Test Message: " + i));
+        }
+        producer.close();
+    }
+
+    private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException,
JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+                + ":Type=Queue,Destination=" + name
+                + ",BrokerName=localhost");
+        QueueViewMBean proxy = (QueueViewMBean) service.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    @Test
+    public void testLingeringPooledSessionsHoldingPrefetchedMessages() throws Exception {
+
+        produceMessages();
+
+        Session pooledSession1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        pooledSession1.createConsumer(queue);
+
+        final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());
+
+        assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getInFlightCount() == MESSAGE_COUNT;
+            }
+        }));
+
+        // While all the messages are in flight we should not get anything on this consumer.
+        Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+        assertNull(consumer.receive(2000));
+
+        pooledConn1.close();
+
+        assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getSubscriptions().length == 1;
+            }
+        }));
+
+        // Now we'd expect that the message stuck in the prefetch of the pooled session's
+        // consumer would be rerouted to the non-pooled session's consumer.
+        assertNotNull(consumer.receive(10000));
+    }
+
+    @Test
+    public void testNonPooledConnectionCloseNotHoldingPrefetchedMessages() throws Exception
{
+
+        produceMessages();
+
+        Session directSession = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        directSession.createConsumer(queue);
+
+        final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());
+
+        assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getInFlightCount() == MESSAGE_COUNT;
+            }
+        }));
+
+        // While all the messages are in flight we should not get anything on this consumer.
+        Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+        assertNull(consumer.receive(2000));
+
+        directConn2.close();
+
+        assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getSubscriptions().length == 1;
+            }
+        }));
+
+        // Now we'd expect that the message stuck in the prefetch of the first session's
+        // consumer would be rerouted to the alternate session's consumer.
+        assertNotNull(consumer.receive(10000));
+    }
+}

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message