qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r599128 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/client/Dispatcher.cpp qpid/client/SessionCore.cpp tests/BrokerChannelTest.cpp tests/perftest.cpp
Date Wed, 28 Nov 2007 20:17:56 GMT
Author: aconway
Date: Wed Nov 28 12:17:55 2007
New Revision: 599128

URL: http://svn.apache.org/viewvc?rev=599128&view=rev
Log:

Add unit_test.h to distribution.
Updated/removed sundry FIXME comments.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=599128&r1=599127&r2=599128&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Wed Nov 28 12:17:55 2007
@@ -75,20 +75,11 @@
         if (content->isA<MessageTransferBody>()) {
             Message msg(*content, session);
             Subscriber::shared_ptr listener = find(msg.getDestination());
-            if (!listener) {
-                // FIXME aconway 2007-11-07: Should close session & throw here?
-                QPID_LOG(error, "No message listener for "
-                         << content->getMethod());
-            } else {
-                listener->received(msg);
-            }
+            assert(listener);
+            listener->received(msg);
         } else {
-            if (handler.get()) {
-                handler->handle(*content);
-            } else {
-                // FIXME aconway 2007-11-07: Should close session & throw here?
-                QPID_LOG(error, "Unhandled method: " << content->getMethod());
-            }
+            assert (handler.get());
+            handler->handle(*content);
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=599128&r1=599127&r2=599128&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Nov 28 12:17:55 2007
@@ -211,7 +211,7 @@
         proxy.resume(getId());
         waitFor(OPEN);
         proxy.ack(sendAck, SequenceNumberSet());
-        // FIXME aconway 2007-10-23: Replay inside the lock might be a prolem
+        // TODO aconway 2007-10-23: Replay inside the lock might be a problem
         // for large replay sets.
         SessionState::Replay replay=session->replay();
         for (SessionState::Replay::iterator i = replay.begin();
@@ -244,9 +244,10 @@
     check(state == OPENING || state == RESUMING,
           COMMAND_INVALID, UNEXPECTED_SESSION_ATTACHED);
     if (state==OPENING) {        // New session
-        // FIXME aconway 2007-10-17: arbitrary ack value of 100 for
-        // client, allow configuration.
-        session=in_place<SessionState>(100, detachedLifetime > 0, sessionId);
+        // TODO aconway 2007-10-17: 0 disables session.ack for now.
+        // If AMQP WG decides to keep it, we need to add configuration
+        // for the ack rate.
+        session=in_place<SessionState>(0, detachedLifetime > 0, sessionId);
         setState(OPEN);
     }
     else {                      // RESUMING

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?rev=599128&r1=599127&r2=599128&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Wed Nov 28 12:17:55 2007
@@ -19,10 +19,6 @@
  *
  */
 
-// FIXME aconway 2007-08-30: Rewrite as a Session test.
-// There is an issue with the tests use of DeliveryAdapter
-// which is no longer exposed on Session (part of SemanticHandler.)
-// 
 #include "qpid/broker/BrokerChannel.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/FanOutExchange.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=599128&r1=599127&r2=599128&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Nov 28 12:17:55 2007
@@ -334,6 +334,7 @@
     }
 };
 
+
 struct PublishThread : public Client {
     string destination;
     string routingKey;
@@ -419,18 +420,31 @@
 
             Message msg;
             AbsTime start=now();
+            size_t lastMsg=0;
             for (size_t i = 0; i < opts.subQuota; ++i) {
                 msg=lq.pop();
-                // FIXME aconway 2007-11-23: Verify message sequence numbers.
-                // Need an array of counters, one per publisher and need
-                // publisher ID in the message for multiple publishers.
+                // TODO aconway 2007-11-23: check message sequence for
+                // multiple publishers. Need an array of counters,
+                // one per publisher and a publisher ID in the
+                // message. Careful not to introduce a lot of overhead
+                // here, e.g. no std::map, std::string etc.
+                //
+                // For now verify order only for a single publisher.
+                if (opts.pubs == 1) {
+                    char* data = const_cast<char*>(msg.getData().data());
+                    size_t n = *reinterpret_cast<uint32_t*>(data);
+                    if (n < lastMsg) {
+                        // Report to control.
+                        Message error("Out-of-sequence messages", "sub_done");
+                        session.messageTransfer(arg::content=error);
+                        return;
+                    }
+                    lastMsg=n;
+                }
             }
             if (opts.ack !=0)
                 msg.acknowledge(); // Cumulative ack for final batch.
             AbsTime end=now();
-
-            // FIXME aconway 2007-11-23: close the subscription,
-            // release any pending messages.
 
             // Report to publisher.
             Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),



Mime
View raw message