qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1197661 - in /qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker: MessageGroupManager.cpp MessageGroupManager.h
Date Fri, 04 Nov 2011 16:56:47 GMT
Author: kgiusti
Date: Fri Nov  4 16:56:47 2011
New Revision: 1197661

URL: http://svn.apache.org/viewvc?rev=1197661&view=rev
Log:
QPID-3346: sync this branch to trunk msg group code.

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1197661&r1=1197660&r2=1197661&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Fri Nov  4 16:56:47
2011
@@ -43,13 +43,61 @@ const std::string MessageGroupManager::q
 const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
 
 
-const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
+void MessageGroupManager::unFree( const GroupState& state )
 {
+    GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+    assert( pos != freeGroups.end() && pos->second == &state );
+    freeGroups.erase( pos );
+}
+
+void MessageGroupManager::own( GroupState& state, const std::string& owner )
+{
+    state.owner = owner;
+    unFree( state );
+}
+
+void MessageGroupManager::disown( GroupState& state )
+{
+    state.owner.clear();
+    assert(state.members.size());
+    assert(freeGroups.find(state.members.front()) == freeGroups.end());
+    freeGroups[state.members.front()] = &state;
+}
+
+MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage&
qm )
+{
+    uint32_t thisMsg = qm.position.getValue();
+    if (cachedGroup && lastMsg == thisMsg) {
+        hits++;
+        return *cachedGroup;
+    }
+
+    std::string group = defaultGroupId;
     const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
-    if (!headers) return defaultGroupId;
-    qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
-    if (!id || !id->convertsTo<std::string>()) return defaultGroupId;
-    return id->get<std::string>();
+    if (headers) {
+        qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
+        if (id && id->convertsTo<std::string>()) {
+            std::string tmp = id->get<std::string>();
+            if (!tmp.empty())   // empty group is reserved
+                group = tmp;
+        }
+    }
+
+    if (cachedGroup && group == lastGroup) {
+        hits++;
+        lastMsg = thisMsg;
+        return *cachedGroup;
+    }
+
+    misses++;
+
+    GroupState& found = messageGroups[group];
+    if (found.group.empty())
+        found.group = group;    // new group, assign name
+    lastMsg = thisMsg;
+    lastGroup = group;
+    cachedGroup = &found;
+    return found;
 }
 
 
@@ -57,15 +105,13 @@ void MessageGroupManager::enqueued( cons
 {
     // @todo KAG optimization - store reference to group state in QueuedMessage
     // issue: const-ness??
-    std::string group( getGroupId(qm) );
-    GroupState &state(messageGroups[group]);
+    GroupState& state = findGroup(qm);
     state.members.push_back(qm.position);
     uint32_t total = state.members.size();
     QPID_LOG( trace, "group queue " << qName <<
-              ": added message to group id=" << group << " total=" << total
);
+              ": added message to group id=" << state.group << " total=" <<
total );
     if (total == 1) {
         // newly created group, no owner
-        state.group = group;
         assert(freeGroups.find(qm.position) == freeGroups.end());
         freeGroups[qm.position] = &state;
     }
@@ -76,13 +122,11 @@ void MessageGroupManager::acquired( cons
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    std::string group( getGroupId(qm) );
-    GroupMap::iterator gs = messageGroups.find( group );
-    assert( gs != messageGroups.end() );
-    GroupState& state( gs->second );
+    GroupState& state = findGroup(qm);
+    assert(state.members.size());   // there are msgs present
     state.acquired += 1;
     QPID_LOG( trace, "group queue " << qName <<
-              ": acquired message in group id=" << group << " acquired=" <<
state.acquired );
+              ": acquired message in group id=" << state.group << " acquired="
<< state.acquired );
 }
 
 
@@ -90,19 +134,16 @@ void MessageGroupManager::requeued( cons
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    std::string group( getGroupId(qm) );
-    GroupMap::iterator gs = messageGroups.find( group );
-    assert( gs != messageGroups.end() );
-    GroupState& state( gs->second );
+    GroupState& state = findGroup(qm);
     assert( state.acquired != 0 );
     state.acquired -= 1;
     if (state.acquired == 0 && state.owned()) {
         QPID_LOG( trace, "group queue " << qName <<
-                  ": consumer name=" << state.owner << " released group id="
<< gs->first);
+                  ": consumer name=" << state.owner << " released group id="
<< state.group);
         disown(state);
     }
     QPID_LOG( trace, "group queue " << qName <<
-              ": requeued message to group id=" << group << " acquired=" <<
state.acquired );
+              ": requeued message to group id=" << state.group << " acquired="
<< state.acquired );
 }
 
 
