qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject qpid-cpp git commit: QPID-7406: reset cursors for consumers if message is released
Date Mon, 29 Aug 2016 12:27:45 GMT
Repository: qpid-cpp
Updated Branches:
  refs/heads/master 09324a7b1 -> 3b9b412e6


QPID-7406: reset cursors for consumers if message is released


Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/3b9b412e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/3b9b412e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/3b9b412e

Branch: refs/heads/master
Commit: 3b9b412e6519d7e653a2b75dfdcbcb090af1d92d
Parents: 09324a7
Author: Gordon Sim <gsim@redhat.com>
Authored: Mon Aug 29 13:27:00 2016 +0100
Committer: Gordon Sim <gsim@redhat.com>
Committed: Mon Aug 29 13:27:11 2016 +0100

----------------------------------------------------------------------
 src/qpid/broker/MessageMap.cpp      |  8 ++++++-
 src/qpid/broker/MessageMap.h        |  1 +
 src/tests/MessagingSessionTests.cpp | 41 ++++++++++++++++++++++++++++++++
 3 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/3b9b412e/src/qpid/broker/MessageMap.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/MessageMap.cpp b/src/qpid/broker/MessageMap.cpp
index 4cdd83c..c996236 100644
--- a/src/qpid/broker/MessageMap.cpp
+++ b/src/qpid/broker/MessageMap.cpp
@@ -82,10 +82,15 @@ Message* MessageMap::find(const framing::SequenceNumber& position, QueueCursor*
     }
 }
 
+bool MessageMap::reset(const QueueCursor& cursor)
+{
+    return !cursor.valid || (cursor.type == CONSUMER && cursor.version != version);
+}
+
 Message* MessageMap::next(QueueCursor& cursor)
 {
     Ordering::iterator i;
-    if (!cursor.valid) i = messages.begin(); //start with oldest message
+    if (reset(cursor)) i = messages.begin(); //start with oldest message
     else i = messages.upper_bound(cursor.position); //get first message that is greater than position
 
     while (i != messages.end()) {
@@ -137,6 +142,7 @@ Message* MessageMap::release(const QueueCursor& cursor)
     Ordering::iterator i = messages.find(cursor.position);
     if (i != messages.end()) {
         i->second.setState(AVAILABLE);
+        version++;
         return &i->second;
     } else {
         return 0;

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/3b9b412e/src/qpid/broker/MessageMap.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/MessageMap.h b/src/qpid/broker/MessageMap.h
index c30606d..600ad62 100644
--- a/src/qpid/broker/MessageMap.h
+++ b/src/qpid/broker/MessageMap.h
@@ -65,6 +65,7 @@ class MessageMap : public Messages
     std::string getKey(const Message&);
     virtual const Message& replace(const Message&, const Message&);
     void erase(Ordering::iterator);
+    bool reset(const QueueCursor& cursor);
 };
 }} // namespace qpid::broker
 

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/3b9b412e/src/tests/MessagingSessionTests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/MessagingSessionTests.cpp b/src/tests/MessagingSessionTests.cpp
index 2a953d2..c6d8c39 100644
--- a/src/tests/MessagingSessionTests.cpp
+++ b/src/tests/MessagingSessionTests.cpp
@@ -1653,6 +1653,47 @@ QPID_AUTO_TEST_CASE(testPriorityRingEviction)
     BOOST_CHECK(!receiver.fetch(msg, Duration::IMMEDIATE));
 }
 
+QPID_AUTO_TEST_CASE(testReleaseResetsCursor)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    send(sender, 10);
+    Receiver r1 = fix.session.createReceiver(fix.queue);
+    Receiver r2 = fix.session.createReceiver(fix.queue);
+    Message m1;
+    BOOST_CHECK(r1.fetch(m1, Duration::IMMEDIATE));
+    BOOST_CHECK_EQUAL(m1.getContent(), "Message_1");
+    for (uint i = 1; i < 10; i++) {
+        Message msg;
+        BOOST_CHECK(r2.fetch(msg, Duration::IMMEDIATE));
+        BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+    }
+    fix.session.release(m1);
+    Message msg;
+    BOOST_CHECK(r2.fetch(msg, Duration::IMMEDIATE));
+    BOOST_CHECK_EQUAL(msg.getContent(), "Message_1");
+    fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testReleaseResetsCursorForLVQ)
+{
+    MessagingFixture fix;
+    std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.last_value_queue_key:qpid.subject}}}}");
+    Sender sender = fix.session.createSender(queue);
+    sender.send(Message("please release me"));
+    Receiver r1 = fix.session.createReceiver(queue);
+    Receiver r2 = fix.session.createReceiver(queue);
+    Message m1;
+    Message m2;
+    BOOST_CHECK(r1.fetch(m1, Duration::IMMEDIATE));
+    BOOST_CHECK_EQUAL(m1.getContent(), "please release me");
+    BOOST_CHECK(!r2.fetch(m2, Duration::IMMEDIATE));
+    fix.session.release(m1);
+    BOOST_CHECK(r2.fetch(m2, Duration::SECOND*5));
+    BOOST_CHECK_EQUAL(m2.getContent(), "please release me");
+    fix.session.acknowledge();
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message