qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1233680 - /qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp
Date Thu, 19 Jan 2012 23:08:19 GMT
Author: aconway
Date: Thu Jan 19 23:08:18 2012
New Revision: 1233680

URL: http://svn.apache.org/viewvc?rev=1233680&view=rev
Log:
QPID-3603: Check for gaps in sequence numbers in qpid-receive.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp?rev=1233680&r1=1233679&r2=1233680&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/qpid-receive.cpp Thu Jan 19 23:08:18 2012
@@ -53,6 +53,7 @@ struct Options : public qpid::Options
     bool forever;
     uint messages;
     bool ignoreDuplicates;
+    bool verifySequence;
     bool checkRedelivered;
     uint capacity;
     uint ackFrequency;
@@ -76,6 +77,7 @@ struct Options : public qpid::Options
           forever(false),
           messages(0),
           ignoreDuplicates(false),
+          verifySequence(false),
           checkRedelivered(false),
           capacity(1000),
           ackFrequency(100),
@@ -98,6 +100,7 @@ struct Options : public qpid::Options
             ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
             ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
             ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+            ("verify-sequence", qpid::optValue(verifySequence), "Verify there are no gaps in the message sequence (by checking 'sn' header)")
             ("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
             ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
             ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
@@ -145,22 +148,29 @@ struct Options : public qpid::Options
 const string EOS("eos");
 const string SN("sn");
 
+/** Check for duplicate or dropped messages by sequence number */
 class SequenceTracker
 {
-    uint lastSn;
   public:
-    SequenceTracker() : lastSn(0) {}
+    SequenceTracker(const Options& o) : opts(o), lastSn(0) {}
 
-    bool isDuplicate(Message& message)
-    {
+    /** Return true if the message should be processed, false if it should be ignored. */
+    bool track(Message& message) {
         uint sn = message.getProperties()[SN];
-        if (lastSn < sn) {
-            lastSn = sn;
-            return false;
-        } else {
-            return true;
-        }
+        bool duplicate = (sn <= lastSn);
+        bool dropped = (sn > lastSn+1);
+        if (opts.verifySequence && dropped)
+            throw Exception(QPID_MSG("Gap in sequence numbers " << lastSn << "-" << sn));
+        bool ignore = duplicate && opts.ignoreDuplicates;
+        if (ignore && opts.checkRedelivered && !message.getRedelivered())
+            throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");
+        if (!duplicate) lastSn = sn;
+        return !ignore;
     }
+
+  private:
+    const Options& opts;
+    uint lastSn;
 };
 
 }} // namespace qpid::tests
@@ -182,13 +192,12 @@ int main(int argc, char ** argv)
             Message msg;
             uint count = 0;
             uint txCount = 0;
-            SequenceTracker sequenceTracker;
+            SequenceTracker sequenceTracker(opts);
             Duration timeout = opts.getTimeout();
             bool done = false;
             Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader);
             if (!opts.readyAddress.empty())
                 session.createSender(opts.readyAddress).send(msg);
-
             // For receive rate calculation
             qpid::sys::AbsTime start = qpid::sys::now();
             int64_t interval = 0;
@@ -198,7 +207,7 @@ int main(int argc, char ** argv)
 
             while (!done && receiver.fetch(msg, timeout)) {
                 reporter.message(msg);
-                if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
+                if (sequenceTracker.track(msg)) {
                     if (msg.getContent() == EOS) {
                         done = true;
                     } else {
@@ -219,8 +228,6 @@ int main(int argc, char ** argv)
                             std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
                         if (opts.messages && count >= opts.messages) done = true;
                     }
-                } else if (opts.checkRedelivered && !msg.getRedelivered()) {
-                    throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");
                 }
                 if (opts.tx && (count % opts.tx == 0)) {
                     if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message