qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject svn commit: r1721151 - in /qpid/java/trunk: client/src/main/java/org/apache/qpid/client/ systests/src/test/java/org/apache/qpid/test/unit/close/
Date Mon, 21 Dec 2015 13:33:04 GMT
Author: orudyy
Date: Mon Dec 21 13:33:03 2015
New Revision: 1721151

URL: http://svn.apache.org/viewvc?rev=1721151&view=rev
Log:
QPID-6951: Release the consumer's prefetched messages on consumer close regardless of whether
the session is closed or not. Rename/remove consumer methods so that each method name indicates
what the method actually does. Add a system test.

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Dec 21
13:33:03 2015
@@ -27,7 +27,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -939,7 +938,7 @@ public abstract class AMQSession<C exten
     private void rejectPending(C consumer)
     {
         // Reject messages on pre-receive queue
-        consumer.rollbackPendingMessages();
+        consumer.releasePendingMessages();
 
         // Reject messages on pre-dispatch queue
         rejectMessagesForConsumerTag(consumer.getConsumerTag());
@@ -3373,7 +3372,7 @@ public abstract class AMQSession<C exten
                 {
                     if (!consumer.isBrowseOnly())
                     {
-                        consumer.rollback();
+                        consumer.releasePendingMessages();
                     }
                     else
                     {

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Mon Dec 21 13:33:03 2015
@@ -652,9 +652,9 @@ public abstract class BasicMessageConsum
             }
 
 
-            if(!(isBrowseOnly() || getSession().isClosing()))
+            if(!isBrowseOnly())
             {
-                rollback();
+                releasePendingMessages();
             }
         }
     }
@@ -887,12 +887,7 @@ public abstract class BasicMessageConsum
         return _browseOnly;
     }
 
-    public void rollback()
-    {
-        rollbackPendingMessages();
-    }
-
-    public void rollbackPendingMessages()
+    void releasePendingMessages()
     {
         if (_synchronousQueue.size() > 0)
         {
@@ -942,7 +937,7 @@ public abstract class BasicMessageConsum
             if (_synchronousQueue.size() != 0)
             {
                 _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
-                rollback();
+                releasePendingMessages();
             }
 
             clearReceiveQueue();

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Mon Dec 21 13:33:03 2015
@@ -454,7 +454,7 @@ public class BasicMessageConsumer_0_10 e
         return receiveNoWait();
     }
 
-    @Override public void rollbackPendingMessages()
+    @Override void releasePendingMessages()
     {
         if (getSynchronousQueue().size() > 0)
         {

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
(original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
Mon Dec 21 13:33:03 2015
@@ -108,4 +108,39 @@ public class MessageConsumerCloseTest  e
         assertEquals("Message three has unexpected content", 2, msg3.getIntProperty(INDEX));
         session.commit();
     }
+
+    public void testMessagesReceivedBeforeConsumerCloseAreRedeliveredAfterRollback() throws Exception
+    {
+        Connection connection = getConnection();
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        Destination destination = getTestQueue();
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        int messageNumber = 4;
+        connection.start();
+        sendMessage(session, destination, messageNumber);
+
+        for(int i = 0; i < messageNumber/2 ; i++)
+        {
+            Message message = consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message [" + i +"] was null", message);
+            assertEquals("Message [" + i +"] has unexpected content", i, message.getIntProperty(INDEX));
+        }
+
+        consumer.close();
+
+        session.rollback();
+
+        MessageConsumer consumer2 = session.createConsumer(destination);
+
+        for(int i = 0; i < messageNumber ; i++)
+        {
+            Message message = consumer2.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message [" + i +"] was null", message);
+            assertEquals("Message [" + i +"] has unexpected content", i, message.getIntProperty(INDEX));
+        }
+
+        session.commit();
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message