qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1178873 - in /qpid/branches/qpid-3346/qpid: cpp/src/qpid/broker/ tests/src/py/qpid_tests/broker_0_10/
Date Tue, 04 Oct 2011 17:41:03 GMT
Author: kgiusti
Date: Tue Oct  4 17:41:03 2011
New Revision: 1178873

URL: http://svn.apache.org/viewvc?rev=1178873&view=rev
Log:
QPID-3346: add client and mgmt unit tests, fix bugs uncovered.

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
    qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1178873&r1=1178872&r2=1178873&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Tue Oct  4 17:41:03 2011
@@ -90,8 +90,6 @@ void MessageGroupManager::requeued( cons
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    // @todo KAG BUG - how to ensure requeue happens in the correct order?
-    // @todo KAG BUG - if requeue is not in correct order - what do we do?  throw?
     std::string group( getGroupId(qm) );
     GroupMap::iterator gs = messageGroups.find( group );
     assert( gs != messageGroups.end() );
@@ -117,10 +115,21 @@ void MessageGroupManager::dequeued( cons
     assert( gs != messageGroups.end() );
     GroupState& state( gs->second );
     assert( state.members.size() != 0 );
+    assert( state.acquired != 0 );
+    state.acquired -= 1;
 
     // likely to be at or near begin() if dequeued in order
-    {
-        GroupState::PositionFifo::iterator pos = state.members.begin();
+    bool reFreeNeeded = false;
+    if (state.members.front() == qm.position) {
+        if (!state.owned()) {
+            // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
+            // if on freelist, it is indexed by first member, which is about to be removed!
+            unFree(state);
+            reFreeNeeded = true;
+        }
+        state.members.pop_front();
+    } else {
+        GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
         GroupState::PositionFifo::iterator end = state.members.end();
         while (pos != end) {
             if (*pos == qm.position) {
@@ -131,35 +140,37 @@ void MessageGroupManager::dequeued( cons
         }
     }
 
-    assert( state.acquired != 0 );
-    state.acquired -= 1;
     uint32_t total = state.members.size();
     if (total == 0) {
-        if (!state.owned()) {  // unlikely, but need to remove from the free list before erase
-            unFree( state );
-        }
         QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first);
         messageGroups.erase( gs );
-    } else {
-        if (state.acquired == 0 && state.owned()) {
-            QPID_LOG( trace, "group queue " << qName <<
-                      ": consumer name=" << state.owner << " released group id=" << gs->first);
-            disown(state);
-        }
+    } else if (state.acquired == 0 && state.owned()) {
+        QPID_LOG( trace, "group queue " << qName <<
+                  ": consumer name=" << state.owner << " released group id=" << gs->first);
+        disown(state);
+    } else if (reFreeNeeded) {
+        disown(state);
     }
     QPID_LOG( trace, "group queue " << qName <<
               ": dequeued message from group id=" << group << " total=" << total );
 }
 
-void MessageGroupManager::consumerAdded( const Consumer& c )
+void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
 {
-    assert(consumers.find(c.getName()) == consumers.end());
-    consumers[c.getName()] = 0;     // no groups owned yet
-    QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
+#if 0
+    // allow a re-subscribing consumer
+    if (consumers.find(c.getName()) == consumers.end()) {
+        consumers[c.getName()] = 0;     // no groups owned yet
+        QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
+    } else {
+        QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() );
+    }
+#endif
 }
 
-void MessageGroupManager::consumerRemoved( const Consumer& c )
+void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
 {
+#if 0
     const std::string& name(c.getName());
     Consumers::iterator consumer = consumers.find(name);
     assert(consumer != consumers.end());
@@ -170,14 +181,22 @@ void MessageGroupManager::consumerRemove
 
         GroupState& state( gs->second );
         if (state.owner == name) {
-            --count;
-            disown(state);
-            QPID_LOG( trace, "group queue " << qName <<
-                      ": consumer name=" << name << " released group id=" << gs->first);
+            if (state.acquired == 0) {
+                --count;
+                disown(state);
+                QPID_LOG( trace, "group queue " << qName <<
+                          ": consumer name=" << name << " released group id=" << gs->first);
+            }
         }
     }
-    consumers.erase( consumer );
-    QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
+    if (count == 0) {
+        consumers.erase( consumer );
+        QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
+    } else {
+        // don't release groups with outstanding acquired msgs - consumer may re-subscribe!
+        QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages.");
+    }
+#endif
 }
 
 
@@ -196,9 +215,11 @@ bool MessageGroupManager::nextConsumable
                 return false;           // shouldn't happen - should find nextFree
         }
     } else {  // no free groups available
+#if 0
         if (consumers[c->getName()] == 0) {  // and none currently owned
             return false;       // so nothing available to consume
         }
+#endif
         if (!messages.next( c->position, next ))
             return false;
     }
