qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1073085 - in /qpid/trunk/qpid/cpp/src: qpid/broker/DeliveryRecord.cpp qpid/broker/SemanticState.cpp tests/MessagingSessionTests.cpp
Date Mon, 21 Feb 2011 17:30:17 GMT
Author: gsim
Date: Mon Feb 21 17:30:17 2011
New Revision: 1073085

URL: http://svn.apache.org/viewvc?rev=1073085&view=rev
Log:
QPID-3051: Ensure credit window is moved correctly even if it contains rejected messages.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1073085&r1=1073084&r2=1073085&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Feb 21 17:30:17 2011
@@ -131,18 +131,20 @@ void DeliveryRecord::committed() const{
 
 void DeliveryRecord::reject() 
 {    
-    Exchange::shared_ptr alternate = queue->getAlternateExchange();
-    if (alternate) {
-        DeliverableMessage delivery(msg.payload);
-        alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
-        QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " 
-                 << alternate->getName());
-    } else {
-        //just drop it
-        QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+    if (acquired && !ended) {
+        Exchange::shared_ptr alternate = queue->getAlternateExchange();
+        if (alternate) {
+            DeliverableMessage delivery(msg.payload);
+            alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
+            QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
+                     << alternate->getName());
+        } else {
+            //just drop it
+            QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+        }
+        dequeue();
+        setEnded();
     }
-
-    dequeue();
 }
 
 uint32_t DeliveryRecord::getCredit() const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1073085&r1=1073084&r2=1073085&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Feb 21 17:30:17 2011
@@ -697,8 +697,11 @@ void SemanticState::reject(DeliveryId fi
 {
     AckRange range = findRange(first, last);
     for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
-    //need to remove the delivery records as well
-    unacked.erase(range.start, range.end);
+    //may need to remove the delivery records as well
+    for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) {
+        if (i->isRedundant()) i = unacked.erase(i);
+        else i++;
+    }
 }
 
 bool SemanticState::ConsumerImpl::doOutput()

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1073085&r1=1073084&r2=1073085&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Feb 21 17:30:17 2011
@@ -937,6 +937,47 @@ QPID_AUTO_TEST_CASE(testQmfCreateAndDele
     }
 }
 
+QPID_AUTO_TEST_CASE(testRejectAndCredit)
+{
+    //Ensure credit is restored on completing rejected messages
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+
+    const uint count(10);
+    receiver.setCapacity(count);
+    for (uint i = 0; i < count; i++) {
+        sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+    }
+
+    Message in;
+    for (uint i = 0; i < count; ++i) {
+        if (receiver.fetch(in, Duration::SECOND)) {
+            BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+            fix.session.reject(in);
+        } else {
+            BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+1)).str());
+            break;
+        }
+    }
+    //send another batch of messages
+    for (uint i = 0; i < count; i++) {
+        sender.send(Message((boost::format("Message_%1%") % (i+count)).str()));
+    }
+
+    for (uint i = 0; i < count; ++i) {
+        if (receiver.fetch(in, Duration::SECOND)) {
+            BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+count)).str());
+        } else {
+            BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+count)).str());
+            break;
+        }
+    }
+    fix.session.acknowledge();
+    receiver.close();
+    sender.close();
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message