@@ -110,10 +151,7 @@ void MessageGroupManager::dequeued( cons
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    std::string group( getGroupId(qm) );
-    GroupMap::iterator gs = messageGroups.find( group );
-    assert( gs != messageGroups.end() );
-    GroupState& state( gs->second );
+    GroupState& state = findGroup(qm);
     assert( state.members.size() != 0 );
     assert( state.acquired != 0 );
     state.acquired -= 1;
@@ -141,99 +179,55 @@ void MessageGroupManager::dequeued( cons
     }
 
     uint32_t total = state.members.size();
+    QPID_LOG( trace, "group queue " << qName <<
+              ": dequeued message from group id=" << state.group << " total="
<< total );
+
     if (total == 0) {
-        QPID_LOG( trace, "group queue " << qName << ": deleting group id=" <<
gs->first);
-        messageGroups.erase( gs );
+        QPID_LOG( trace, "group queue " << qName << ": deleting group id=" <<
state.group);
+        if (cachedGroup == &state) {
+            cachedGroup = 0;
+        }
+        std::string key(state.group);
+        messageGroups.erase( key );
     } else if (state.acquired == 0 && state.owned()) {
         QPID_LOG( trace, "group queue " << qName <<
-                  ": consumer name=" << state.owner << " released group id="
<< gs->first);
+                  ": consumer name=" << state.owner << " released group id="
<< state.group);
         disown(state);
     } else if (reFreeNeeded) {
         disown(state);
     }
-    QPID_LOG( trace, "group queue " << qName <<
-              ": dequeued message from group id=" << group << " total=" <<
total );
-}
-
-void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
-{
-#if 0
-    // allow a re-subscribing consumer
-    if (consumers.find(c.getName()) == consumers.end()) {
-        consumers[c.getName()] = 0;     // no groups owned yet
-        QPID_LOG( trace, "group queue " << qName << ": added consumer, name="
<< c.getName() );
-    } else {
-        QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed,
name=" << c.getName() );
-    }
-#endif
 }
 
-void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
+MessageGroupManager::~MessageGroupManager()
 {
-#if 0
-    const std::string& name(c.getName());
-    Consumers::iterator consumer = consumers.find(name);
-    assert(consumer != consumers.end());
-    size_t count = consumer->second;
-
-    for (GroupMap::iterator gs = messageGroups.begin();
-         count && gs != messageGroups.end(); ++gs) {
-
-        GroupState& state( gs->second );
-        if (state.owner == name) {
-            if (state.acquired == 0) {
-                --count;
-                disown(state);
-                QPID_LOG( trace, "group queue " << qName <<
-                          ": consumer name=" << name << " released group id="
<< gs->first);
-            }
-        }
-    }
-    if (count == 0) {
-        consumers.erase( consumer );
-        QPID_LOG( trace, "group queue " << qName << ": removed consumer name="
<< name );
-    } else {
-        // don't release groups with outstanding acquired msgs - consumer may re-subscribe!
-        QPID_LOG( trace, "group queue " << qName << ": consumer name=" <<
name << " unsubscribed with outstanding messages.");
-    }
-#endif
+    QPID_LOG( debug, "group queue " << qName << " cache results: hits=" <<
hits << " misses=" << misses );
 }
-
-
 bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage&
next )
 {
     if (messages.empty())
         return false;
 
+    next.position = c->position;
     if (!freeGroups.empty()) {
-        framing::SequenceNumber nextFree = freeGroups.begin()->first;
-        if (nextFree < c->position) {  // next free group's msg is older than current
position
-            bool ok = messages.find(nextFree, next);
-            (void) ok; assert( ok );
-        } else {
-            if (!messages.next( c->position, next ))
-                return false;           // shouldn't happen - should find nextFree
-        }
-    } else {  // no free groups available
-#if 0
-        if (consumers[c->getName()] == 0) {  // and none currently owned
-            return false;       // so nothing available to consume
-        }
-#endif
-        if (!messages.next( c->position, next ))
-            return false;
-    }
-
-    do {
-        // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
-        std::string group( getGroupId( next ) );
-        GroupMap::iterator gs = messageGroups.find( group );
-        assert( gs != messageGroups.end() );
-        GroupState& state( gs->second );
-        if (!state.owned() || state.owner == c->getName()) {
+        const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
+        if (nextFree < next.position) {     // a free message is older than current
+            next.position = nextFree;
+            --next.position;
+        }
+    }
+
+    while (messages.next( next.position, next )) {
+        GroupState& group = findGroup(next);
+        if (!group.owned()) {
+            if (group.members.front() == next.position) {    // only take from head!
+                return true;
+            }
+            QPID_LOG(debug, "Skipping " << next.position << " since group " <<
group.group
+                     << "'s head message still pending. pos=" << group.members.front());
+        } else if (group.owner == c->getName()) {
             return true;
         }
-    } while (messages.next( next.position, next ));
+    }
     return false;
 }
 
@@ -241,15 +235,12 @@ bool MessageGroupManager::nextConsumable
 bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage&
