qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r581186 - in /incubator/qpid/branches/M2: ./ java/broker/src/main/java/org/apache/qpid/server/ack/ java/broker/src/main/java/org/apache/qpid/server/queue/ java/client/src/test/java/org/apache/qpid/test/unit/topic/ java/client/src/test/java/...
Date Tue, 02 Oct 2007 10:37:54 GMT
Author: ritchiem
Date: Tue Oct  2 03:37:52 2007
New Revision: 581186

URL: http://svn.apache.org/viewvc?rev=581186&view=rev
Log:
Merged revisions 580992-580993 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r580992 | ritchiem | 2007-10-01 16:27:54 +0100 (Mon, 01 Oct 2007) | 1 line
  
  QPID-595 CommitRollbackTest Patch provided by Aidan Skinner to address intermittent test failure.
........
  r580993 | ritchiem | 2007-10-01 16:31:00 +0100 (Mon, 01 Oct 2007) | 1 line
  
  QPID-611 QPID-620. DurableSubscriptionTest was failing due to a race condition when using NO_ACK. This is due to the Queue Total Size being updated after the send, but after the send and NO_ACK the msg data is purged and so there is no size to retrieve. Changed all references to msg.dequeue to queue.dequeue where appropriate so we can use that single point in the future for updating the Queue Total Size.
........

Modified:
    incubator/qpid/branches/M2/   (props changed)
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Oct  2 03:37:52 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-580941,580985,581002
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-580941,580985,580992-580993,581002

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?rev=581186&r1=581185&r2=581186&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Tue Oct  2 03:37:52 2007
@@ -60,7 +60,7 @@
     {
         if (queue != null)
         {
-            message.dequeue(storeContext, queue);
+            queue.dequeue(storeContext, message);
         }
         //if the queue is null then the message is waiting to be acked, but has been removed.
         message.decrementReference(storeContext);

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=581186&r1=581185&r2=581186&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Oct  2 03:37:52 2007
@@ -592,7 +592,19 @@
         _transientMessageData.addDestinationQueue(queue);
     }
 
-    public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
+    /**
+     * NOTE: Think about why you are using this method. Normal usages would want to do
+     * AMQQueue.dequeue(StoreContext, AMQMessage)
+     * This will keep the queue statistics up-to-date.
+     * Currently this method is only called _correctly_ from AMQQueue dequeue.
+     * Ideally we would have a better way for the queue to dequeue the message.
+     * Especially since enqueue isn't the recipriocal of this method.
+     * @deprecated
+     * @param storeContext
+     * @param queue
+     * @throws AMQException
+     */
+    void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
     {
         _messageHandle.dequeue(storeContext, _messageId, queue);
     }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=581186&r1=581185&r2=581186&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Oct  2 03:37:52 2007
@@ -809,7 +809,7 @@
         }
     }
 
-    void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+    public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
     {
         try
         {

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=581186&r1=581185&r2=581186&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Oct  2 03:37:52 2007
@@ -413,14 +413,16 @@
 
         AMQMessage message = _messages.poll();
 
-        message.dequeue(storeContext, queue);
-
-        message.decrementReference(storeContext);
-
         if (message != null)
         {
+            queue.dequeue(storeContext, message);
+
             _totalMessageSize.addAndGet(-message.getSize());
-        }
+
+            //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+            message.decrementReference(storeContext);
+
+        }        
 
         _lock.unlock();
     }
@@ -485,7 +487,7 @@
                 _totalMessageSize.addAndGet(-message.getSize());
 
                 // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
-                message.dequeue(_reapingStoreContext, _queue);
+                _queue.dequeue(_reapingStoreContext, message);
 
                 message.decrementReference(_reapingStoreContext);
 
@@ -511,13 +513,16 @@
     }
 
     /**
-     *  This method will return true if the message is to be purged from the queue.
+     * This method will return true if the message is to be purged from the queue.
      *
      *
-     *  SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+     * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+     *
      * @param message
      * @param sub
+     *
      * @return
+     *
      * @throws AMQException
      */
     private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
