qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1170311 - in /qpid/branches/qpid-3346/qpid/cpp/src: qpid/broker/Broker.cpp qpid/broker/MessageGroupManager.cpp qpid/broker/MessageGroupManager.h tests/cluster_tests.py tests/msg_group_test.cpp tests/run_msg_group_tests
Date Tue, 13 Sep 2011 19:27:46 GMT
Author: kgiusti
Date: Tue Sep 13 19:27:45 2011
New Revision: 1170311

URL: http://svn.apache.org/viewvc?rev=1170311&view=rev
Log:
QPID-3346: add cluster support and unit test

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py
    qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1170311&r1=1170310&r2=1170311&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp Tue Sep 13 19:27:45 2011
@@ -702,6 +702,7 @@ Manageable::status_t Broker::queryQueue(
         QPID_LOG(error, "Query failed: queue not found, name=" << name);
         return Manageable::STATUS_UNKNOWN_OBJECT;
     }
+    q->query( results );
     return Manageable::STATUS_OK;;
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1170311&r1=1170310&r2=1170311&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Tue Sep 13 19:27:45
2011
@@ -305,3 +305,108 @@ boost::shared_ptr<MessageGroupManager> M
     }
     return empty;
 }
+
+/** Cluster replication:
+
+   state map format:
+
+   { "group-state": [ {"name": <group-name>,
+                       "owner": <consumer-name>-or-empty,
+                       "acquired-ct": <acquired count>,
+                       "positions": [Seqnumbers, ... ]},
+                      {...}
+                    ]
+   }
+*/
+
+namespace {
+    const std::string GROUP_NAME("name");
+    const std::string GROUP_OWNER("owner");
+    const std::string GROUP_ACQUIRED_CT("acquired-ct");
+    const std::string GROUP_POSITIONS("positions");
+    const std::string GROUP_STATE("group-state");
+}
+
+
+/** Runs on UPDATER to snapshot current state */
+void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
+{
+    using namespace qpid::framing;
+    state.clear();
+    framing::Array groupState(TYPE_CODE_MAP);
+    for (GroupMap::const_iterator g = messageGroups.begin();
+         g != messageGroups.end(); ++g) {
+
+        framing::FieldTable group;
+        group.setString(GROUP_NAME, g->first);
+        group.setString(GROUP_OWNER, g->second.owner);
+        group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
+        framing::Array positions(TYPE_CODE_UINT32);
+        for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
+             p != g->second.members.end(); ++p)
+            positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+        group.setArray(GROUP_POSITIONS, positions);
+        groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
+    }
+    state.setArray(GROUP_STATE, groupState);
+
+    QPID_LOG(debug, "Queue \"" << queue->getName() << "\": replicating message
group state, key=" << groupIdHeader);
+}
+
+
+/** called on UPDATEE to set state from snapshot */
+void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
+{
+    using namespace qpid::framing;
+    messageGroups.clear();
+    consumers.clear();
+    freeGroups.clear();
+
+    framing::Array groupState(TYPE_CODE_MAP);
+
+    bool ok = state.getArray(GROUP_STATE, groupState);
+    if (!ok) {
+        QPID_LOG(error, "Unable to find message group state information for queue \"" <<
+                 queue->getName() << "\": cluster inconsistency error!");
+        return;
+    }
+
+    for (framing::Array::const_iterator g = groupState.begin();
+         g != groupState.end(); ++g) {
+        framing::FieldTable group;
+        ok = framing::getEncodedValue<FieldTable>(*g, group);
+        if (!ok) {
+            QPID_LOG(error, "Invalid message group state information for queue \"" <<
+                     queue->getName() << "\": table encoding error!");
+            return;
+        }
+        MessageGroupManager::GroupState state;
+        if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT))
{
+            QPID_LOG(error, "Invalid message group state information for queue \"" <<
+                     queue->getName() << "\": fields missing error!");
+            return;
+        }
+        state.group = group.getAsString(GROUP_NAME);
+        state.owner = group.getAsString(GROUP_OWNER);
+        state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
+        framing::Array positions(TYPE_CODE_UINT32);
+        ok = group.getArray(GROUP_POSITIONS, positions);
+        if (!ok) {
+            QPID_LOG(error, "Invalid message group state information for queue \"" <<
+                     queue->getName() << "\": position encoding error!");
+            return;
+        }
+
+        for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
+            state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
+        messageGroups[state.group] = state;
+        if (state.owned())
+            consumers[state.owner]++;
+        else {
+            assert(state.members.size());
+            freeGroups[state.members.front()] = &messageGroups[state.group];
+        }
+    }
+
+    QPID_LOG(debug, "Queue \"" << queue->getName() << "\": message group state
replicated, key =" << groupIdHeader)
+}

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1170311&r1=1170310&r2=1170311&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Tue Sep 13 19:27:45
2011
@@ -24,7 +24,7 @@
 
 /* for managing message grouping on Queues */
 