@@ -356,7 +377,7 @@ void MessageGroupManager::setState(const
 {
     using namespace qpid::framing;
     messageGroups.clear();
-    consumers.clear();
+    //consumers.clear();
     freeGroups.clear();
 
     framing::Array groupState(TYPE_CODE_MAP);
@@ -398,7 +419,8 @@ void MessageGroupManager::setState(const
             state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
         messageGroups[state.group] = state;
         if (state.owned())
-            consumers[state.owner]++;
+            //consumers[state.owner]++;
+            ;
         else {
             assert(state.members.size());
             freeGroups[state.members.front()] = &messageGroups[state.group];

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1178873&r1=1178872&r2=1178873&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Tue Oct  4 17:41:03 2011
@@ -42,7 +42,7 @@ class MessageGroupManager : public State
     const std::string qName;            // name of parent queue (for logs)
 
     struct GroupState {
-        typedef std::list<framing::SequenceNumber> PositionFifo;
+        typedef std::deque<framing::SequenceNumber> PositionFifo;
 
         std::string group;  // group identifier
         std::string owner;  // consumer with outstanding acquired messages
@@ -54,13 +54,13 @@ class MessageGroupManager : public State
         bool owned() const {return !owner.empty();}
     };
     typedef std::map<std::string, struct GroupState> GroupMap;
-    typedef std::map<std::string, uint32_t> Consumers;  // count of owned groups
+    //typedef std::map<std::string, uint32_t> Consumers;  // count of owned groups
     typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
 
     // note: update getState()/setState() when changing this object's state implementation
     GroupMap messageGroups; // index: group name
     GroupFifo freeGroups;   // ordered by oldest free msg
-    Consumers consumers;    // index: consumer name
+    //Consumers consumers;    // index: consumer name
 
     static const std::string qpidMessageGroupKey;
     static const std::string qpidMessageGroupTimestamp;
@@ -76,21 +76,17 @@ class MessageGroupManager : public State
     void own( GroupState& state, const std::string& owner )
     {
         state.owner = owner;
-        consumers[state.owner]++;
+        //consumers[state.owner]++;
         unFree( state );
     }
     void disown( GroupState& state )
     {
-        assert(consumers[state.owner]);
-        consumers[state.owner]--;
+        //assert(consumers[state.owner]);
+        //consumers[state.owner]--;
         state.owner.clear();
         assert(state.members.size());
-#ifdef NDEBUG
+        assert(freeGroups.find(state.members.front()) == freeGroups.end());
         freeGroups[state.members.front()] = &state;
-#else
-        bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second;
-        (void) unique; assert(unique);
-#endif
     }
 
  public:

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1178873&r1=1178872&r2=1178873&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct  4 17:41:03 2011
@@ -558,7 +558,7 @@ namespace {
     MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
     {
         using namespace qpid::types;
-        if (filter) {
+        if (filter && !filter->empty()) {
             Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
             if (i != filter->end()) {
 

Modified: qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1178873&r1=1178872&r2=1178873&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Tue Oct  4 17:41:03 2011
@@ -33,3 +33,4 @@ from lvq import *
 from priority import *
 from threshold import *
 from extensions import *
+from msg_groups import *

Modified: qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1178873&r1=1178872&r2=1178873&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original)
+++ qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Tue Oct  4 17:41:03 2011
@@ -19,6 +19,7 @@
 
 from qpid.messaging import *
 from qpid.tests.messaging import Base
+import qmf.console
 
 from time import sleep
 #
@@ -360,8 +361,564 @@ class MultiConsumerMsgGroupTests(Base):
         except Empty:
             pass
 
+    def test_transaction(self):
+        """ Verify group ownership and message availability across transactional commit and rollback.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","A","B","B","A","B"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        s1 = self.conn.session(transactional=True)
+        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        s2 = self.conn.session(transactional=True)
+        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+        # C1 gets group A
+        m1 = c1.fetch(0)
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        # C2 gets group B
+        m2 = c2.fetch(0)
+        assert m2.properties['THE-GROUP'] == 'B'
+        assert m2.content['index'] == 2
+
+        s1.acknowledge(m1)  # A-0 consumed, A group freed
+        s2.acknowledge(m2)  # B-2 consumed, B group freed
+
+        s1.commit()
+        s2.rollback()  # release B-2 and group B
+
+        ## Q: ["A1","B2","B3","A4","B5"]
+
+        # C2 should be able to get the next A
+        m3 = c2.fetch(0)
+        assert m3.properties['THE-GROUP'] == 'A'
+        assert m3.content['index'] == 1
+
+        # C1 should be able to get B-2
+        m4 = c1.fetch(0)
+        assert m4.properties['THE-GROUP'] == 'B'
+        assert m4.content['index'] == 2
+
+        s2.acknowledge(m3)  # C2 consumes A-1
+        s1.acknowledge(m4)  # C1 consumes B-2
+        s1.commit()    # C1's consume of B-2 takes effect, freeing group B
+
+        ## Q: [["A1"],"B3","A4","B5"]
+
+        # A-1 is still considered owned by C2, since the commit has yet to
+        # occur, so the next available to C1 would be B-3
+        m5 = c1.fetch(0)   # B-3
+        assert m5.properties['THE-GROUP'] == 'B'
+        assert m5.content['index'] == 3
+
+        # and C2 should find A-4 available, since it owns the A group
+        m6 = c2.fetch(0)  # A-4
+        assert m6.properties['THE-GROUP'] == 'A'
+        assert m6.content['index'] == 4
+
+        s2.acknowledge(m6)  # C2 consumes A-4
+
+        # uh-oh, A-1 and A-4 released, along with A group
+        s2.rollback()
+
+        ## Q: ["A1",["B3"],"A4","B5"]
+        m7 = c1.fetch(0)   # A-1 is found
+        assert m7.properties['THE-GROUP'] == 'A'
+        assert m7.content['index'] == 1
+
+        ## Q: [["A1"],["B3"],"A4","B5"]
+        # since C1 "owns" both A and B group, C2 should find nothing available
+        try:
+            m8 = c2.fetch(0)
+            assert False    # should not get here
+        except Empty:
+            pass
+
+        # C1 next gets A4
+        m9 = c1.fetch(0)
+        assert m9.properties['THE-GROUP'] == 'A'
+        assert m9.content['index'] == 4
+
+        s1.acknowledge()
+
+        ## Q: [["A1"],["B3"],["A4"],"B5"]
+        # even though C1 acknowledges A1,B3, and A4, B5 is still considered
+        # owned as the commit has yet to take place
+        try:
+            m10 = c2.fetch(0)
+            assert False    # should not get here
+        except Empty:
+            pass
+
+        # now A1,B3,A4 dequeued, B5 should be free
+        s1.commit()
+
+        ## Q: ["B5"]
+        m11 = c2.fetch(0)
+        assert m11.properties['THE-GROUP'] == 'B'
+        assert m11.content['index'] == 5
+
+        s2.acknowledge()
+        s2.commit()
+
+    def test_query(self):
+        """ Verify the broker's QMF query method reports each group's id, message count and consumer.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","B","C","A","B","C","A"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        s2 = self.setup_session()
+        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+        m1 = c1.fetch(0)
+        m2 = c2.fetch(0)
+
+        # at this point, group A should be owned by C1, group B by C2, and
+        # group C should be available
+
+        # now setup a QMF session, so we can call methods
+        self.qmf_session = qmf.console.Session()
+        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+        brokers = self.qmf_session.getObjects(_class="broker")
+        assert len(brokers) == 1
+        broker = brokers[0]
+
+        # verify the group information returned by the query method call
+        rc = broker.query("queue", "msg-group-q")
+        assert rc.status == 0
+        assert rc.text == "OK"
+        results = rc.outArgs['results']
+        assert 'qpid.message_group_queue' in results
+        q_info = results['qpid.message_group_queue']
+        assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
+        assert 'group_state' in q_info and len(q_info['group_state']) == 3
+        for g_info in q_info['group_state']:
+            assert 'group_id' in g_info
+            if g_info['group_id'] == "A":
+                assert g_info['msg_count'] == 3
+                assert g_info['consumer'] != ""
+            elif g_info['group_id'] == "B":
+                assert g_info['msg_count'] == 2
+                assert g_info['consumer'] != ""
+            elif g_info['group_id'] == "C":
+                assert g_info['msg_count'] == 2
+                assert g_info['consumer'] == ""
+            else:
+                assert(False)    # should never get here
+        self.qmf_session.delBroker(self.qmf_broker)
+
+    def test_purge_free(self):
+        """ Verify we can purge a queue of all messages of a given "unowned"
+        group.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","B","A","B","C","A"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        # now setup a QMF session, so we can call methods
+        self.qmf_session = qmf.console.Session()
+        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+        queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+        assert queue
+        msg_filter = { 'filter_type' : 'header_match_str',
+                       'filter_params' : { 'header_key' : "THE-GROUP",
+                                           'header_value' : "B" }}
+        assert queue.msgDepth == 6
+        rc = queue.purge(0, msg_filter)
+        assert rc.status == 0
+        queue.update()
+        assert queue.msgDepth == 4
+
+        # browse the queue: no B may remain, and the 4 other messages must still be there
+        s2 = self.setup_session()
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        count = 0
+        try:
+            while True:
+                m2 = b1.fetch(0)
+                assert m2.properties['THE-GROUP'] != 'B'
+                count += 1
+        except Empty:
+            pass
+        assert count == 4
+
+        self.qmf_session.delBroker(self.qmf_broker)
+
+    def test_purge_acquired(self):
+        """ Verify that purging an owned group removes only its un-acquired messages.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","B","A","B","C","A"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        # acquire group "A"
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        m1 = c1.fetch(0)
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        # now setup a QMF session, so we can purge group A
+        self.qmf_session = qmf.console.Session()
+        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+        queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+        assert queue
+        msg_filter = { 'filter_type' : 'header_match_str',
+                       'filter_params' : { 'header_key' : "THE-GROUP",
+                                           'header_value' : "A" }}
+        assert queue.msgDepth == 6
+        rc = queue.purge(0, msg_filter)
+        assert rc.status == 0
+        queue.update()
+        assert queue.msgDepth == 4   # the pending acquired A still counts!
+
+        # verify all other A's removed....
+        s2 = self.setup_session()
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        count = 0
+        try:
+            while True:
+                m2 = b1.fetch(0)
+                assert m2.properties['THE-GROUP'] != 'A'
+                count += 1
+        except Empty:
+            pass
+        assert count == 3   # only 3 really available
+        s1.acknowledge()    # ack the consumed A-0
+        self.qmf_session.delBroker(self.qmf_broker)
+
+    def test_purge_count(self):
+        """ Verify we can purge a fixed number of messages from an acquired
+        group.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","B","A","B","C","A"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        # acquire group "A"
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        m1 = c1.fetch(0)
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
 
+        # now setup a QMF session, so we can purge one message of group A
+        self.qmf_session = qmf.console.Session()
+        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+        queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+        assert queue
+        msg_filter = { 'filter_type' : 'header_match_str',
+                       'filter_params' : { 'header_key' : "THE-GROUP",
+                                           'header_value' : "A" }}
+        assert queue.msgDepth == 6
+        rc = queue.purge(1, msg_filter)
+        assert rc.status == 0
+        queue.update()
+        assert queue.msgDepth == 5   # the pending acquired A still counts!
 
+        # verify only one A was purged: exactly one un-acquired A must remain
+        s2 = self.setup_session()
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        count = 0
+        a_count = 0
+        try:
+            while True:
+                m2 = b1.fetch(0)
+                if m2.properties['THE-GROUP'] != 'A':
+                    count += 1
+                else:
+                    a_count += 1
+        except Empty:
+            pass
+        assert count == 3   # non-A's
+        assert a_count == 1 # and one is an A
+        s1.acknowledge()    # ack the consumed A-0
+        self.qmf_session.delBroker(self.qmf_broker)
+
+    def test_move_all(self):
+        """ Verify we can move all un-acquired messages of an acquired group to another queue.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","B","A","B","C","A"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        # set up destination queue
+        rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
+                                 " node: {x-declare: {arguments:" +
+                                 " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        # acquire group "A"
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        m1 = c1.fetch(0)
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        # now setup a QMF session, so we can move what's left of group A
+        self.qmf_session = qmf.console.Session()
+        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+        brokers = self.qmf_session.getObjects(_class="broker")
+        assert len(brokers) == 1
+        broker = brokers[0]
+        msg_filter = { 'filter_type' : 'header_match_str',
+                       'filter_params' : { 'header_key' : "THE-GROUP",
+                                           'header_value' : "A" }}
+        rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter)
+        assert rc.status == 0
+
+        # verify all other A's removed from msg-group-q
+        s2 = self.setup_session()
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        count = 0
+        try:
+            while True:
+                m2 = b1.fetch(0)
+                assert m2.properties['THE-GROUP'] != 'A'
+                count += 1
+        except Empty:
+            pass
+        assert count == 3   # only the 3 non-A messages are browsable
+
+        # verify the moved A's are at the dest-q
+        s2 = self.setup_session()
+        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+        count = 0
+        try:
+            while True:
+                m2 = b1.fetch(0)
+                assert m2.properties['THE-GROUP'] == 'A'
+                assert m2.content['index'] == 2 or m2.content['index'] == 5
+                count += 1
+        except Empty:
+            pass
+        assert count == 2 # two A's moved
+
+        s1.acknowledge()    # ack the consumed A-0
+        self.qmf_session.delBroker(self.qmf_broker)
+
+    def test_move_count(self):
+        """ Verify we can move a fixed number of messages from a group that no consumer has acquired.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","B","A","B","C","A"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        # set up destination queue
+        rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
+                                 " node: {x-declare: {arguments:" +
+                                 " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        # now setup a QMF session, so we can move group B
+        self.qmf_session = qmf.console.Session()
+        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+        brokers = self.qmf_session.getObjects(_class="broker")
+        assert len(brokers) == 1
+        broker = brokers[0]
+        msg_filter = { 'filter_type' : 'header_match_str',
+                       'filter_params' : { 'header_key' : "THE-GROUP",
+                                           'header_value' : "B" }}
+        rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter)
+        assert rc.status == 0
+
+        # only two B's were sent, so a count of 3 should move them all; verify none remain
+        s2 = self.setup_session()
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        count = 0
+        try:
+            while True:
+                m2 = b1.fetch(0)
+                assert m2.properties['THE-GROUP'] != 'B'
+                count += 1
+        except Empty:
+            pass
+        assert count == 4
+
+        # verify the moved B's are at the dest-q
+        s2 = self.setup_session()
+        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+        count = 0
+        try:
+            while True:
+                m2 = b1.fetch(0)
+                assert m2.properties['THE-GROUP'] == 'B'
+                assert m2.content['index'] == 1 or m2.content['index'] == 3
+                count += 1
+        except Empty:
+            pass
+        assert count == 2
+
+        self.qmf_session.delBroker(self.qmf_broker)
+
+    def test_reroute(self):
+        """ Verify that rerouting an acquired group moves only its un-acquired messages.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","B","A","B","C","A"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        # create a topic exchange for the reroute
+        rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," +
+                                 " node: {type: topic}}")
+
+        # acquire group "A"
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        m1 = c1.fetch(0)
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        # now setup a QMF session, so we can reroute group A
+        self.qmf_session = qmf.console.Session()
+        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+        queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+        assert queue
+        msg_filter = { 'filter_type' : 'header_match_str',
+                       'filter_params' : { 'header_key' : "THE-GROUP",
+                                           'header_value' : "A" }}
+        assert queue.msgDepth == 6
+        rc = queue.reroute(0, False, "reroute-q", msg_filter)
+        assert rc.status == 0
+        queue.update()
+        assert queue.msgDepth == 4   # the pending acquired A still counts!
+
+        # verify all other A's removed....
+        s2 = self.setup_session()
+        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+        count = 0
+        try:
+            while True:
+                m2 = b1.fetch(0)
+                assert m2.properties['THE-GROUP'] != 'A'
+                count += 1
+        except Empty:
+            pass
+        assert count == 3   # only 3 really available
+
+        # the two rerouted A's (A-2 and A-5) should have arrived at reroute-q
+        count = 0
+        try:
+            while True:
+                m2 = rcvr.fetch(0)
+                assert m2.properties['THE-GROUP'] == 'A'
+                assert m2.content['index'] == 2 or m2.content['index'] == 5
+                count += 1
+        except Empty:
+            pass
+        assert count == 2
+
+        s1.acknowledge()    # ack the consumed A-0
+        self.qmf_session.delBroker(self.qmf_broker)
+
+    def test_queue_delete(self):
+        """ Test deleting a queue while consumers are active.
+        """
+
+        ## Create a msg group queue
+
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+        groups = ["A","B","A","B","C"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            snd.send(m)
+
+        ## Queue = A-0, B-1, A-2, B-3, C-4
+        ## Owners= ---, ---, ---, ---, ---
+
+        # create consumers
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":1})
+        s2 = self.setup_session()
+        c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+        # C1 should acquire A-0
+        m1 = c1.fetch(0);
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        # C2 acquires B-1
+        m2 = c2.fetch(0)
+        assert m2.properties['THE-GROUP'] == 'B'
+        assert m2.content['index'] == 1
+
+        # with group A and B owned, and C free, delete the
+        # queue (the sender was created with delete:sender)
+        snd.close()
+        self.ssn.close()
 
 class StickyConsumerMsgGroupTests(Base):
     """



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message