activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r739307 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/PrefetchSubscription.java test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java test/java/org/apache/activemq/network/SimpleNetworkTest.java
Date Fri, 30 Jan 2009 15:30:25 GMT
Author: gtully
Date: Fri Jan 30 15:30:24 2009
New Revision: 739307

URL: http://svn.apache.org/viewvc?rev=739307&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1593

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Jan 30 15:30:24 2009
@@ -316,7 +316,6 @@
                         inAckRange = true;
                     }
                     if (inAckRange) {
-                        node.incrementRedeliveryCounter();
                         if (ack.getLastMessageId().equals(messageId)) {
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
Fri Jan 30 15:30:24 2009
@@ -70,9 +70,6 @@
     
     public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throws Exception
{
 
-        final int nbMessages = 10;
-        final String destinationName = "Destination";
-
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
         
         Connection connection = connectionFactory.createConnection();
@@ -179,36 +176,66 @@
         }
     }
     
-    public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception {
+    // AMQ-1593
+    public void testValidateRedeliveryCountOnRollback() throws Exception {
 
-        ConnectionFactory connectionFactory = 
+        final int numMessages = 1;
+       ConnectionFactory connectionFactory = 
             new ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
-        populateDestination(nbMessages, destinationName, connection);
+        populateDestination(numMessages, destinationName, connection);
 
         {
             AtomicInteger received = new AtomicInteger();
-            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
-            while (received.get() < nbMessages) {
-                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+            while (received.get() < maxRetries) {
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                 Destination destination = session.createQueue(destinationName);
 
                 MessageConsumer consumer = session.createConsumer(destination);         
  
                 TextMessage msg = (TextMessage) consumer.receive(1000);
                 if (msg != null) {
-                    if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE)
!= null) {
-                        LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement()
+ ")" + msg.getJMSMessageID());
-                        assertTrue(msg.getJMSRedelivered());
-                        session.commit();
-                    }
+                    LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement()
+ ")" + msg.getJMSMessageID());
+                    assertEquals("redelivery property matches deliveries", received.get(),
msg.getLongProperty("JMSXDeliveryCount"));
+                    session.rollback();
                 }
                 session.close();
             }
         }
     }
     
+    // AMQ-1593
+    public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
+
+        final int numMessages = 1;
+       ConnectionFactory connectionFactory = 
+            new ActiveMQConnectionFactory(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0");
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(numMessages, destinationName, connection);
+
+        {
+            AtomicInteger received = new AtomicInteger();
+            final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+            while (received.get() < maxRetries) {
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                Destination destination = session.createQueue(destinationName);
+
+                MessageConsumer consumer = session.createConsumer(destination);         
  
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                if (msg != null) {
+                    LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement()
+ ")" + msg.getJMSMessageID());
+                    assertEquals("redelivery property matches deliveries", received.get(),
msg.getLongProperty("JMSXDeliveryCount"));
+                    session.rollback();
+                }
+                session.close();
+            }
+        }
+    }
+
     public void testRedeliveryPropertyWithNoRollback() throws Exception {
         ConnectionFactory connectionFactory = 
             new ActiveMQConnectionFactory(brokerUrl);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
Fri Jan 30 15:30:24 2009
@@ -140,7 +140,7 @@
         doSetUp();
         remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            assertNotNull(remoteConsumer.receive(500));
+            assertNotNull("message count: " + i, remoteConsumer.receive(1000));
         }
     }
 



Mime
View raw message