-#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/broker/MessageAllocator.h"
 
 
@@ -34,7 +34,7 @@ namespace broker {
 class QueueObserver;
 class MessageAllocator;
 
-class MessageGroupManager : public QueueObserver, public MessageAllocator
+class MessageGroupManager : public StatefulQueueObserver, public MessageAllocator
 {
     const std::string groupIdHeader;    // msg header holding group identifier
     const unsigned int timestamp;       // mark messages with timestamp if set
@@ -55,8 +55,9 @@ class MessageGroupManager : public Queue
     typedef std::map<std::string, uint32_t> Consumers;  // count of owned groups
     typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
 
+    // note: update getState()/setState() when changing this object's state implementation
     GroupMap messageGroups; // index: group name
-    GroupFifo freeGroups; // ordered by oldest free msg
+    GroupFifo freeGroups;   // ordered by oldest free msg
     Consumers consumers;    // index: consumer name
 
     static const std::string qpidMessageGroupKey;
@@ -95,13 +96,17 @@ class MessageGroupManager : public Queue
     static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable&
settings );
 
     MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0
)
-        : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp)
{}
+      : StatefulQueueObserver(std::string("MessageGroupManager:") + header), MessageAllocator(q),
+        groupIdHeader( header ), timestamp(_timestamp) {}
     void enqueued( const QueuedMessage& qm );
     void acquired( const QueuedMessage& qm );
     void requeued( const QueuedMessage& qm );
     void dequeued( const QueuedMessage& qm );
     void consumerAdded( const Consumer& );
     void consumerRemoved( const Consumer& );
+    void getState(qpid::framing::FieldTable& state ) const;
+    void setState(const qpid::framing::FieldTable&);
+
     bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
                                 const sys::Mutex::ScopedLock&);
     // uses default nextBrowsableMessage()

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py?rev=1170311&r1=1170310&r2=1170311&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py Tue Sep 13 19:27:45 2011
@@ -1035,6 +1035,76 @@ class LongTests(BrokerTest):
             receiver.connection.detach()
             logger.setLevel(log_level)
 
