qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1180050 [2/2] - in /qpid/trunk/qpid: ./ cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/tests/ doc/book/src/ java/broker/src/main/java/org/apache/qpid/qmf/ specs/ tests/src/py/qpid_tests/broker_0_10/ tools/src/py/
Date Fri, 07 Oct 2011 14:21:50 GMT
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Oct  7 14:21:48 2011
@@ -56,12 +56,12 @@ class TestConsumer : public virtual Cons
 public:
     typedef boost::shared_ptr<TestConsumer> shared_ptr;
 
-    intrusive_ptr<Message> last;
+    QueuedMessage last;
     bool received;
-    TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+    TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
 
     virtual bool deliver(QueuedMessage& msg){
-        last = msg.payload;
+        last = msg;
         received = true;
         return true;
     };
@@ -149,16 +149,16 @@ QPID_AUTO_TEST_CASE(testConsumers){
 
     queue->deliver(msg1);
     BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
+    BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
 
     queue->deliver(msg2);
     BOOST_CHECK(queue->dispatch(c2));
-    BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
+    BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
 
     c1->received = false;
     queue->deliver(msg3);
     BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
+    BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
 
     //Test cancellation:
     queue->cancel(c1);
@@ -214,7 +214,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
     if (!consumer->received)
         sleep(2);
 
-    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
     BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
 
     received = queue->get().payload;
@@ -298,14 +298,14 @@ QPID_AUTO_TEST_CASE(testSeek){
     queue->deliver(msg2);
     queue->deliver(msg3);
 
-    TestConsumer::shared_ptr consumer(new TestConsumer(false));
+    TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
     SequenceNumber seq(2);
     consumer->position = seq;
 
     QueuedMessage qm;
     queue->dispatch(consumer);
 
-    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
     queue->dispatch(consumer);
     queue->dispatch(consumer); // make sure over-run is safe
 
@@ -325,14 +325,18 @@ QPID_AUTO_TEST_CASE(testSearch){
     queue->deliver(msg3);
 
     SequenceNumber seq(2);
-    QueuedMessage qm = queue->find(seq);
+    QueuedMessage qm;
+    TestConsumer::shared_ptr c1(new TestConsumer());
+
+    BOOST_CHECK(queue->find(seq, qm));
 
     BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
 
-    queue->acquire(qm);
+    queue->acquire(qm, c1->getName());
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
     SequenceNumber seq1(3);
-    QueuedMessage qm1 = queue->find(seq1);
+    QueuedMessage qm1;
+    BOOST_CHECK(queue->find(seq1, qm1));
     BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
 
 }
@@ -552,12 +556,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
     framing::SequenceNumber sequence1(10);
     QueuedMessage qmsg3(queue.get(), 0, sequence1);
+    TestConsumer::shared_ptr dummy(new TestConsumer());
 
-    BOOST_CHECK(!queue->acquire(qmsg));
-    BOOST_CHECK(queue->acquire(qmsg2));
+    BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
+    BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
     // Acquire the message again to test failure case.
-    BOOST_CHECK(!queue->acquire(qmsg2));
-    BOOST_CHECK(!queue->acquire(qmsg3));
+    BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
+    BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
 
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
 
@@ -567,7 +572,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     // set mode to no browse and check
     args.setOrdering(client::LVQ_NO_BROWSE);
     queue->configure(args);
-    TestConsumer::shared_ptr c1(new TestConsumer(false));
+    TestConsumer::shared_ptr c1(new TestConsumer("test", false));
 
     queue->dispatch(c1);
     queue->dispatch(c1);
@@ -696,6 +701,280 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
 }
 
+
+namespace {
+    // helper for group tests
+    void verifyAcquire( Queue::shared_ptr queue,
+                        TestConsumer::shared_ptr c,
+                        std::deque<QueuedMessage>& results,
+                        const std::string& expectedGroup,
+                        const int expectedId )
+    {
+        queue->dispatch(c);
+        results.push_back(c->last);
+        std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+        int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+        BOOST_CHECK_EQUAL( group, expectedGroup );
+        BOOST_CHECK_EQUAL( id, expectedId );
+    }
+}
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
+    //
+    // Verify that consumers of grouped messages own the groups once a message is acquired,
+    // and release the groups once all acquired messages have been dequeued or requeued
+    //
+    FieldTable args;
+    Queue::shared_ptr queue(new Queue("my_queue", true));
+    args.setString("qpid.group_header_key", "GROUP-ID");
+    args.setInt("qpid.shared_msg_group", 1);
+    queue->configure(args);
+
+    std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
+                             std::string("b"), std::string("b"), std::string("b"),
+                             std::string("c"), std::string("c"), std::string("c") };
+    for (int i = 0; i < 9; ++i) {
+        intrusive_ptr<Message> msg = create_message("e", "A");
+        msg->insertCustomProperty("GROUP-ID", groups[i]);
+        msg->insertCustomProperty("MY-ID", i);
+        queue->deliver(msg);
+    }
+
+    // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+    BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
+
+    TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+    TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+    queue->consume(c1);
+    queue->consume(c2);
+
+    std::deque<QueuedMessage> dequeMeC1;
+    std::deque<QueuedMessage> dequeMeC2;
+
+
+    verifyAcquire(queue, c1, dequeMeC1, "a", 0 );  // c1 now owns group "a" (acquire a-0)
+    verifyAcquire(queue, c2, dequeMeC2, "b", 3 );  // c2 should now own group "b" (acquire b-3)
+
+    // now let c1 complete the 'a-0' message - this should free the 'a' group
+    queue->dequeue( 0, dequeMeC1.front() );
+    dequeMeC1.pop_front();
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
+
+    // now c2 should pick up the next 'a-1', since it is oldest free
+    verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
+
+    // c1 should only be able to snarf up the first "c" message now...
+    verifyAcquire(queue, c1, dequeMeC1, "c", 6 );    // should skip to the first "c"
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
+
+    // hmmm... what if c2 now dequeues "b-3"?  (now only has a-1 acquired)
+    queue->dequeue( 0, dequeMeC2.front() );
+    dequeMeC2.pop_front();
+
+    // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
+
+    // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
+    verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
+
+    // c2 can now only grab a-2, and that's all
+    verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
+
+    // now C2 can't get any more, since C1 owns "b" and "c" group...
+    bool gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    // hmmm... what if c1 now dequeues "c-6"?  (now only owns b-4)
+    queue->dequeue( 0, dequeMeC1.front() );
+    dequeMeC1.pop_front();
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
+
+    // c2 can now grab c-7
+    verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
+
+    // what happens if C-2 "requeues" a-1 and a-2?
+    queue->requeue( dequeMeC2.front() );
+    dequeMeC2.pop_front();
+    queue->requeue( dequeMeC2.front() );
+    dequeMeC2.pop_front();  // now just has c-7 acquired
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
+
+    // now c1 will grab a-1 and a-2...
+    verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
+    verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
+
+    // c2 can now acquire c-8 only
+    verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
+
+    // and c1 can get b-5
+    verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
+
+    // should be no more acquire-able for anyone now:
+    gotOne = queue->dispatch(c1);
+    BOOST_CHECK( !gotOne );
+    gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    // requeue all of C1's acquired messages, then cancel C1
+    while (!dequeMeC1.empty()) {
+        queue->requeue(dequeMeC1.front());
+        dequeMeC1.pop_front();
+    }
+    queue->cancel(c1);
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ---, ---, ---, ---, ^C2, ^C2
+
+    // b-4, a-1, a-2, b-5 all should be available, right?
+    verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
+
+    while (!dequeMeC2.empty()) {
+        queue->dequeue(0, dequeMeC2.front());
+        dequeMeC2.pop_front();
+    }
+
+    // Queue = a-2, b-4, b-5
+    // Owners= ---, ---, ---
+
+    TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+    std::deque<QueuedMessage> dequeMeC3;
+
+    verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
+    verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
+
+    // Queue = a-2, b-4, b-5
+    // Owners= ^C3, ^C2, ^C2
+
+    gotOne = queue->dispatch(c3);
+    BOOST_CHECK( !gotOne );
+
+    verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
+
+    while (!dequeMeC2.empty()) {
+        queue->dequeue(0, dequeMeC2.front());
+        dequeMeC2.pop_front();
+    }
+
+    // Queue = a-2,
+    // Owners= ^C3,
+
+    intrusive_ptr<Message> msg = create_message("e", "A");
+    msg->insertCustomProperty("GROUP-ID", "a");
+    msg->insertCustomProperty("MY-ID", 9);
+    queue->deliver(msg);
+
+    // Queue = a-2, a-9
+    // Owners= ^C3, ^C3
+
+    gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    msg = create_message("e", "A");
+    msg->insertCustomProperty("GROUP-ID", "b");
+    msg->insertCustomProperty("MY-ID", 10);
+    queue->deliver(msg);
+
+    // Queue = a-2, a-9, b-10
+    // Owners= ^C3, ^C3, ----
+
+    verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
+    verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
+
+    gotOne = queue->dispatch(c3);
+    BOOST_CHECK( !gotOne );
+
+    queue->cancel(c2);
+    queue->cancel(c3);
+}
+
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
+    //
+    // Verify that the same default group name is automatically applied to messages that
+    // do not specify a group name.
+    //
+    FieldTable args;
+    Queue::shared_ptr queue(new Queue("my_queue", true));
+    args.setString("qpid.group_header_key", "GROUP-ID");
+    args.setInt("qpid.shared_msg_group", 1);
+    queue->configure(args);
+
+    for (int i = 0; i < 3; ++i) {
+        intrusive_ptr<Message> msg = create_message("e", "A");
+        // no "GROUP-ID" header
+        msg->insertCustomProperty("MY-ID", i);
+        queue->deliver(msg);
+    }
+
+    // Queue = 0, 1, 2
+
+    BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
+
+    TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+    TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+    queue->consume(c1);
+    queue->consume(c2);
+
+    std::deque<QueuedMessage> dequeMeC1;
+    std::deque<QueuedMessage> dequeMeC2;
+
+    queue->dispatch(c1);    // c1 now owns default group (acquired 0)
+    dequeMeC1.push_back(c1->last);
+    int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 0 );
+
+    bool gotOne = queue->dispatch(c2);  // c2 should get nothing
+    BOOST_CHECK( !gotOne );
+
+    queue->dispatch(c1);    // c1 now acquires 1
+    dequeMeC1.push_back(c1->last);
+    id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 1 );
+
+    gotOne = queue->dispatch(c2);  // c2 should still get nothing
+    BOOST_CHECK( !gotOne );
+
+    while (!dequeMeC1.empty()) {
+        queue->dequeue(0, dequeMeC1.front());
+        dequeMeC1.pop_front();
+    }
+
+    // now default group should be available...
+    queue->dispatch(c2);    // c2 now owns default group (acquired 2)
+    id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 2 );
+
+    gotOne = queue->dispatch(c1);  // c1 should get nothing
+    BOOST_CHECK( !gotOne );
+
+    queue->cancel(c1);
+    queue->cancel(c2);
+}
+
 QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
 
     TestMessageStoreOC  testStore;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Oct  7 14:21:48 2011
