activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r739249 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/ test/java/org/apache/activemq/test/rollback/
Date Fri, 30 Jan 2009 11:57:07 GMT
Author: gtully
Date: Fri Jan 30 11:57:06 2009
New Revision: 739249

URL: http://svn.apache.org/viewvc?rev=739249&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-1730 - add some more tests and remove
workaround for prefetch=0, relates to https://issues.apache.org/activemq/browse/AMQ-2087

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=739249&r1=739248&r2=739249&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Jan 30 11:57:06 2009
@@ -230,8 +230,6 @@
 
                                         public void afterRollback() throws Exception {
                                             synchronized(dispatchLock) {
-                                            	// ActiveMQ workaround for AMQ-1730 - Please
Ignore next line
-                                                node.incrementRedeliveryCounter();
                                                 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                             }
                                         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=739249&r1=739248&r2=739249&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri
Jan 30 11:57:06 2009
@@ -630,10 +630,11 @@
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
         
-        sendMessages(connection, destination, 1);
+        sendMessages(connection, destination, 2);
         
         MessageConsumer consumer = session.createConsumer(destination);
         assertNotNull(consumer.receive(1000));
+        assertNotNull(consumer.receive(1000));
         
         // install another consumer while message dispatch is unacked/uncommitted
         Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -645,8 +646,12 @@
         Message msg = redispatchConsumer.receive(1000);
         assertNotNull(msg);
         assertTrue(msg.getJMSRedelivered());
-        // should have re-delivery of 2, one for re-dispatch, one for rollback which is a
little too much!
-        assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
+        assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+        
+        msg = redispatchConsumer.receive(1000);
+        assertNotNull(msg);
+        assertTrue(msg.getJMSRedelivered());
+        assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
         redispatchSession.commit();
         
         assertNull(redispatchConsumer.receive(500));
@@ -660,10 +665,11 @@
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
         
-        sendMessages(connection, destination, 1);
+        sendMessages(connection, destination, 2);
         
         MessageConsumer consumer = session.createConsumer(destination);
         assertNotNull(consumer.receive(1000));
+        assertNotNull(consumer.receive(1000));
         
         // install another consumer while message dispatch is unacked/uncommitted
         Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -675,8 +681,11 @@
         Message msg = redispatchConsumer.receive(1000);
         assertNotNull(msg);
         assertTrue(msg.getJMSRedelivered());
-        // should have re-delivery of 2, one for re-dispatch, one for rollback which is a
little too much!
-        assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
+        assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+        msg = redispatchConsumer.receive(1000);
+        assertNotNull(msg);
+        assertTrue(msg.getJMSRedelivered());
+        assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
         redispatchSession.commit();
         
         assertNull(redispatchConsumer.receive(500));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739249&r1=739248&r2=739249&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
Fri Jan 30 11:57:06 2009
@@ -38,6 +38,7 @@
     protected static final Log LOG = LogFactory.getLog(JmsRollbackRedeliveryTest.class);
     final int nbMessages = 10;
     final String destinationName = "Destination";
+    final String brokerUrl = "vm://localhost?create=false";
     boolean consumerClose = true;
     boolean rollback = true;
     
@@ -52,13 +53,21 @@
     
 
     public void testRedelivery() throws Exception {
-        doTestRedelivery("vm://localhost", false);
+        doTestRedelivery(brokerUrl, false);
     }
 
     public void testRedeliveryWithInterleavedProducer() throws Exception {
-        doTestRedelivery("vm://localhost", true);
+        doTestRedelivery(brokerUrl, true);
     }
 
+    public void testRedeliveryWithPrefetch0() throws Exception {
+        doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0", true);
+    }
+    
+    public void testRedeliveryWithPrefetch1() throws Exception {
+        doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=1", true);
+    }
+    
     public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throws Exception
{
 
         final int nbMessages = 10;
@@ -88,9 +97,11 @@
                     if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE)
!= null) {
                         LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement()
+ ")" + msg.getJMSMessageID());
                         assertTrue(msg.getJMSRedelivered());
+                        assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
                         session.commit();
                     } else {
                         LOG.info("Rollback message " + msg.getText() + " id: " +  msg.getJMSMessageID());
+                        assertFalse(msg.getJMSRedelivered());
                         session.rollback();
                     }
                 }
@@ -102,7 +113,8 @@
        
     public void testRedeliveryOnSingleConsumer() throws Exception {
 
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        ConnectionFactory connectionFactory = 
+            new ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
@@ -135,7 +147,8 @@
     
     public void testRedeliveryOnSingleSession() throws Exception {
 
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

+        ConnectionFactory connectionFactory = 
+            new ActiveMQConnectionFactory(brokerUrl); 
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
@@ -168,7 +181,8 @@
     
     public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception {
 
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        ConnectionFactory connectionFactory = 
+            new ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
@@ -195,6 +209,37 @@
         }
     }
     
+    public void testRedeliveryPropertyWithNoRollback() throws Exception {
+        ConnectionFactory connectionFactory = 
+            new ActiveMQConnectionFactory(brokerUrl);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(nbMessages, destinationName, connection);
+        connection.close();
+        
+        {
+            AtomicInteger received = new AtomicInteger();
+            while (received.get() < nbMessages) {
+                connection = connectionFactory.createConnection();
+                connection.start();
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Destination destination = session.createQueue(destinationName);
+
+                MessageConsumer consumer = session.createConsumer(destination);         
  
+                TextMessage msg = (TextMessage) consumer.receive(2000);
+                if (msg != null) {
+                    LOG.info("Received message " + msg.getText() + 
+                            " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                    assertFalse(msg.getJMSRedelivered());
+                    assertEquals(1, msg.getLongProperty("JMSXDeliveryCount"));
+                }
+                session.close();
+                connection.close();
+            }
+        }
+    }
+    
     private void populateDestination(final int nbMessages,
             final String destinationName, Connection connection)
             throws JMSException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java?rev=739249&r1=739248&r2=739249&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
Fri Jan 30 11:57:06 2009
@@ -55,7 +55,7 @@
         session.commit();
         assertNotNull(message);
         assertEquals("redelivered message", id, message.getJMSMessageID());
-        assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+        assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
     }
     
     public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws
Throwable {
@@ -75,7 +75,7 @@
         session.commit();
         assertNotNull(message);
         assertEquals("redelivered message", id, message.getJMSMessageID());
-        assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+        assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
     }
 
     public void testVerifyConsumerCloseSessionRollbackRedeliveryWithFailoverTransport() throws
Throwable {
@@ -94,7 +94,7 @@
         session.commit();
         assertNotNull(message);
         assertEquals("redelivered message", id, message.getJMSMessageID());
-        assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+        assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
     }
     
     protected void setUp() throws Exception {



Mime
View raw message