+    def test_msg_group_failover(self):
+        """Test fail-over during continuous send-receive of grouped messages.
+        """
+
+        class GroupedTrafficGenerator(Thread):
+            def __init__(self, url, queue, group_key):
+                Thread.__init__(self)
+                self.url = url
+                self.queue = queue
+                self.group_key = group_key
+                self.status = -1
+
+            def run(self):
+                # generate traffic for approx 10 seconds (2011msgs / 200 per-sec)
+                cmd = ["msg_group_test",
+                       "--broker=%s" % self.url,
+                       "--address=%s" % self.queue,
+                       "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
+                       "--group-key=%s" % self.group_key,
+                       "--receivers=2",
+                       "--senders=3",
+                       "--messages=2011",
+                       "--send-rate=200",
+                       "--capacity=11",
+                       "--ack-frequency=23",
+                       "--allow-duplicates",
+                       "--group-size=37",
+                       "--randomize-group-size",
+                       "--interleave=13"]
+                #      "--trace"]
+                self.generator = Popen( cmd );
+                self.status = self.generator.wait()
+                return self.status
+
+            def results(self):
+                self.join(timeout=30)  # 3x assumed duration
+                if self.isAlive(): return -1
+                return self.status
+
+        # Original cluster will all be killed so expect exit with failure
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
+        for b in cluster: b.ready()     # Wait for brokers to be ready
+
+        # create a queue with rather draconian flow control settings
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.group_header_key':'group-id'}}}}")
+
+
+        # Kill original brokers, start new ones for the duration.
+        endtime = time.time() + self.duration();
+        i = 0
+        while time.time() < endtime:
+            traffic = GroupedTrafficGenerator( cluster[i].host_port(),
+                                               "test-group-q", "group-id" )
+            traffic.start()
+            time.sleep(1)
+
+            for x in range(2):
+                for b in cluster[i:]: b.ready() # Check if any broker crashed.
+                cluster[i].kill()
+                i += 1
+                b = cluster.start(expect=EXPECT_EXIT_FAIL)
+                time.sleep(1)
+
+            # wait for traffic to finish, verify success
+            self.assertEqual(0, traffic.results())
+
+        for i in range(i, len(cluster)): cluster[i].kill()
+
+
 class StoreTests(BrokerTest):
     """
     Cluster tests that can only be run if there is a store available.

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp?rev=1170311&r1=1170310&r2=1170311&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/msg_group_test.cpp Tue Sep 13 19:27:45 2011
@@ -67,6 +67,9 @@ struct Options : public qpid::Options
     bool randomizeSize;
     bool stickyConsumer;
     uint timeout;
+    uint interleave;
+    std::string prefix;
+    uint sendRate;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -86,11 +89,13 @@ struct Options : public qpid::Options
           allowDuplicates(false),
           randomizeSize(false),
           stickyConsumer(false),
-          timeout(10)
+          timeout(10),
+          interleave(1),
+          sendRate(0)
     {
         addOptions()
           ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies
none of the messages will get accepted)")
-          ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from")
+          ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive
from")
           ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated
messages")
           ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
           ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
@@ -98,10 +103,13 @@ struct Options : public qpid::Options
           ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
           ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates
distributed via amq.failover")
           ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing
the group identifier.")
+          ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of
all generated group identifiers.")
           ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.")
+          ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages
from N different groups.")
           ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each
sender.")
           ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.")
           ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of
messages per group to [1...group-size].")
+          ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second.
0 means send as fast as possible.")
           ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
           ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages
in a group are consumed by the same client [TBD].")
           ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers
remain idle for timeout seconds.")
@@ -181,12 +189,12 @@ public:
         // now verify
         SequenceMap::iterator s = sequenceMap.find(groupId);
         if (s == sequenceMap.end()) {
-            sequenceMap[groupId] = 1;
-            totalMsgsConsumed++;
             QPID_LOG(debug,  "Client " << client << " thinks this is the first
message from group " << groupId << ":" << sequence);
-            return sequence == 0;
-        }
-        if (sequence < s->second) {
+            // if duplication allowed, it is possible that the last msg(s) of an old sequence
are redelivered on reconnect.
+            // in this case, set the sequence from the first msg.
+            sequenceMap[groupId] = (allowDuplicates) ? sequence : 0;
+            s = sequenceMap.find(groupId);
+        } else if (sequence < s->second) {
             duplicateMsgs++;
             QPID_LOG(debug, "Client " << client << " thinks this message is a
duplicate! " << groupId << ":" << sequence);
             return allowDuplicates;
@@ -222,7 +230,9 @@ public:
     bool allMsgsConsumed()  // true when done processing msgs
     {
         qpid::sys::Mutex::ScopedLock l(lock);
-        return totalMsgsConsumed == totalMsgs;
+        return (totalMsgsPublished >= totalMsgs) &&
+          (totalMsgsConsumed >= totalMsgsPublished) &&
+          sequenceMap.size() == 0;
     }
 
     uint getConsumedTotal()
@@ -274,10 +284,85 @@ namespace {
         }
     };
 
-    static Randomizer randomize;
+    static Randomizer randomizer;
 }
 
 
+// tag each generated message with a group identifer
+//
+class GroupGenerator {
+
+    const std::string groupPrefix;
+    const uint groupSize;
+    const bool randomizeSize;
+    const uint interleave;
+
+    uint groupSuffix;
+    uint total;
+
+    struct GroupState {
+        std::string id;
+        const uint size;
+        uint count;
+        GroupState( const std::string& i, const uint s )
+            : id(i), size(s), count(0) {}
+    };
+    typedef std::list<GroupState> GroupList;
+    GroupList groups;
+    GroupList::iterator current;
+
+    // add a new group identifier to the list
+    void newGroup() {
+        std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
+        groupId << std::string(":") << groupSuffix++;
+        uint size = (randomizeSize) ? randomizer(groupSize) : groupSize;
+        QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size="
<< size << " this=" << this);
+        GroupState group( groupId.str(), size );
+        groups.push_back( group );
+    }
+
+public:
+    GroupGenerator( const std::string& prefix,
+                    const uint t,
+                    const uint size,
+                    const bool randomize,
+                    const uint i)
+        : groupPrefix(prefix), groupSize(size),
+          randomizeSize(randomize), interleave(i), groupSuffix(0), total(t)
+    {
+        QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total="
<< total << " size=" << size << " rand=" << randomize <<
" interleave=" << interleave << " this=" << this);
+        for (uint i = 0; i < 1 || i < interleave; ++i) {
+            newGroup();
+        }
+        current = groups.begin();
+    }
+
+    bool genGroup(std::string& groupId, uint& seq, bool& eos)
+    {
+        if (!total) return false;
+        --total;
+        if (current == groups.end())
+            current = groups.begin();
+        groupId = current->id;
+        seq = current->count++;
+        if (current->count == current->size) {
+            QPID_LOG(trace, "Last msg for " << current->id << ", " <<
current->count << " this=" << this);
+            eos = true;
+            if (total >= interleave) {  // need a new group to replace this one
+                newGroup();
+                groups.erase(current++);
+            } else ++current;
+        } else {
+            ++current;
+            eos = total < interleave;   // mark eos on the last message of each group
+        }
+        QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq
<< " eos=" << eos << " this=" << this);
+        return true;
+    }
+};
+
+
+
 class Client : public qpid::sys::Runnable
 {
 public:
@@ -291,6 +376,7 @@ public:
     qpid::sys::Thread& getThread() { return thread; }
     const std::string getErrorMsg() { return error.str(); }
     void stop() {stopped = true;}
+    const std::string& getName() { return name; }
 
 protected:
     const std::string name;
@@ -323,16 +409,15 @@ public:
             Message msg;
             uint count = 0;
 
-            while (!stopped && !checker.allMsgsConsumed()) {
-
+            while (!stopped) {
                 if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved
-
                     qpid::types::Variant::Map& properties = msg.getProperties();
-
                     std::string groupId = properties[opts.groupKey];
                     uint groupSeq = properties[SN];
                     bool eof = properties[EOS];
 
+                    QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq="
<< groupSeq << " eos=" << eof << " name=" << name);
+
                     qpid::sys::usleep(10);
 
                     if (!checker.checkSequence( groupId, groupSeq, name )) {
@@ -355,7 +440,8 @@ public:
                     }
                     // Clear out message properties & content for next iteration.
                     msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
-                }
+                } else if (checker.allMsgsConsumed())   // timed out, nothing else to do?
+                    break;
             }
             session.acknowledge();
             session.close();
@@ -367,6 +453,7 @@ public:
             connection.close();
         }
         clientDone();
+        QPID_LOG(trace, "Consuming client " << name << " completed.");
     }
 };
 
@@ -375,9 +462,13 @@ public:
 class Producer : public Client
 {
     GroupChecker& checker;
+    GroupGenerator generator;
 
 public:
-    Producer(const std::string& n, const Options& o, GroupChecker& c) : Client(n,
o), checker(c) {};
+    Producer(const std::string& n, const Options& o, GroupChecker& c)
+        : Client(n, o), checker(c),
+          generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave )
+    {};
     virtual ~Producer() {};
 
     void run()
@@ -392,32 +483,29 @@ public:
             if (opts.capacity) sender.setCapacity(opts.capacity);
             Message msg;
             msg.setDurable(opts.durable);
+            std::string groupId;
+            uint seq;
+            bool eos;
             uint sent = 0;
-            uint groupSeq = 0;
-            uint groupSize = opts.groupSize;
-            ostringstream group;
-            group << name << ":" << sent;
-            std::string groupId(group.str());
 
-            while (!stopped && sent < opts.messages) {
-                ++sent;
+            qpid::sys::AbsTime start = qpid::sys::now();
+            int64_t interval = 0;
+            if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
+
+            while (!stopped && generator.genGroup(groupId, seq, eos)) {
                 msg.getProperties()[opts.groupKey] = groupId;
-                msg.getProperties()[SN] = groupSeq++;
-                msg.getProperties()[EOS] = false;
-                checker.sendingSequence( groupId, groupSeq-1, (groupSeq == groupSize), name
);
-                if (groupSeq == groupSize) {
-                    msg.getProperties()[EOS] = true;
-                    // generate new group
-                    ostringstream nextGroupId;
-                    nextGroupId << name << ":" << sent;
-                    groupId = nextGroupId.str();
-                    groupSeq = 0;
-                    if (opts.randomizeSize) {
-                        groupSize = randomize(opts.groupSize);
-                    }
-                }
+                msg.getProperties()[SN] = seq;
+                msg.getProperties()[EOS] = eos;
+                checker.sendingSequence( groupId, seq, eos, name );
+
                 sender.send(msg);
-                qpid::sys::usleep(10);
+                ++sent;
+
+                if (opts.sendRate) {
+                    qpid::sys::AbsTime waitTill(start, sent*interval);
+                    int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+                    if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+                }
             }
             session.sync();
             session.close();
@@ -429,6 +517,7 @@ public:
             connection.close();
         }
         clientDone();
+        QPID_LOG(trace, "Producing client " << name << " completed.");
     }
 };
 
@@ -453,13 +542,13 @@ int main(int argc, char ** argv)
             // fire off the producers && consumers
             for (size_t j = 0; j < opts.senders; ++j)  {
                 ostringstream name;
-                name << "P_" << j;
+                name << opts.prefix << "P_" << j;
                 clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state
)));
                 clients.back()->getThread() = qpid::sys::Thread(*clients.back());
             }
             for (size_t j = 0; j < opts.receivers; ++j)  {
                 ostringstream name;
-                name << "C_" << j;
+                name << opts.prefix << "C_" << j;
                 clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state
)));
                 clients.back()->getThread() = qpid::sys::Thread(*clients.back());
             }
@@ -476,8 +565,9 @@ int main(int argc, char ** argv)
                 done = true;
                 for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
                      i != clients.end(); ++i) {
+                    QPID_LOG(debug, "Client " << (*i)->getName() << " state="
<< (*i)->getState());
                     if ((*i)->getState() == Client::FAILURE) {
-                        std::cerr << argv[0] << ": test failed with client error:
" << (*i)->getErrorMsg() << std::endl;
+                        QPID_LOG(error, argv[0] << ": test failed with client error:
" << (*i)->getErrorMsg());
                         clientFailed = true;
                         done = true;
                         break;  // exit test.
@@ -505,7 +595,7 @@ int main(int argc, char ** argv)
             if (clientFailed) {
                 status = 1;
             } else if (stalledTime >= opts.timeout) {
-                std::cerr << argv[0] << ": test failed due to stalled consumer."
<< std::endl;
+                QPID_LOG(error, argv[0] << ": test failed due to stalled consumer."
);
                 status = 2;
             }
 
@@ -517,10 +607,12 @@ int main(int argc, char ** argv)
             }
 
             if (opts.printReport && !status) state.print(std::cout);
-        }
+        } else status = 4;
     } catch(const std::exception& error) {
-        std::cerr << argv[0] << ": " << error.what() << std::endl;
+        QPID_LOG(error, argv[0] << ": " << error.what());
         status = 3;
     }
+    QPID_LOG(trace, "TEST DONE [" << status << "]");
+
     return status;
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests?rev=1170311&r1=1170310&r2=1170311&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests Tue Sep 13 19:27:45 2011
@@ -44,15 +44,15 @@ run_test() {
 declare -i i=0
 declare -a tests
 tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
-    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave
3"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size"
     "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size"
-    "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages
103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size"
+    "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages
103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size
--interleave 5"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59  --group-size
5  --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size"
     "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59  --group-size
3  --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size"
-    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size
13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79"
+    "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size
13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53"
     "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
 
 while [ -n "${tests[i]}" ]; do



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message