@@ -1418,6 +1418,76 @@ class LongTests(BrokerTest):
             if receiver: receiver.connection.detach()
             logger.setLevel(log_level)
 
+    def test_msg_group_failover(self):
+        """Test fail-over during continuous send-receive of grouped messages.
+        """
+
+        class GroupedTrafficGenerator(Thread):
+            def __init__(self, url, queue, group_key):
+                Thread.__init__(self)
+                self.url = url
+                self.queue = queue
+                self.group_key = group_key
+                self.status = -1
+
+            def run(self):
+                # generate traffic for approx 10 seconds (2011msgs / 200 per-sec)
+                cmd = ["msg_group_test",
+                       "--broker=%s" % self.url,
+                       "--address=%s" % self.queue,
+                       "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
+                       "--group-key=%s" % self.group_key,
+                       "--receivers=2",
+                       "--senders=3",
+                       "--messages=2011",
+                       "--send-rate=200",
+                       "--capacity=11",
+                       "--ack-frequency=23",
+                       "--allow-duplicates",
+                       "--group-size=37",
+                       "--randomize-group-size",
+                       "--interleave=13"]
+                #      "--trace"]
+                self.generator = Popen( cmd );
+                self.status = self.generator.wait()
+                return self.status
+
+            def results(self):
+                self.join(timeout=30)  # 3x assumed duration
+                if self.isAlive(): return -1
+                return self.status
+
+        # Original cluster will all be killed so expect exit with failure
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
+        for b in cluster: b.ready()     # Wait for brokers to be ready
+
+        # create a queue configured for message groups, keyed on the 'group-id' header
+        ssn0 = cluster[0].connect().session()
+        q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
+        s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
+
+        # Kill original brokers, start new ones for the duration.
+        endtime = time.time() + self.duration();
+        i = 0
+        while time.time() < endtime:
+            traffic = GroupedTrafficGenerator( cluster[i].host_port(),
+                                               "test-group-q", "group-id" )
+            traffic.start()
+            time.sleep(1)
+
+            for x in range(2):
+                for b in cluster[i:]: b.ready() # Check if any broker crashed.
+                cluster[i].kill()
+                i += 1
+                b = cluster.start(expect=EXPECT_EXIT_FAIL)
+                time.sleep(1)
+
+            # wait for traffic to finish, verify success
+            self.assertEqual(0, traffic.results())
+
+        for i in range(i, len(cluster)): cluster[i].kill()
+
+
 class StoreTests(BrokerTest):
     """
     Cluster tests that can only be run if there is a store available.

Propchange: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct  7 14:21:48 2011
@@ -1 +1,2 @@
 /qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py:1061302-1072333
+/qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py:1144319-1179855

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp Fri Oct  7 14:21:48 2011
@@ -28,6 +28,7 @@
 #include <qpid/messaging/FailoverUpdates.h>
 #include <qpid/sys/Time.h>
 #include <qpid/sys/Monitor.h>
+#include <qpid/sys/SystemInfo.h>
 #include "TestOptions.h"
 #include "Statistics.h"
 
@@ -76,6 +77,11 @@ struct Options : public qpid::Options
     uint flowControl;
     bool sequence;
     bool timestamp;
+    std::string groupKey;
+    std::string groupPrefix;
+    uint groupSize;
+    bool groupRandSize;
+    uint groupInterleave;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -100,7 +106,11 @@ struct Options : public qpid::Options
           sendRate(0),
           flowControl(0),
           sequence(true),
-          timestamp(true)
+          timestamp(true),
+          groupPrefix("GROUP-"),
+          groupSize(10),
+          groupRandSize(false),
+          groupInterleave(1)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -111,8 +121,8 @@ struct Options : public qpid::Options
             ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
             ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
             ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
-	    ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
-	    ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value
implies higher priority)")
+            ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+            ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
             ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
             ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
             ("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -131,6 +141,11 @@ struct Options : public qpid::Options
             ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
             ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)")
             ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)")
+            ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier")
+            ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)")
+            ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)")
+            ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)")
+            ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -252,6 +267,68 @@ class MapContentGenerator   : public Con
     const Options& opts;
 };
 
+// tag each generated message with a group identifier
+//
+class GroupGenerator {
+public:
+    GroupGenerator(const std::string& key,
+                   const std::string& prefix,
+                   const uint size,
+                   const bool randomize,
+                   const uint interleave)
+        : groupKey(key), groupPrefix(prefix), groupSize(size),
+          randomizeSize(randomize), groupSuffix(0)
+    {
+        if (randomize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
+
+        for (uint i = 0; i < 1 || i < interleave; ++i) {
+            newGroup();
+        }
+        current = groups.begin();
+    }
+
+    void setGroupInfo(Message &msg)
+    {
+        if (current == groups.end())
+            current = groups.begin();
+        msg.getProperties()[groupKey] = current->id;
+        // std::cout << "SENDING GROUPID=[" << current->id << "]" << std::endl;
+        if (++(current->count) == current->size) {
+            newGroup();
+            groups.erase(current++);
+        } else
+            ++current;
+    }
+
+  private:
+    const std::string& groupKey;
+    const std::string& groupPrefix;
+    const uint groupSize;
+    const bool randomizeSize;
+
+    uint groupSuffix;
+
+    struct GroupState {
+        std::string id;
+        const uint size;
+        uint count;
+        GroupState( const std::string& i, const uint s )
+            : id(i), size(s), count(0) {}
+    };
+    typedef std::list<GroupState> GroupList;
+    GroupList groups;
+    GroupList::iterator current;
+
+    void newGroup() {
+        std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
+        groupId << groupSuffix++;
+        uint size = (randomizeSize) ? (rand() % groupSize) + 1 : groupSize;
+        // std::cout << "New group: GROUPID=[" << groupId.str() << "] size=" << size << std::endl;
+        GroupState group( groupId.str(), size );
+        groups.push_back( group );
+    }
+};
+
 int main(int argc, char ** argv)
 {
     Connection connection;
@@ -296,6 +373,14 @@ int main(int argc, char ** argv)
             else
                 contentGen.reset(new FixedContentGenerator(opts.contentString));
 
+            std::auto_ptr<GroupGenerator> groupGen;
+            if (!opts.groupKey.empty())
+                groupGen.reset(new GroupGenerator(opts.groupKey,
+                                                  opts.groupPrefix,
+                                                  opts.groupSize,
+                                                  opts.groupRandSize,
+                                                  opts.groupInterleave));
+
             qpid::sys::AbsTime start = qpid::sys::now();
             int64_t interval = 0;
             if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
@@ -312,9 +397,6 @@ int main(int argc, char ** argv)
                 ++sent;
                 if (opts.sequence)
                     msg.getProperties()[SN] = sent;
-                if (opts.timestamp)
-                    msg.getProperties()[TS] = int64_t(
-                        qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
                 if (opts.flowControl) {
                     if ((sent % opts.flowControl) == 0) {
                         msg.setReplyTo(flowControlAddress);
@@ -323,6 +405,12 @@ int main(int argc, char ** argv)
                     else
                         msg.setReplyTo(Address()); // Clear the reply address.
                 }
+                if (groupGen.get())
+                    groupGen->setGroupInfo(msg);
+
+                if (opts.timestamp)
+                    msg.getProperties()[TS] = int64_t(
+                        qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
                 sender.send(msg);
                 reporter.message(msg);
 

Modified: qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml (original)
+++ qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml Fri Oct  7 14:21:48 2011
@@ -59,6 +59,7 @@
     <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="producer-flow-control.xml"/>
     <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="AMQP-Compatibility.xml"/>
     <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Qpid-Interoperability-Documentation.xml"/>
+    <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Using-message-groups.xml"/>
 
 </chapter>
 

Modified: qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP.xml?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP.xml (original)
+++ qpid/trunk/qpid/doc/book/src/AMQP-Messaging-Broker-CPP.xml Fri Oct  7 14:21:48 2011
@@ -52,6 +52,7 @@
     <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Starting-a-cluster.xml"/>
     <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="ACL.xml"/>
     <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="producer-flow-control.xml"/>
+    <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Using-message-groups.xml"/>
 </chapter>
  
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Fri Oct  7 14:21:48 2011
@@ -694,7 +694,8 @@ public class QMFService implements Confi
         public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final
BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory,
                                                                                         
        final String srcQueue,
                                                                                         
        final String destQueue,
-                                                                                        
        final Long qty)
+                                                                                        
        final Long qty,
+                                                                                        
        final Map filter)  // TODO: move based on group identifier
         {
             // TODO
             return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
@@ -731,6 +732,14 @@ public class QMFService implements Confi
             return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
         }
 
+        public BrokerSchema.BrokerClass.QueryMethodResponseCommand query(final BrokerSchema.BrokerClass.QueryMethodResponseCommandFactory factory,
+                                                                         final String type,
+                                                                         final String name)
+        {
+            //TODO:
+            return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+        }
+
         public UUID getId()
         {
             return _obj.getId();
@@ -1102,7 +1111,8 @@ public class QMFService implements Confi
         }
 
         public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory,
-                                                                        final Long request)
+                                                                        final Long request,
+                                                                        final Map filter)
  // TODO: support for purge-by-group-identifier
         {
             try
             {
@@ -1118,7 +1128,8 @@ public class QMFService implements Confi
         public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory
factory, 
                                                                             final Long request,

                                                                             final Boolean
useAltExchange, 
-                                                                            final String
exchange)
+                                                                            final String
exchange,
+                                                                            final Map filter)
  // TODO: support for re-route-by-group-identifier
         {
             //TODO
             return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Fri Oct  7 14:21:48 2011
@@ -92,6 +92,7 @@
       <arg name="srcQueue"          dir="I" type="sstr" desc="Source queue"/>
       <arg name="destQueue"         dir="I" type="sstr" desc="Destination queue"/>
       <arg name="qty"               dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/>
+      <arg name="filter"  dir="I" type="map" default="{}"   desc="if specified, move only those messages matching this filter"/>
     </method>
 
     <method name="setLogLevel" desc="Set the log level">
@@ -115,6 +116,13 @@
       <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
     </method>
 
+    <method name="query" desc="Query the current state of an object.">
+      <arg name="type" dir="I" type="sstr" desc="The type of object to query."/>
+      <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/>
+      <arg name="results" dir="O" type="map"  desc="A snapshot of the object's state."/>
+    </method>
+
+
   </class>
 
   <!--
@@ -180,12 +188,14 @@
 
     <method name="purge" desc="Discard all or some messages on a queue">
       <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+      <arg name="filter"  dir="I" type="map" default="{}"  desc="if specified, purge only those messages matching this filter"/>
     </method>
 
     <method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange">
       <arg name="request"        dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
       <arg name="useAltExchange" dir="I" type="bool"   desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/>
       <arg name="exchange"       dir="I" type="sstr"   desc="Name of the exchange to route the messages through"/>
+      <arg name="filter"  dir="I" type="map" default="{}" desc="if specified, reroute only those messages matching this filter"/>
     </method>
   </class>
 

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Fri Oct  7 14:21:48 2011
@@ -33,3 +33,4 @@ from lvq import *
 from priority import *
 from threshold import *
 from extensions import *
+from msg_groups import *

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Fri Oct  7 14:21:48 2011
@@ -156,7 +156,7 @@ class ManagementTest (TestBase010):
         queues = self.qmf.getObjects(_class="queue")
 
         "Move 10 messages from src-queue to dest-queue"
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {})
         self.assertEqual (result.status, 0) 
 
         sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -166,7 +166,7 @@ class ManagementTest (TestBase010):
         self.assertEqual (dq.msgDepth,10)
 
         "Move all remaining messages to destination"
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {})
         self.assertEqual (result.status,0)
 
         sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -176,16 +176,16 @@ class ManagementTest (TestBase010):
         self.assertEqual (dq.msgDepth,20)
 
         "Use a bad source queue name"
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {})
         self.assertEqual (result.status,4)
 
         "Use a bad destination queue name"
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {})
         self.assertEqual (result.status,4)
 
         " Use a large qty (40) to move from dest-queue back to "
         " src-queue- should move all "
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {})
         self.assertEqual (result.status,0)
 
         sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -225,19 +225,19 @@ class ManagementTest (TestBase010):
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
 
         "Purge top message from purge-queue"
-        result = pq.purge(1)
+        result = pq.purge(1, {})
         self.assertEqual (result.status, 0) 
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,19)
 
         "Purge top 9 messages from purge-queue"
-        result = pq.purge(9)
+        result = pq.purge(9, {})
         self.assertEqual (result.status, 0) 
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,10)
 
         "Purge all messages from purge-queue"
-        result = pq.purge(0)
+        result = pq.purge(0, {})
         self.assertEqual (result.status, 0) 
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,0)
@@ -263,7 +263,7 @@ class ManagementTest (TestBase010):
         #reroute messages from test queue to amq.fanout (and hence to
         #rerouted queue):
         pq = self.qmf.getObjects(_class="queue", name="test-queue")[0]
-        result = pq.reroute(0, False, "amq.fanout")
+        result = pq.reroute(0, False, "amq.fanout", {})
         self.assertEqual(result.status, 0) 
 
         #verify messages are all rerouted:
@@ -301,7 +301,7 @@ class ManagementTest (TestBase010):
         pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
 
         "Reroute top message from reroute-queue to alternate exchange"
-        result = pq.reroute(1, True, "")
+        result = pq.reroute(1, True, "", {})
         self.assertEqual(result.status, 0) 
         pq.update()
         aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
@@ -309,7 +309,7 @@ class ManagementTest (TestBase010):
         self.assertEqual(aq.msgDepth,1)
 
         "Reroute top 9 messages from reroute-queue to alt.direct2"
-        result = pq.reroute(9, False, "alt.direct2")
+        result = pq.reroute(9, False, "alt.direct2", {})
         self.assertEqual(result.status, 0) 
         pq.update()
         aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -317,11 +317,11 @@ class ManagementTest (TestBase010):
         self.assertEqual(aq.msgDepth,9)
 
         "Reroute using a non-existent exchange"
-        result = pq.reroute(0, False, "amq.nosuchexchange")
+        result = pq.reroute(0, False, "amq.nosuchexchange", {})
         self.assertEqual(result.status, 4)
 
         "Reroute all messages from reroute-queue"
-        result = pq.reroute(0, False, "alt.direct2")
+        result = pq.reroute(0, False, "alt.direct2", {})
         self.assertEqual(result.status, 0) 
         pq.update()
         aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -337,7 +337,7 @@ class ManagementTest (TestBase010):
             session.message_transfer(destination="amq.direct", message=msg)
 
         "Reroute onto the same queue"
-        result = pq.reroute(0, False, "amq.direct")
+        result = pq.reroute(0, False, "amq.direct", {})
         self.assertEqual(result.status, 0) 
         pq.update()
         self.assertEqual(pq.msgDepth,20)
@@ -365,7 +365,7 @@ class ManagementTest (TestBase010):
         # 4. Call reroute on queue Y and specify that messages should
         # be sent to exchange A
         y = self.qmf.getObjects(_class="queue", name="Y")[0]
-        result = y.reroute(1, False, "A")
+        result = y.reroute(1, False, "A", {})
         self.assertEqual(result.status, 0)
 
         # 5. verify that the message is rerouted through B (as A has

Modified: qpid/trunk/qpid/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-config?rev=1180050&r1=1180049&r2=1180050&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-config (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-config Fri Oct  7 14:21:48 2011
@@ -96,6 +96,8 @@ class Config:
         self._flowResumeCount   = None
         self._flowStopSize      = None
         self._flowResumeSize    = None
+        self._msgGroupHeader    = None
+        self._sharedMsgGroup    = False
         self._extra_arguments   = []
         self._returnCode        = 0
 
@@ -116,13 +118,16 @@ FLOW_STOP_COUNT   = "qpid.flow_stop_coun
 FLOW_RESUME_COUNT = "qpid.flow_resume_count"
 FLOW_STOP_SIZE    = "qpid.flow_stop_size"
 FLOW_RESUME_SIZE  = "qpid.flow_resume_size"
+MSG_GROUP_HDR_KEY = "qpid.group_header_key"
+SHARED_MSG_GROUP  = "qpid.shared_msg_group"
 #There are various arguments to declare that have specific program
 #options in this utility. However there is now a generic mechanism for
 #passing arguments as well. The SPECIAL_ARGS list contains the
 #arguments for which there are specific program options defined
 #i.e. the arguments for which there is special processing on add and
 #list
-SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE]
+SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+              MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
 
 class JHelpFormatter(IndentedHelpFormatter):
     """Format usage and description without stripping newlines from usage strings
