qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raj...@apache.org
Subject svn commit: r1229857 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/AMQSession.java client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
Date Wed, 11 Jan 2012 02:12:38 GMT
Author: rajith
Date: Wed Jan 11 02:12:38 2012
New Revision: 1229857

URL: http://svn.apache.org/viewvc?rev=1229857&view=rev
Log:
QPID-3604 The code now drains individual consumer queues as well as the
dispatch queue (via syncDipatchQueue method) and releases both unacked
and prefetched messages, while only the former being marked redelivered.
Also all of these transfers are being marked as completed to ensure
credits don't dry up.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1229857&r1=1229856&r2=1229857&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan
11 02:12:38 2012
@@ -371,7 +371,7 @@ public abstract class AMQSession<C exten
      * Set when the dispatcher should direct incoming messages straight into the UnackedMessage
list instead of
      * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
      */
-    private volatile boolean _usingDispatcherForCleanup;
+    protected volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, has been stopped.
*/
     private boolean _connectionStopped;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1229857&r1=1229856&r2=1229857&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Wed Jan 11 02:12:38 2012
@@ -795,11 +795,43 @@ public class AMQSession_0_10 extends AMQ
     {
         if (suspend)
         {
-            for (BasicMessageConsumer consumer : _consumers.values())
-            {
-                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
-                                             Option.UNRELIABLE);
-            }
+                synchronized (getMessageDeliveryLock())
+                {
+                    for (BasicMessageConsumer consumer : _consumers.values())
+	            {
+	                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+	                                             Option.UNRELIABLE);
+	                sync();
+	                List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+	                _prefetchedMessageTags.addAll(tags);
+	            }
+                }
+
+                _usingDispatcherForCleanup = true;
+                syncDispatchQueue();
+                _usingDispatcherForCleanup = false;
+
+                RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+		RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+		RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+					+ prefetched.size());
+
+		for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+		{
+			Range range = deliveredIter.next();
+			all.add(range);
+		}
+
+		for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+		{
+			Range range = prefetchedIter.next();
+			all.add(range);
+		}
+
+		flushProcessed(all, false);
+		getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
+		getQpidSession().messageRelease(prefetched);
+		sync();
         }
         else
         {

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1229857&r1=1229856&r2=1229857&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
Wed Jan 11 02:12:38 2012
@@ -5,12 +5,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -133,4 +135,41 @@ public class PrefetchBehaviourTest exten
         assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get());
     }
 
+    /**
+     * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
+     * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of
5, but never consume them.
+     *                Stop the connection. Create a new connection and a consumer with maxprefetch
10 on the same queue.
+     *                Try to receive all 10 messages.
+     */
+    public void testConnectionStop() throws Exception
+    {
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
+        Connection con = getConnection();
+        con.start();
+        Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
+
+        MessageProducer prod = ssn.createProducer(queue);
+        for (int i=0; i<10;i++)
+        {
+           prod.send(ssn.createTextMessage("Msg" + i));
+        }
+
+        MessageConsumer consumer = ssn.createConsumer(queue);
+        // This is to ensure we get the first client to prefetch.
+        Message msg = consumer.receive(1000);
+        assertNotNull("The first consumer should get one message",msg);
+        con.stop();
+
+        Connection con2 = getConnection();
+        con2.start();
+        Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = ssn2.createConsumer(queue);
+        for (int i=0; i<9;i++)
+        {
+           TextMessage m = (TextMessage)consumer2.receive(1000);
+           assertNotNull("The second consumer should get 9 messages, but received only "
+ i,m);
+        }
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message