qm)
 {
     // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
-    std::string group( getGroupId(qm) );
-    GroupMap::iterator gs = messageGroups.find( group );
-    assert( gs != messageGroups.end() );
-    GroupState& state( gs->second );
+    GroupState& state = findGroup(qm);
 
     if (!state.owned()) {
         own( state, consumer );
         QPID_LOG( trace, "group queue " << qName <<
-                  ": consumer name=" << consumer << " has acquired group id="
<< gs->first);
+                  ": consumer name=" << consumer << " has acquired group id="
<< state.group);
         return true;
     }
     return state.owner == consumer;
@@ -389,8 +380,8 @@ void MessageGroupManager::setState(const
 {
     using namespace qpid::framing;
     messageGroups.clear();
-    //consumers.clear();
     freeGroups.clear();
+    cachedGroup = 0;
 
     framing::Array groupState(TYPE_CODE_MAP);
 
@@ -430,10 +421,7 @@ void MessageGroupManager::setState(const
         for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
             state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
         messageGroups[state.group] = state;
-        if (state.owned())
-            //consumers[state.owner]++;
-            ;
-        else {
+        if (!state.owned()) {
             assert(state.members.size());
             freeGroups[state.members.front()] = &messageGroups[state.group];
         }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1197661&r1=1197660&r2=1197661&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Fri Nov  4 16:56:47
2011
@@ -26,7 +26,7 @@
 
 #include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/broker/MessageDistributor.h"
-
+#include "qpid/sys/unordered_map.h"
 
 namespace qpid {
 namespace broker {
@@ -44,22 +44,21 @@ class MessageGroupManager : public State
     const std::string qName;            // name of parent queue (for logs)
 
     struct GroupState {
+        // note: update getState()/setState() when changing this object's state implementation
         typedef std::deque<framing::SequenceNumber> PositionFifo;
 
         std::string group;  // group identifier
         std::string owner;  // consumer with outstanding acquired messages
         uint32_t acquired;  // count of outstanding acquired messages
-        //uint32_t total;     // count of enqueued messages in this group
         PositionFifo members;   // msgs belonging to this group
 
         GroupState() : acquired(0) {}
         bool owned() const {return !owner.empty();}
     };
-    typedef std::map<std::string, struct GroupState> GroupMap;
-    //typedef std::map<std::string, uint32_t> Consumers;  // count of owned groups
+
+    typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
     typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
 
-    // note: update getState()/setState() when changing this object's state implementation
     GroupMap messageGroups; // index: group name
     GroupFifo freeGroups;   // ordered by oldest free msg
     //Consumers consumers;    // index: consumer name
@@ -68,28 +67,15 @@ class MessageGroupManager : public State
     static const std::string qpidSharedGroup;   // if specified, one group can be consumed
by multiple receivers
     static const std::string qpidMessageGroupTimestamp;
 
-    const std::string getGroupId( const QueuedMessage& qm ) const;
-    void unFree( const GroupState& state )
-    {
-        GroupFifo::iterator pos = freeGroups.find( state.members.front() );
-        assert( pos != freeGroups.end() && pos->second == &state );
-        freeGroups.erase( pos );
-    }
-    void own( GroupState& state, const std::string& owner )
-    {
-        state.owner = owner;
-        //consumers[state.owner]++;
-        unFree( state );
-    }
-    void disown( GroupState& state )
-    {
-        //assert(consumers[state.owner]);
-        //consumers[state.owner]--;
-        state.owner.clear();
-        assert(state.members.size());
-        assert(freeGroups.find(state.members.front()) == freeGroups.end());
-        freeGroups[state.members.front()] = &state;
-    }
+    GroupState& findGroup( const QueuedMessage& qm );
+    unsigned long hits, misses; // for debug
+    uint32_t lastMsg;
+    std::string lastGroup;
+    GroupState *cachedGroup;
+
+    void unFree( const GroupState& state );
+    void own( GroupState& state, const std::string& owner );
+    void disown( GroupState& state );
 
  public:
 
@@ -101,13 +87,18 @@ class MessageGroupManager : public State
     MessageGroupManager(const std::string& header, const std::string& _qName,
                         Messages& container, unsigned int _timestamp=0 )
       : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
-      groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName)
{}
+      groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName),
+      hits(0), misses(0),
+      lastMsg(0), cachedGroup(0) {}
+    virtual ~MessageGroupManager();
+
+    // QueueObserver iface
     void enqueued( const QueuedMessage& qm );
     void acquired( const QueuedMessage& qm );
     void requeued( const QueuedMessage& qm );
     void dequeued( const QueuedMessage& qm );
-    void consumerAdded( const Consumer& );
-    void consumerRemoved( const Consumer& );
+    void consumerAdded( const Consumer& ) {};
+    void consumerRemoved( const Consumer& ) {};
     void getState(qpid::framing::FieldTable& state ) const;
     void setState(const qpid::framing::FieldTable&);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message