activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r953696 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQConnection.java test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Date Fri, 11 Jun 2010 13:40:49 GMT
Author: gtully
Date: Fri Jun 11 13:40:49 2010
New Revision: 953696

URL: http://svn.apache.org/viewvc?rev=953696&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2772 - patch applied with thanks and
new test case created to protect the change

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=953696&r1=953695&r2=953696&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Jun 11 13:40:49 2010
@@ -1852,7 +1852,7 @@ public class ActiveMQConnection implemen
 	}
 
     public void transportInterupted() {
-        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
+        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0) - connectionConsumers.size());
         if (LOG.isDebugEnabled()) {
             LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=953696&r1=953695&r2=953696&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Fri Jun 11 13:40:49 2010
@@ -36,6 +36,8 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
@@ -290,7 +292,59 @@ public class FailoverTransactionTest {
 	    session.commit();
 	    connection.close();
 	}
-		
+	
+    @Test
+    // https://issues.apache.org/activemq/browse/AMQ-2772
+    public void testFailoverWithConnectionConsumer() throws Exception {
+        startCleanBroker();         
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        connection.start();
+        
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(QUEUE_NAME);
+
+        final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
+        final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
+            public ServerSession getServerSession() throws JMSException {
+                return new ServerSession() {
+                    public Session getSession() throws JMSException {
+                        return poolSession;
+                    }
+                    public void start() throws JMSException {
+                        connectionConsumerGotOne.countDown();
+                        poolSession.run();
+                    }
+                };
+            }
+        }, 1);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer;
+        TextMessage message;
+        final int count = 10;
+        for (int i=0; i<count; i++) {
+            producer = session.createProducer(destination);         
+            message = session.createTextMessage("Test message: " + count);
+            producer.send(message);
+            producer.close();
+        }
+        
+        // restart to force failover and connection state recovery before the commit
+        broker.stop();
+        startBroker(false);
+        
+        session.commit();
+        for (int i=0; i<count-1; i++) {
+            assertNotNull("we got all the message: " + count, consumer.receive(20000));
+        }
+        session.commit();
+        connection.close();
+        
+        assertTrue("connectionconsumer got a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
+    }
+	
     @Test
     public void testFailoverConsumerAckLost() throws Exception {
         // as failure depends on hash order of state tracker recovery, do a few times



Mime
View raw message