@@ -182,6 +187,10 @@ def OptionsAndArguments(argv):
                       help="Turn on sender flow control when the number of queued messages exceeds this value.")
     group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
                       help="Turn off sender flow control when the number of queued messages drops below this value.")
+    group3.add_option("--group-header", action="store", type="string", metavar="<header-name>",
+                      help="Enable message groups. Specify name of header that holds group identifier.")
+    group3.add_option("--shared-groups", action="store_true",
+                      help="Allow message group consumption across multiple consumers.")
     group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
                       metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
     # no option for declaring an exclusive queue - which can only be used by the session that creates it.
@@ -263,6 +272,10 @@ def OptionsAndArguments(argv):
         config._flowStopCount = opts.flow_stop_count
     if opts.flow_resume_count:
         config._flowResumeCount = opts.flow_resume_count
+    if opts.group_header:
+        config._msgGroupHeader = opts.group_header
+    if opts.shared_groups:
+        config._sharedMsgGroup = True
     if opts.extra_arguments:
         config._extra_arguments = opts.extra_arguments
     return args
@@ -442,6 +455,8 @@ class BrokerManager:
                 if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
                 if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
                 if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
+                if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY],
+                if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups",
                 print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
 
     def QueueListRecurse(self, filter):
@@ -534,6 +549,11 @@ class BrokerManager:
         if config._flowResumeCount:
             declArgs[FLOW_RESUME_COUNT]  = config._flowResumeCount
 
+        if config._msgGroupHeader:
+            declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader
+        if config._sharedMsgGroup:
+            declArgs[SHARED_MSG_GROUP] = 1
+
         if config._altern_ex != None:
             self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
         else:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message