activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1203213 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/util/DestinationPathSeparatorBroker.java test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Date Thu, 17 Nov 2011 15:10:33 GMT
Author: gtully
Date: Thu Nov 17 15:10:32 2011
New Revision: 1203213

URL: http://svn.apache.org/viewvc?rev=1203213&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3600 - Failover reconnect bypasses DestinationPathSeparatorBroker
generating invalid destinations. fix with test. the messagePull currently does not reference
the destination but to future proof and for consistency it should be escaped

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/DestinationPathSeparatorBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/DestinationPathSeparatorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/DestinationPathSeparatorBroker.java?rev=1203213&r1=1203212&r2=1203213&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/DestinationPathSeparatorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/DestinationPathSeparatorBroker.java
Thu Nov 17 15:10:32 2011
@@ -116,6 +116,18 @@ public class DestinationPathSeparatorBro
         super.removeDestinationInfo(context, info);    
     }
 
+    @Override
+    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl
control) {
+        control.setDestination(convertDestination(control.getDestination()));
+        super.processConsumerControl(consumerExchange, control);
+    }
+
+    @Override
+    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception
{
+        pull.setDestination(convertDestination(pull.getDestination()));
+        return super.messagePull(context, pull);
+    }
+
     public void setPathSeparator(String pathSeparator) {
         this.pathSeparator = pathSeparator;
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=1203213&r1=1203212&r2=1203213&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Thu Nov 17 15:10:32 2011
@@ -27,6 +27,8 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -59,7 +61,7 @@ import java.util.concurrent.atomic.Atomi
 public class FailoverTransactionTest extends TestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
-    private static final String QUEUE_NAME = "FailoverWithTx";
+    private static final String QUEUE_NAME = "Failover.WithTx";
     private static final String TRANSPORT_URI = "tcp://localhost:0";
     private String url;
     BrokerService broker;
@@ -242,6 +244,112 @@ public class FailoverTransactionTest ext
         connection.close();
     }
 
+    @SuppressWarnings("unchecked")
+    public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception
{
+
+        broker = createBroker(true);
+        setDefaultPersistenceAdapter(broker);
+
+        broker.setPlugins(new BrokerPlugin[]{
+                new DestinationPathSeparatorBroker(),
+                new BrokerPluginSupport() {
+                    @Override
+                    public void commitTransaction(ConnectionContext context,
+                                                  TransactionId xid, boolean onePhase) throws
Exception {
+                        super.commitTransaction(context, xid, onePhase);
+                        // so commit will hang as if reply is lost
+                        context.setDontSendReponse(true);
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {
+                            public void run() {
+                                LOG.info("Stopping broker post commit...");
+                                try {
+                                    broker.stop();
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                        });
+                    }
+                }
+        });
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");
+        configureConnectionFactory(cf);
+        Connection connection = cf.createConnection();
+        connection.start();
+        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(QUEUE_NAME.replace('.','/') + "?consumer.prefetchSize=0");
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        produceMessage(session, destination);
+
+        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+        // broker will die on commit reply so this will hang till restart
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+                LOG.info("doing async commit...");
+                try {
+                    session.commit();
+                } catch (JMSException e) {
+                    assertTrue(e instanceof TransactionRolledBackException);
+                    LOG.info("got commit exception: ", e);
+                }
+                commitDoneLatch.countDown();
+                LOG.info("done async commit");
+            }
+        });
+
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+        broker = createBroker(false, url);
+        setDefaultPersistenceAdapter(broker);
+        broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
+        broker.start();
+
+        assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+
+        // new transaction
+        Message msg = consumer.receive(20000);
+        LOG.info("Received: " + msg);
+        assertNotNull("we got the message", msg);
+        assertNull("we got just one message", consumer.receive(2000));
+        session.commit();
+        consumer.close();
+        connection.close();
+
+        // ensure no dangling messages with fresh broker etc
+        broker.stop();
+        broker.waitUntilStopped();
+
+        LOG.info("Checking for remaining/hung messages..");
+        broker = createBroker(false, url);
+        setDefaultPersistenceAdapter(broker);
+        broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
+        broker.start();
+
+        // after restart, ensure no dangling messages
+        cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
+        connection = cf.createConnection();
+        connection.start();
+        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session2.createConsumer(destination);
+        msg = consumer.receive(1000);
+        if (msg == null) {
+            msg = consumer.receive(5000);
+        }
+        LOG.info("Received: " + msg);
+        assertNull("no messges left dangling but got: " + msg, msg);
+        connection.close();
+
+        ActiveMQDestination[] destinations = broker.getRegionBroker().getDestinations();
+        for (ActiveMQDestination dest : destinations) {
+            LOG.info("Destinations list: " + dest);
+        }
+        assertEquals("Only one destination", 1, broker.getRegionBroker().getDestinations().length);
+    }
+
     public void initCombosForTestFailoverSendReplyLost() {
         addCombinationValues("defaultPersistenceAdapter",
             new Object[]{PersistenceAdapterChoice.KahaDB,



Mime
View raw message