activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cepo...@apache.org
Subject svn commit: r1421994 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/test/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/bugs/
Date Fri, 14 Dec 2012 17:42:47 GMT
Author: ceposta
Date: Fri Dec 14 17:42:45 2012
New Revision: 1421994

URL: http://svn.apache.org/viewvc?rev=1421994&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-4222

ProducerBrokerExchange holds reference to "region" destination after message has been sent

Can cause leak with temp dests

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1421994&r1=1421993&r2=1421994&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Fri Dec 14 17:42:45 2012
@@ -473,10 +473,10 @@ public class RegionBroker extends EmptyB
 
     @Override
     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception
{
+        ActiveMQDestination destination = message.getDestination();
         message.setBrokerInTime(System.currentTimeMillis());
         if (producerExchange.isMutable() || producerExchange.getRegion() == null
                 || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed()))
{
-            ActiveMQDestination destination = message.getDestination();
             // ensure the destination is registered with the RegionBroker
             producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(),
destination, isAllowTempAutoCreationOnSend());
             Region region;
@@ -501,6 +501,13 @@ public class RegionBroker extends EmptyB
         }
 
         producerExchange.getRegion().send(producerExchange, message);
+
+        // clean up so these references aren't kept (possible leak) in the producer exchange
+        // especially since temps are transitory
+        if (destination.isTemporary()) {
+            producerExchange.setRegionDestination(null);
+            producerExchange.setRegion(null);
+        }
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java?rev=1421994&r1=1421993&r2=1421994&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java Fri Dec
14 17:42:45 2012
@@ -170,6 +170,10 @@ public abstract class TestSupport extend
     private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination>
getDestinationMap(BrokerService target,
             ActiveMQDestination destination) {
         RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
+        if (destination.isTemporary()) {
+            return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap()
:
+                    regionBroker.getTempTopicRegion().getDestinationMap();
+        }
         return destination.isQueue() ?
                     regionBroker.getQueueRegion().getDestinationMap() :
                         regionBroker.getTopicRegion().getDestinationMap();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java?rev=1421994&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java Fri
Dec 14 17:42:45 2012
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.transport.vm.VMTransportFactory;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ */
+public class AMQ4222Test extends TestSupport {
+
+    protected BrokerService brokerService;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        topic = false;
+        brokerService = createBroker();
+
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    public void testTempQueueCleanedUp() throws Exception {
+
+        Destination requestQueue = createDestination();
+
+        Connection producerConnection = createConnection();
+        producerConnection.start();
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = producerSession.createProducer(requestQueue);
+        Destination replyTo = producerSession.createTemporaryQueue();
+        MessageConsumer producerSessionConsumer = producerSession.createConsumer(replyTo);
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        // let's listen to the response on the queue
+        producerSessionConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    if (message instanceof TextMessage) {
+                        System.out.println("You got a message: " + ((TextMessage) message).getText());
+                        countDownLatch.countDown();
+                    }
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        producer.send(createRequest(producerSession, replyTo));
+
+        Connection consumerConnection = createConnection();
+        consumerConnection.start();
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(requestQueue);
+        final MessageProducer consumerProducer = consumerSession.createProducer(null);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    consumerProducer.send(message.getJMSReplyTo(), message);
+                } catch (JMSException e) {
+                    System.out.println("error sending a response on the temp queue");
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        countDownLatch.await(2, TimeUnit.SECONDS);
+
+        // producer has not gone away yet...
+        org.apache.activemq.broker.region.Destination tempDestination = getDestination(brokerService,
+                (ActiveMQDestination) replyTo);
+        assertNotNull(tempDestination);
+
+        // clean up
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        // producer has gone away.. so the temp queue should not exist anymore... let's see..
+        // producer has not gone away yet...
+        tempDestination = getDestination(brokerService,
+                (ActiveMQDestination) replyTo);
+        assertNull(tempDestination);
+
+        // now.. the connection on the broker side for the dude producing to the temp dest
will
+        // still have a reference in his producerBrokerExchange.. this will keep the destination
+        // from being reclaimed by GC if there is never another send that producer makes...
+        // let's see if that reference is there...
+        TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
+        assertNotNull(connector);
+        assertEquals(1, connector.getConnections().size());
+        TransportConnection transportConnection = connector.getConnections().get(0);
+        Map<ProducerId, ProducerBrokerExchange> exchanges = getProducerExchangeFromConn(transportConnection);
+        assertEquals(1, exchanges.size());
+        ProducerBrokerExchange exchange = exchanges.values().iterator().next();
+
+        // so this is the reason for the test... we don't want these exchanges to hold a
reference
+        // to a region destination.. after a send is completed, the destination is not used
anymore on
+        // a producer exchange
+        assertNull(exchange.getRegionDestination());
+        assertNull(exchange.getRegion());
+
+    }
+
+    private Map<ProducerId, ProducerBrokerExchange> getProducerExchangeFromConn(TransportConnection
transportConnection) throws NoSuchFieldException, IllegalAccessException {
+        Field f = TransportConnection.class.getDeclaredField("producerExchanges");
+        f.setAccessible(true);
+        Map<ProducerId, ProducerBrokerExchange> producerExchanges =
+                (Map<ProducerId, ProducerBrokerExchange>)f.get(transportConnection);
+        return producerExchanges;
+    }
+
+
+    private Message createRequest(Session session, Destination replyTo) throws JMSException
{
+        Message message = session.createTextMessage("Payload");
+        message.setJMSReplyTo(replyTo);
+        return message;
+    }
+
+}



Mime
View raw message