@@ -607,6 +612,12 @@
                                ") to :" + System.identityHashCode(sub));
                 }
 
+
+                if (messageQueue == _messages)
+                {
+                    _totalMessageSize.addAndGet(-message.getSize());
+                }
+
                 sub.send(message, _queue);
 
                 //remove sent message from our queue.
@@ -654,10 +665,6 @@
                 }
             }
 
-            if ((message != null) && (messageQueue == _messages))
-            {
-                _totalMessageSize.addAndGet(-message.getSize());
-            }
         }
         catch (AMQException e)
         {

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=581186&r1=581185&r2=581186&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Oct  2 03:37:52 2007
@@ -114,18 +114,28 @@
         con.close();
     }
 
-    public void testDurability() throws AMQException, JMSException, URLSyntaxException
+    public void testDurabilityNOACK() throws AMQException, JMSException, URLSyntaxException
+    {
+        durabilityImpl(AMQSession.NO_ACKNOWLEDGE);
+    }
+
+    public void testDurabilityAUTOACK() throws AMQException, JMSException, URLSyntaxException
+    {
+        durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException
     {
 
         AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
         AMQTopic topic = new AMQTopic(con, "MyTopic");
-        Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session session1 = con.createSession(false, ackMode);
         MessageConsumer consumer1 = session1.createConsumer(topic);
 
-        Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session sessionProd = con.createSession(false, ackMode);
         MessageProducer producer = sessionProd.createProducer(topic);
 
-        Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session session2 = con.createSession(false, ackMode);
         TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
 
         con.start();
@@ -133,36 +143,41 @@
         producer.send(session1.createTextMessage("A"));
 
         Message msg;
-        msg = consumer1.receive();
-        assertEquals("A", ((TextMessage) msg).getText());
         msg = consumer1.receive(100);
-        assertEquals(null, msg);
+        assertNotNull("Message should be available", msg);
+        assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
+
+        msg = consumer1.receive(100);
+        assertNull("There should be no more messages for consumption on consumer1.", msg);
 
         msg = consumer2.receive();
-        assertEquals("A", ((TextMessage) msg).getText());
+        assertNotNull(msg);
+        assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
         msg = consumer2.receive(100);
-        assertEquals(null, msg);
+        assertNull("There should be no more messages for consumption on consumer2.", msg);
 
         consumer2.close();
 
-        Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session session3 = con.createSession(false, ackMode);
         MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
 
         producer.send(session1.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
         msg = consumer1.receive(100);
-        assertEquals("B", ((TextMessage) msg).getText());
+        assertNotNull("Consumer 1 should get message 'B'.", msg);
+        assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
         msg = consumer1.receive(100);
-        assertEquals(null, msg);
+        assertNull("There should be no more messages for consumption on consumer1.", msg);
 
         _logger.info("Receive message on consumer 3 :expecting B");
         msg = consumer3.receive(100);
-        assertEquals("B", ((TextMessage) msg).getText());
+        assertNotNull("Consumer 3 should get message 'B'.", msg);
+        assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
         msg = consumer3.receive(100);
-        assertEquals(null, msg);
+        assertNull("There should be no more messages for consumption on consumer3.", msg);
 
         con.close();
     }

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=581186&r1=581185&r2=581186&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Tue Oct  2 03:37:52 2007
@@ -407,7 +407,6 @@
         {
             _logger.info("Got 1 redelivered");
             assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
-            assertFalse("Already received message one", _gotone);
             _gotone = true;
 
         }
@@ -418,15 +417,15 @@
             if (result.getJMSRedelivered())
             {
                 _logger.info("Got 2 redelivered, message was prefetched");
-                assertFalse("Already received message redelivered two", _gottwoRedelivered);
-
                 _gottwoRedelivered = true;
+                
             }
             else
             {
                 _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
                 assertFalse("Already received message two", _gottwo);
-
+                assertFalse("Already received message redelivered two", _gottwoRedelivered);
+                
                 _gottwo = true;
             }
         }



Mime
View raw message