qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1164284 - in /qpid/branches/qpid-3346/qpid/cpp/src/tests: Makefile.am msg_group_test.cpp run_msg_group_tests run_msg_group_tests_soak
Date Thu, 01 Sep 2011 21:34:50 GMT
Author: kgiusti
Date: Thu Sep  1 21:34:50 2011
New Revision: 1164284

URL: http://svn.apache.org/viewvc?rev=1164284&view=rev
Log:
QPID-3346: added some functional tests.

Added:
    qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak   (with props)
Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am
    qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am?rev=1164284&r1=1164283&r2=1164284&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/Makefile.am Thu Sep  1 21:34:50 2011
@@ -358,7 +358,10 @@ CLEANFILES+=valgrind.out *.log *.vglog* 
 # Longer running stability tests, not run by default check: target.
 # Not run under valgrind, too slow
 
-LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test
stop_broker \
+LONG_TESTS+=start_broker \
+ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
+ run_msg_groups_tests_soak \
+ stop_broker \
  run_failover_soak reliable_replication_test \
  federated_cluster_test_with_node_failure
 
@@ -370,7 +373,8 @@ EXTRA_DIST+=						\
 	run_failover_soak				\
 	reliable_replication_test			\
 	federated_cluster_test_with_node_failure        \
-	sasl_test_setup.sh
+	sasl_test_setup.sh                              \
+	run_msg_groups_tests_soak
 
 check-long:
 	$(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND=

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp?rev=1164284&r1=1164283&r2=1164284&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp Thu Sep  1 21:34:50 2011
@@ -29,6 +29,7 @@
 #include <qpid/Options.h>
 #include <qpid/log/Logger.h>
 #include <qpid/log/Options.h>
+#include "qpid/log/Statement.h"
 #include "qpid/sys/Time.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
@@ -65,6 +66,7 @@ struct Options : public qpid::Options
     bool allowDuplicates;
     bool randomizeSize;
     bool stickyConsumer;
+    uint timeout;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -83,7 +85,8 @@ struct Options : public qpid::Options
           durable(false),
           allowDuplicates(false),
           randomizeSize(false),
-          stickyConsumer(false)
+          stickyConsumer(false),
+          timeout(10)
     {
         addOptions()
           ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies
none of the messages will get accepted)")
@@ -101,7 +104,8 @@ struct Options : public qpid::Options
           ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of
messages per group to [1...group-size].")
           ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
           ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages
in a group are consumed by the same client [TBD].")
-          ("print-report", qpid::optValue(printReport, "yes|no"), "Dump message group statistics
to stdout.")
+          ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers
remain idle for timeout seconds.")
+          ("print-report", qpid::optValue(printReport), "Dump message group statistics to
stdout.")
           ("help", qpid::optValue(help), "print this usage statement");
         add(log);
         //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if
a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
@@ -140,8 +144,9 @@ class GroupChecker
 {
     qpid::sys::Mutex lock;
 
-    const uint totalMsgsPublished;
+    const uint totalMsgs;
     uint totalMsgsConsumed;
+    uint totalMsgsPublished;
     bool allowDuplicates;
     uint duplicateMsgs;
 
@@ -157,7 +162,7 @@ class GroupChecker
 public:
 
     GroupChecker( uint t, bool d ) :
-        totalMsgsPublished(t), totalMsgsConsumed(0), allowDuplicates(d),
+        totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
         duplicateMsgs(0) {}
 
     bool checkSequence( const std::string& groupId,
@@ -165,6 +170,8 @@ public:
     {
         qpid::sys::Mutex::ScopedLock l(lock);
 
+        QPID_LOG(debug, "Client " << client << " has received " << groupId
<< ":" << sequence);
+
         GroupStatistics::iterator gs = statistics.find(groupId);
         if (gs == statistics.end()) {
             statistics[groupId][client] = 1;
@@ -176,19 +183,33 @@ public:
         if (s == sequenceMap.end()) {
             sequenceMap[groupId] = 1;
             totalMsgsConsumed++;
+            QPID_LOG(debug,  "Client " << client << " thinks this is the first
message from group " << groupId << ":" << sequence);
             return sequence == 0;
         }
         if (sequence < s->second) {
             duplicateMsgs++;
+            QPID_LOG(debug, "Client " << client << " thinks this message is a
duplicate! " << groupId << ":" << sequence);
             return allowDuplicates;
         }
         totalMsgsConsumed++;
         return sequence == s->second++;
     }
 
-    bool eraseGroup( const std::string& groupId )
+    void sendingSequence( const std::string& groupId,
+                          uint sequence, bool eos,
+                          const std::string& client )
     {
         qpid::sys::Mutex::ScopedLock l(lock);
+        ++totalMsgsPublished;
+
+        QPID_LOG(debug, "Client " << client << " sending " << groupId <<
":" << sequence <<
+                 ((eos) ? " (last)" : ""));
+    }
+
+    bool eraseGroup( const std::string& groupId, const std::string& name )
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        QPID_LOG(debug, "Deleting group " << groupId << " (by client " <<
name << ")");
         return sequenceMap.erase( groupId ) == 1;
     }
 
@@ -201,7 +222,7 @@ public:
     bool allMsgsConsumed()  // true when done processing msgs
     {
         qpid::sys::Mutex::ScopedLock l(lock);
-        return totalMsgsConsumed == totalMsgsPublished;
+        return totalMsgsConsumed == totalMsgs;
     }
 
     uint getConsumedTotal()
@@ -210,6 +231,12 @@ public:
         return totalMsgsConsumed;
     }
 
+    uint getPublishedTotal()
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        return totalMsgsPublished;
+    }
+
     ostream& print(ostream& out)
     {
         qpid::sys::Mutex::ScopedLock l(lock);
@@ -314,7 +341,7 @@ public:
                         testFailed( msg.str() );
                         break;
                     } else if (eof) {
-                        if (!checker.eraseGroup( groupId )) {
+                        if (!checker.eraseGroup( groupId, name )) {
                             ostringstream msg;
                             msg << "Erase group failed.  Group=" << groupId <<
" rcvd seq=" << groupSeq;
                             testFailed( msg.str() );
@@ -347,8 +374,10 @@ public:
 
 class Producer : public Client
 {
+    GroupChecker& checker;
+
 public:
-    Producer(const std::string& n, const Options& o) : Client(n, o) {};
+    Producer(const std::string& n, const Options& o, GroupChecker& c) : Client(n,
o), checker(c) {};
     virtual ~Producer() {};
 
     void run()
@@ -367,7 +396,7 @@ public:
             uint groupSeq = 0;
             uint groupSize = opts.groupSize;
             ostringstream group;
-            group << name << sent;
+            group << name << ":" << sent;
             std::string groupId(group.str());
 
             while (!stopped && sent < opts.messages) {
@@ -375,11 +404,12 @@ public:
                 msg.getProperties()[opts.groupKey] = groupId;
                 msg.getProperties()[SN] = groupSeq++;
                 msg.getProperties()[EOS] = false;
+                checker.sendingSequence( groupId, groupSeq-1, (groupSeq == groupSize), name
);
                 if (groupSeq == groupSize) {
                     msg.getProperties()[EOS] = true;
                     // generate new group
                     ostringstream nextGroupId;
-                    nextGroupId << name << sent;
+                    nextGroupId << name << ":" << sent;
                     groupId = nextGroupId.str();
                     groupSeq = 0;
                     if (opts.randomizeSize) {
@@ -424,7 +454,7 @@ int main(int argc, char ** argv)
             for (size_t j = 0; j < opts.senders; ++j)  {
                 ostringstream name;
                 name << "P_" << j;
-                clients.push_back(Client::shared_ptr(new Producer( name.str(), opts )));
+                clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state
)));
                 clients.back()->getThread() = qpid::sys::Thread(*clients.back());
             }
             for (size_t j = 0; j < opts.receivers; ++j)  {
@@ -435,11 +465,11 @@ int main(int argc, char ** argv)
             }
 
             // wait for all pubs/subs to finish.... or for consumers to fail or stall.
-            uint lastCount;
+            uint stalledTime = 0;
             bool done;
             bool clientFailed = false;
             do {
-                lastCount = state.getConsumedTotal();
+                uint lastCount = state.getConsumedTotal();
                 qpid::sys::usleep( 1000000 );
 
                 // check each client for status
@@ -450,16 +480,31 @@ int main(int argc, char ** argv)
                         std::cerr << argv[0] << ": test failed with client error:
" << (*i)->getErrorMsg() << std::endl;
                         clientFailed = true;
                         done = true;
-                        break;
+                        break;  // exit test.
                     } else if ((*i)->getState() != Client::DONE) {
                         done = false;
                     }
                 }
-            } while (!done && lastCount != state.getConsumedTotal());
+
+                if (!done) {
+                    // check that consumers are still receiving messages
+                    if (lastCount == state.getConsumedTotal())
+                        stalledTime++;
+                    else {
+                        lastCount = state.getConsumedTotal();
+                        stalledTime = 0;
+                    }
+                }
+
+                QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
+                         " Published to date = " << state.getPublishedTotal() <<
+                         " total=" << opts.senders * opts.messages );
+
+            } while (!done && stalledTime < opts.timeout);
 
             if (clientFailed) {
                 status = 1;
-            } else if (!state.allMsgsConsumed()) {
+            } else if (stalledTime >= opts.timeout) {
                 std::cerr << argv[0] << ": test failed due to stalled consumer."
<< std::endl;
                 status = 2;
             }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests?rev=1164284&r1=1164283&r2=1164284&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests Thu Sep  1 21:34:50 2011
@@ -18,9 +18,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-#script to run a sequence of ring queue tests via make
+#script to run a sequence of message group queue tests via make
 
-#setup path to find qpid-config and sender/receiver test progs
+#setup path to find qpid-config and msg_group_test progs
 source ./test_env.sh
 
 export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
@@ -28,33 +28,39 @@ export PATH=$PWD:$srcdir:$PYTHON_COMMAND
 #set port to connect to via env var
 test -s qpidd.port && QPID_PORT=`cat qpidd.port`
 
-trap cleanup INT TERM QUIT
+#trap cleanup INT TERM QUIT
 
 QUEUE_NAME="group-queue"
 GROUP_KEY="My-Group-Id"
 
-BROKER_URL="-a ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
 
-setup() {
-    qpid-config $BROKER_URL add queue $QUEUE_NAME --argument="qpid.group_header_key=${GROUP_KEY}"
+run_test() {
+    $@
 }
 
-cleanup() {
-    qpid-config $BROKER_URL del queue $QUEUE_NAME --force
-}
+##set -x
 
-run_test() {
+declare -i i=0
+declare -a tests
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size"
+    "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size"
+    "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages
103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59  --group-size
5  --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size"
+    "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59  --group-size
3  --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79"
+    "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
 
-    msg_group_test -a $QUEUE_NAME --group-key $GROUP_KEY --capacity 3 --group-size 13 --ack-frequency
7 --messages 103 --receivers 3 --senders 5
+while [ -n "${tests[i]}" ]; do
+    run_test ${tests[i]}
     RETCODE=$?
-    cleanup
-
     if test x$RETCODE != x0; then
-        echo "FAIL message group tests"; exit 1;
+        echo "FAILED message group test.  Failed command: \"${tests[i]}\"";
+        exit 1;
     fi
-}
-
-setup
-run_test
-
-
+    i+=1
+done

Added: qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak?rev=1164284&view=auto
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak (added)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak Thu Sep  1 21:34:50
2011
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#script to run a sequence of long-running message group tests via make
+
+#setup path to find qpid-config and msg_group_test test progs
+source ./test_env.sh
+
+export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
+
+#set port to connect to via env var
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
+
+#trap cleanup INT TERM QUIT
+
+QUEUE_NAME="group-queue"
+GROUP_KEY="My-Group-Id"
+
+BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+
+run_test() {
+    $@
+}
+
+##set -x
+
+declare -i i=0
+declare -a tests
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007
--receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency
97"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007
--receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency
79"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007
--receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency
47"
+    "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
+
+while [ -n "${tests[i]}" ]; do
+    run_test ${tests[i]}
+    RETCODE=$?
+    if test x$RETCODE != x0; then
+        echo "FAILED message group test.  Failed command: \"${tests[i]}\"";
+        exit 1;
+    fi
+    i+=1
+done

Propchange: qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message