qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1159671 - in /qpid/branches/qpid-3346/qpid/cpp/src: qpid/broker/Queue.cpp tests/QueueTest.cpp
Date Fri, 19 Aug 2011 15:56:32 GMT
Author: kgiusti
Date: Fri Aug 19 15:56:32 2011
New Revision: 1159671

URL: http://svn.apache.org/viewvc?rev=1159671&view=rev
Log:
QPID-3346: checkpoint - free group heuristic and unit test

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1159671&r1=1159670&r2=1159671&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Fri Aug 19 15:56:32 2011
@@ -101,19 +101,48 @@ class MessageAllocator
     MessageAllocator( Queue *q ) : queue(q) {}
     virtual ~MessageAllocator() {};
 
-    // assumes caller holds messageLock
-    virtual bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
-                              const Mutex::ScopedLock&);
-    /** acquire a message previously browsed via nextMessage().  assume messageLock held
+    // Note: all methods taking a mutex assume the caller is holding the
+    // Queue::messageLock during the method call.
+
+    /** Determine the next message available for consumption by the consumer
+     * @param next set to the next message that the consumer may acquire.
+     * @return true if message is available
+     */
+    virtual bool nextConsumableMessage( Consumer::shared_ptr, QueuedMessage& next,
+                                        const Mutex::ScopedLock&)
+    {
+        Messages& messages(queue->getMessages());
+        if (!messages.empty()) {
+            next = messages.front();    // by default, consume oldest msg
+            return true;
+        }
+        return false;
+    }
+    /** Determine the next message available for browsing by the consumer
+     * @param next set to the next message that the consumer may browse.
+     * @return true if a message is available
+     */
+    virtual bool nextBrowsableMessage( Consumer::shared_ptr c, QueuedMessage& next,
+                                       const Mutex::ScopedLock&)
+    {
+        Messages& messages(queue->getMessages());
+        if (!messages.empty() && messages.next(c->position, next))
+            return true;
+        return false;
+    }
+    /** acquire a message previously returned via next*Message().
      * @param consumer name of consumer that is attempting to acquire the message
      * @param qm the message to be acquired
      * @param messageLock - ensures caller is holding it!
      * @returns true if acquire is successful, false if acquire failed.
      */
-    virtual bool canAcquire( const std::string& consumer, const QueuedMessage& qm,
-                             const Mutex::ScopedLock&);
+    virtual bool acquireMessage( const std::string&, const QueuedMessage&,
+                                 const Mutex::ScopedLock&)
+    {
+        return true;
+    }
 
-    /** hook to add any interesting management state to the status map (lock held) */
+    /** hook to add any interesting management state to the status map */
     virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const
{};
 };
 
@@ -125,24 +154,55 @@ class MessageGroupManager : public Queue
     const unsigned int timestamp;       // mark messages with timestamp if set
 
     struct GroupState {
-        //const std::string group;          // group identifier
-        //Consumer::shared_ptr owner; // consumer with outstanding acquired messages
-        std::string owner; // consumer with outstanding acquired messages
+        typedef std::list<framing::SequenceNumber> PositionFifo;
+
+        std::string group;  // group identifier
+        std::string owner;  // consumer with outstanding acquired messages
         uint32_t acquired;  // count of outstanding acquired messages
-        uint32_t total; // count of enqueued messages in this group
-        GroupState() : acquired(0), total(0) {}
+        //uint32_t total;     // count of enqueued messages in this group
+        PositionFifo members;   // msgs belonging to this group
+
+        GroupState() : acquired(0) {}
+        bool owned() const {return !owner.empty();}
     };
     typedef std::map<std::string, struct GroupState> GroupMap;
-    typedef std::set<std::string> Consumers;
+    typedef std::map<std::string, uint32_t> Consumers;  // count of owned groups
+    typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
 
-    GroupMap messageGroups;
-    Consumers consumers;
+    GroupMap messageGroups; // index: group name
+    GroupFifo freeGroups; // ordered by oldest free msg
+    Consumers consumers;    // index: consumer name
 
     static const std::string qpidMessageGroupKey;
     static const std::string qpidMessageGroupTimestamp;
     static const std::string qpidMessageGroupDefault;
 
     const std::string getGroupId( const QueuedMessage& qm ) const;
+    void unFree( const GroupState& state )
+    {
+        GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+        assert( pos != freeGroups.end() && pos->second == &state );
+        freeGroups.erase( pos );
+    }
+    void own( GroupState& state, const std::string& owner )
+    {
+        state.owner = owner;
+        consumers[state.owner]++;
+        unFree( state );
+    }
+    void disown( GroupState& state )
+    {
+        assert(consumers[state.owner]);
+        consumers[state.owner]--;
+        state.owner.clear();
+        assert(state.members.size());
+#ifdef NDEBUG
+        freeGroups[state.members.front()] = &state;
+#else
+        bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second;
+        (void) unique; assert(unique);
+#endif
+    }
 
  public:
 
@@ -156,10 +216,11 @@ class MessageGroupManager : public Queue
     void dequeued( const QueuedMessage& qm );
     void consumerAdded( const Consumer& );
     void consumerRemoved( const Consumer& );
-    bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
-                      const Mutex::ScopedLock&);
-    bool canAcquire(const std::string& consumer, const QueuedMessage& msg,
-                    const Mutex::ScopedLock&);
+    bool nextConsumableMessage( Consumer::shared_ptr c, QueuedMessage& next,
+                                const Mutex::ScopedLock&);
+    // uses default nextBrowsableMessage()
+    bool acquireMessage(const std::string& consumer, const QueuedMessage& msg,
+                        const Mutex::ScopedLock&);
     void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const;
     bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
 };
@@ -339,7 +400,7 @@ bool Queue::acquire(const QueuedMessage&
     assertClusterSafe();
     QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
 
-    if (!allocator->canAcquire( consumer, msg, locker )) {
+    if (!allocator->acquireMessage( consumer, msg, locker )) {
         QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position <<
" from '" << name);
         return false;
     }
@@ -391,7 +452,7 @@ Queue::ConsumeCode Queue::consumeNextMes
         Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
 
-        if (!allocator->nextMessage(c, msg, locker)) { // no next available
+        if (!allocator->nextConsumableMessage(c, msg, locker)) { // no next available
             QPID_LOG(debug, "No messages available to dispatch to consumer " <<
                      c->getName() << " on queue '" << name << "'");
             listeners.addListener(c);
@@ -410,7 +471,9 @@ Queue::ConsumeCode Queue::consumeNextMes
 
         if (c->filter(msg.payload)) {
             if (c->accept(msg.payload)) {
-                bool ok = acquire( msg.position, msg );
+                bool ok = allocator->acquireMessage( c->getName(), msg, locker ); 
// inform allocator
+                (void) ok; assert(ok);
+                ok = acquire( msg.position, msg );
                 (void) ok; assert(ok);
                 m = msg;
                 c->position = m.position;
@@ -435,7 +498,7 @@ bool Queue::browseNextMessage(QueuedMess
         Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
 
-        if (!allocator->nextMessage(c, msg, locker)) { // no next available
+        if (!allocator->nextBrowsableMessage(c, msg, locker)) { // no next available
             QPID_LOG(debug, "No browsable messages available for consumer " <<
                      c->getName() << " on queue '" << name << "'");
             listeners.addListener(c);
@@ -1540,15 +1603,31 @@ const std::string MessageGroupManager::g
 
 void MessageGroupManager::enqueued( const QueuedMessage& qm )
 {
+    // @todo KAG optimization - store reference to group state in QueuedMessage
+    // issue: const-ness??
     std::string group( getGroupId(qm) );
-    uint32_t total = ++messageGroups[group].total;
+    GroupState &state(messageGroups[group]);
+    state.members.push_back(qm.position);
+    uint32_t total = state.members.size();
     QPID_LOG( trace, "group queue " << queue->getName() <<
               ": added message to group id=" << group << " total=" << total
);
+    if (total == 1) {
+        // newly created group, no owner
+        state.group = group;
+#ifdef NDEBUG
+        freeGroups[qm.position] = &state;
+#else
+        bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second;
+        (void) unique; assert(unique);
+#endif
+    }
 }
 
 
 void MessageGroupManager::acquired( const QueuedMessage& qm )
 {
+    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
+    // issue: const-ness??
     std::string group( getGroupId(qm) );
     GroupMap::iterator gs = messageGroups.find( group );
     assert( gs != messageGroups.end() );
@@ -1561,16 +1640,20 @@ void MessageGroupManager::acquired( cons
 
 void MessageGroupManager::requeued( const QueuedMessage& qm )
 {
+    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
+    // issue: const-ness??
+    // @todo KAG BUG - how to ensure requeue happens in the correct order?
+    // @todo KAG BUG - if requeue is not in correct order - what do we do?  throw?
     std::string group( getGroupId(qm) );
     GroupMap::iterator gs = messageGroups.find( group );
     assert( gs != messageGroups.end() );
     GroupState& state( gs->second );
     assert( state.acquired != 0 );
     state.acquired -= 1;
-    if (state.acquired == 0 && !state.owner.empty()) {
+    if (state.acquired == 0 && state.owned()) {
         QPID_LOG( trace, "group queue " << queue->getName() <<
                   ": consumer name=" << state.owner << " released group id="
<< gs->first);
-        state.owner.clear();   // KAG TODO: need to invalidate consumer's positions?
+        disown(state);
     }
     QPID_LOG( trace, "group queue " << queue->getName() <<
               ": requeued message to group id=" << group << " acquired=" <<
state.acquired );
@@ -1579,22 +1662,41 @@ void MessageGroupManager::requeued( cons
 
 void MessageGroupManager::dequeued( const QueuedMessage& qm )
 {
+    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
+    // issue: const-ness??
     std::string group( getGroupId(qm) );
     GroupMap::iterator gs = messageGroups.find( group );
     assert( gs != messageGroups.end() );
     GroupState& state( gs->second );
-    assert( state.total != 0 );
-    uint32_t total = state.total -= 1;
+    assert( state.members.size() != 0 );
+
+    // likely to be at or near begin() if dequeued in order
+    {
+        GroupState::PositionFifo::iterator pos = state.members.begin();
+        GroupState::PositionFifo::iterator end = state.members.end();
+        while (pos != end) {
+            if (*pos == qm.position) {
+                state.members.erase(pos);
+                break;
+            }
+            ++pos;
+        }
+    }
+
     assert( state.acquired != 0 );
     state.acquired -= 1;
-    if (state.total == 0) {
+    uint32_t total = state.members.size();
+    if (total == 0) {
+        if (!state.owned()) {  // unlikely, but need to remove from the free list before
erase
+            unFree( state );
+        }
         QPID_LOG( trace, "group queue " << queue->getName() << ": deleting
group id=" << gs->first);
         messageGroups.erase( gs );
     } else {
-        if (state.acquired == 0 && !state.owner.empty()) {
+        if (state.acquired == 0 && state.owned()) {
             QPID_LOG( trace, "group queue " << queue->getName() <<
                       ": consumer name=" << state.owner << " released group id="
<< gs->first);
-            state.owner.clear();   // KAG TODO: need to invalidate consumer's positions?
+            disown(state);
         }
     }
     QPID_LOG( trace, "group queue " << queue->getName() <<
@@ -1603,81 +1705,84 @@ void MessageGroupManager::dequeued( cons
 
 void MessageGroupManager::consumerAdded( const Consumer& c )
 {
-    const std::string& name(c.getName());
-    bool unique = consumers.insert( name ).second;
-    (void) unique; assert( unique );
-    QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer
name=" << name );
+    assert(consumers.find(c.getName()) == consumers.end());
+    consumers[c.getName()] = 0;     // no groups owned yet
+    QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer,
name=" << c.getName() );
 }
 
 void MessageGroupManager::consumerRemoved( const Consumer& c )
 {
     const std::string& name(c.getName());
-    size_t count = consumers.erase( name );
-    (void) count; assert( count == 1 );
+    Consumers::iterator consumer = consumers.find(name);
+    assert(consumer != consumers.end());
+    size_t count = consumer->second;
 
-    bool needReset = false;
     for (GroupMap::iterator gs = messageGroups.begin();
-         gs != messageGroups.end(); ++gs) {
+         count && gs != messageGroups.end(); ++gs) {
 
         GroupState& state( gs->second );
         if (state.owner == name) {
-            state.owner.clear();
-            needReset = true;
+            --count;
+            disown(state);
             QPID_LOG( trace, "group queue " << queue->getName() <<
                       ": consumer name=" << name << " released group id=" <<
gs->first);
         }
     }
-
-    if (needReset) {
-        // KAG TODO: How do I invalidate all consumers that need invalidating????
-    }
+    consumers.erase( consumer );
     QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer
name=" << name );
 }
 
 
-bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
-                                       const Mutex::ScopedLock& )
+bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr c, QueuedMessage&
next,
+                                                 const Mutex::ScopedLock& )
 {
     Messages& messages(queue->getMessages());
 
     if (messages.empty())
         return false;
 
-    if (c->preAcquires()) {     // not browsing
-        next = messages.front();
-        do {
-            /** @todo KAG: horrifingly suboptimal  - optimize */
-            std::string group( getGroupId( next ) );
-            GroupMap::iterator gs = messageGroups.find( group );    /** @todo need to cache
this somehow */
-            assert( gs != messageGroups.end() );
-            GroupState& state( gs->second );
-            if (state.owner.empty()) {
-                state.owner = c->getName();
-                QPID_LOG( trace, "group queue " << queue->getName() <<
-                          ": consumer name=" << c->getName() << " has acquired
group id=" << group);
-                return true;
-            }
-            if (state.owner == c->getName()) {
-                return true;
-            }
-        } while (messages.next( next.position, next ));     /** @todo: .next() is a linear
search from front - optimize */
-        return false;
-    } else if (messages.next(c->position, next))
-        return true;
+    if (!freeGroups.empty()) {
+        framing::SequenceNumber nextFree = freeGroups.begin()->first;
+        if (nextFree < c->position) {  // next free group's msg is older than current
position
+            bool ok = messages.find(nextFree, next);
+            (void) ok; assert( ok );
+        } else {
+            if (!messages.next( c->position, next ))
+                return false;           // shouldn't happen - should find nextFree
+        }
+    } else {  // no free groups available
+        if (consumers[c->getName()] == 0) {  // and none currently owned
+            return false;       // so nothing available to consume
+        }
+        if (!messages.next( c->position, next ))
+            return false;
+    }
+
+    do {
+        // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
+        std::string group( getGroupId( next ) );
+        GroupMap::iterator gs = messageGroups.find( group );
+        assert( gs != messageGroups.end() );
+        GroupState& state( gs->second );
+        if (!state.owned() || state.owner == c->getName()) {
+            return true;
+        }
+    } while (messages.next( next.position, next ));
     return false;
 }
 
 
-bool MessageGroupManager::canAcquire(const std::string& consumer, const QueuedMessage&
qm,
-                                     const Mutex::ScopedLock&)
+bool MessageGroupManager::acquireMessage(const std::string& consumer, const QueuedMessage&
qm,
+                                         const Mutex::ScopedLock&)
 {
+    // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
     std::string group( getGroupId(qm) );
     GroupMap::iterator gs = messageGroups.find( group );
     assert( gs != messageGroups.end() );
     GroupState& state( gs->second );
 
-    if (state.owner.empty()) {
-        state.owner = consumer;
+    if (!state.owned()) {
+        own( state, consumer );
         QPID_LOG( trace, "group queue " << queue->getName() <<
                   ": consumer name=" << consumer << " has acquired group id="
<< gs->first);
         return true;
@@ -1722,7 +1827,7 @@ void MessageGroupManager::query(qpid::ty
          g != messageGroups.end(); ++g) {
         qpid::types::Variant::Map info;
         info[GroupIdKey] = g->first;
-        info[GroupMsgCount] = g->second.total;
+        info[GroupMsgCount] = g->second.members.size();
         info[GroupTimestamp] = 0;   /** @todo KAG - NEED HEAD MSG TIMESTAMP */
         info[GroupConsumer] = g->second.owner;
         groups.push_back(info);
@@ -1761,33 +1866,5 @@ boost::shared_ptr<MessageGroupManager> M
 
 
 
-// default allocator - requires messageLock to be held by caller!
-bool MessageAllocator::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
-                                   const Mutex::ScopedLock& /*just to enforce locking*/)
-{
-    Messages& messages(queue->getMessages());
-
-    if (messages.empty())
-        return false;
-
-    if (c->preAcquires()) {     // not browsing
-        next = messages.front();
-        return true;
-    } else if (messages.next(c->position, next))
-        return true;
-    return false;
-}
-
-
-// default allocator - requires messageLock to be held by caller!
-bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&,
-                                  const Mutex::ScopedLock& /*just to enforce locking*/)
-{
-    return true;    // always give permission to acquire
-}
-
-
-
-
 
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp?rev=1159671&r1=1159670&r2=1159671&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp Fri Aug 19 15:56:32 2011
@@ -705,23 +705,235 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
 }
 
+
+namespace {
+    // helper for group tests
+    void verifyAcquire( Queue::shared_ptr queue,
+                        TestConsumer::shared_ptr c,
+                        std::deque<QueuedMessage>& results,
+                        const std::string& expectedGroup,
+                        const int expectedId )
+    {
+        queue->dispatch(c);
+        results.push_back(c->last);
+        std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+        int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+        BOOST_CHECK_EQUAL( group, expectedGroup );
+        BOOST_CHECK_EQUAL( id, expectedId );
+    }
+}
+
 QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
+    //
+    // Verify that consumers of grouped messages own the groups once a message is acquired,
+    // and release the groups once all acquired messages have been dequeued or requeued
+    //
     FieldTable args;
     Queue::shared_ptr queue(new Queue("my_queue", true));
     args.setString("qpid.group_header_key", "GROUP-ID");
     queue->configure(args);
 
-    for (int i = 0; i < 3; ++i) {
-        intrusive_ptr<Message> msg1 = create_message("e", "A");
-        msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","a");
-        queue->deliver(msg1);
+    std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
+                             std::string("b"), std::string("b"), std::string("b"),
+                             std::string("c"), std::string("c"), std::string("c") };
+    for (int i = 0; i < 9; ++i) {
+        intrusive_ptr<Message> msg = create_message("e", "A");
+        msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID",
groups[i]);
+        msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID",
i);
+        queue->deliver(msg);
+    }
+
+    // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+    BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
+
+    TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+    TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+    queue->consume(c1);
+    queue->consume(c2);
+
+    std::deque<QueuedMessage> dequeMeC1;
+    std::deque<QueuedMessage> dequeMeC2;
+
+
+    verifyAcquire(queue, c1, dequeMeC1, "a", 0 );  // c1 now owns group "a" (acquire a-0)
+    verifyAcquire(queue, c2, dequeMeC2, "b", 3 );  // c2 should now own group "b" (acquire
b-3)
+
+    // now let c1 complete the 'a-0' message - this should free the 'a' group
+    queue->dequeue( 0, dequeMeC1.front() );
+    dequeMeC1.pop_front();
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
+
+    // now c2 should pick up the next 'a-1', since it is oldest free
+    verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
+
+    // c1 should only be able to snarf up the first "c" message now...
+    verifyAcquire(queue, c1, dequeMeC1, "c", 6 );    // should skip to the first "c"
+
+    // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
+
+    // hmmm... what if c2 now dequeues "b-3"?  (now only has a-1 acquired)
+    queue->dequeue( 0, dequeMeC2.front() );
+    dequeMeC2.pop_front();
+
+    // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
+
+    // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
+    verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
+
+    // c2 can now only grab a-2, and that's all
+    verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
+
+    // now C2 can't get any more, since C1 owns "b" and "c" group...
+    bool gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    // hmmm... what if c1 now dequeues "c-6"?  (now only own's b-4)
+    queue->dequeue( 0, dequeMeC1.front() );
+    dequeMeC1.pop_front();
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
+
+    // c2 can now grab c-7
+    verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
+
+    // what happens if C-2 "requeues" a-1 and a-2?
+    queue->requeue( dequeMeC2.front() );
+    dequeMeC2.pop_front();
+    queue->requeue( dequeMeC2.front() );
+    dequeMeC2.pop_front();  // now just has c-7 acquired
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
+
+    // now c1 will grab a-1 and a-2...
+    verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
+    verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
+
+    // c2 can now acquire c-8 only
+    verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
 
-        intrusive_ptr<Message> msg2 = create_message("e", "A");
-        msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","b");
-        queue->deliver(msg2);
+    // and c1 can get b-5
+    verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
+
+    // should be no more acquire-able for anyone now:
+    gotOne = queue->dispatch(c1);
+    BOOST_CHECK( !gotOne );
+    gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    // requeue all of C1's acquired messages, then cancel C1
+    while (!dequeMeC1.empty()) {
+        queue->requeue(dequeMeC1.front());
+        dequeMeC1.pop_front();
+    }
+    queue->cancel(c1);
+
+    // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+    // Owners= ---, ---, ---, ---, ^C2, ^C2
+
+    // b-4, a-1, a-2, b-5 all should be available, right?
+    verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
+
+    while (!dequeMeC2.empty()) {
+        queue->dequeue(0, dequeMeC2.front());
+        dequeMeC2.pop_front();
+    }
+
+    // Queue = a-2, b-4, b-5
+    // Owners= ---, ---, ---
+
+    TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+    std::deque<QueuedMessage> dequeMeC3;
+
+    verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
+    verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
+
+    // Queue = a-2, b-4, b-5
+    // Owners= ^C3, ^C2, ^C2
+
+    gotOne = queue->dispatch(c3);
+    BOOST_CHECK( !gotOne );
+
+    verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
+
+    while (!dequeMeC2.empty()) {
+        queue->dequeue(0, dequeMeC2.front());
+        dequeMeC2.pop_front();
     }
 
-    BOOST_CHECK_EQUAL(uint32_t(6), queue->getMessageCount());
+    // Queue = a-2,
+    // Owners= ^C3,
+
+    intrusive_ptr<Message> msg = create_message("e", "A");
+    msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID",
"a");
+    msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID",
9);
+    queue->deliver(msg);
+
+    // Queue = a-2, a-9
+    // Owners= ^C3, ^C3
+
+    gotOne = queue->dispatch(c2);
+    BOOST_CHECK( !gotOne );
+
+    msg = create_message("e", "A");
+    msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID",
"b");
+    msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID",
10);
+    queue->deliver(msg);
+
+    // Queue = a-2, a-9, b-10
+    // Owners= ^C3, ^C3, ----
+
+    verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
+    verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
+
+    gotOne = queue->dispatch(c3);
+    BOOST_CHECK( !gotOne );
+
+    queue->cancel(c2);
+    queue->cancel(c3);
+}
+
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
+    //
+    // Verify that the same default group name is automatically applied to messages that
+    // do not specify a group name.
+    //
+    FieldTable args;
+    Queue::shared_ptr queue(new Queue("my_queue", true));
+    args.setString("qpid.group_header_key", "GROUP-ID");
+    queue->configure(args);
+
+    for (int i = 0; i < 3; ++i) {
+        intrusive_ptr<Message> msg = create_message("e", "A");
+        // no "GROUP-ID" header
+        msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID",
i);
+        queue->deliver(msg);
+    }
+
+    // Queue = 0, 1, 2
+
+    BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
 
     TestConsumer::shared_ptr c1(new TestConsumer("C1"));
     TestConsumer::shared_ptr c2(new TestConsumer("C2"));
@@ -729,38 +941,37 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     queue->consume(c1);
     queue->consume(c2);
 
-    std::deque<QueuedMessage> dequeMe;
+    std::deque<QueuedMessage> dequeMeC1;
+    std::deque<QueuedMessage> dequeMeC2;
+
+    queue->dispatch(c1);    // c1 now owns default group (acquired 0)
+    dequeMeC1.push_back(c1->last);
+    int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 0 );
+
+    bool gotOne = queue->dispatch(c2);  // c2 should get nothing
+    BOOST_CHECK( !gotOne );
+
+    queue->dispatch(c1);    // c1 now acquires 1
+    dequeMeC1.push_back(c1->last);
+    id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 1 );
+
+    gotOne = queue->dispatch(c2);  // c2 should still get nothing
+    BOOST_CHECK( !gotOne );
+
+    while (!dequeMeC1.empty()) {
+        queue->dequeue(0, dequeMeC1.front());
+        dequeMeC1.pop_front();
+    }
+
+    // now default group should be available...
+    queue->dispatch(c2);    // c2 now owns default group (acquired 2)
+    id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+    BOOST_CHECK_EQUAL( id, 2 );
 
-    queue->dispatch(c1);    // now owns group "a"
-    dequeMe.push_back(c1->last);
-    queue->dispatch(c2);    // now owns group "b"
-    dequeMe.push_back(c2->last);
-
-    queue->dispatch(c2);    // should skip next "a", get last "b"
-    std::string group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
-    dequeMe.push_back(c2->last);
-    BOOST_CHECK_EQUAL( group, std::string("b") );
-
-    queue->dispatch(c1);    // should get last "a"
-    group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
-    dequeMe.push_back(c1->last);
-    BOOST_CHECK_EQUAL( group, std::string("a") );
-
-    // now "free up" the groups
-    while (!dequeMe.empty()) {
-        queue->dequeue( 0, dequeMe.front() );
-        dequeMe.pop_front();
-    }
-
-    // now c2 should be able to acquire group "a", and c1 group "b"
-
-    queue->dispatch(c2);
-    group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
-    BOOST_CHECK_EQUAL( group, std::string("a") );
-
-    queue->dispatch(c1);
-    group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
-    BOOST_CHECK_EQUAL( group, std::string("b") );
+    gotOne = queue->dispatch(c1);  // c1 should get nothing
+    BOOST_CHECK( !gotOne );
 
     queue->cancel(c1);
     queue->cancel(c2);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message