Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B0D449E99 for ; Fri, 4 Nov 2011 16:57:13 +0000 (UTC) Received: (qmail 46023 invoked by uid 500); 4 Nov 2011 16:57:13 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 46004 invoked by uid 500); 4 Nov 2011 16:57:13 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 45985 invoked by uid 99); 4 Nov 2011 16:57:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Nov 2011 16:57:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Nov 2011 16:57:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E99292388860 for ; Fri, 4 Nov 2011 16:56:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1197661 - in /qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker: MessageGroupManager.cpp MessageGroupManager.h Date: Fri, 04 Nov 2011 16:56:47 -0000 To: commits@qpid.apache.org From: kgiusti@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111104165647.E99292388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kgiusti Date: Fri Nov 4 16:56:47 2011 New Revision: 1197661 URL: http://svn.apache.org/viewvc?rev=1197661&view=rev Log: QPID-3346: sync this branch to trunk msg group code. Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1197661&r1=1197660&r2=1197661&view=diff ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original) +++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Fri Nov 4 16:56:47 2011 @@ -43,13 +43,61 @@ const std::string MessageGroupManager::q const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); -const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const +void MessageGroupManager::unFree( const GroupState& state ) { + GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + assert( pos != freeGroups.end() && pos->second == &state ); + freeGroups.erase( pos ); +} + +void MessageGroupManager::own( GroupState& state, const std::string& owner ) +{ + state.owner = owner; + unFree( state ); +} + +void MessageGroupManager::disown( GroupState& state ) +{ + state.owner.clear(); + assert(state.members.size()); + assert(freeGroups.find(state.members.front()) == freeGroups.end()); + freeGroups[state.members.front()] = &state; +} + +MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm ) +{ + uint32_t thisMsg = qm.position.getValue(); + if (cachedGroup && lastMsg == thisMsg) { + hits++; + return *cachedGroup; + } + + std::string group = defaultGroupId; const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); - if (!headers) return defaultGroupId; - qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); - if (!id || !id->convertsTo()) return defaultGroupId; - return id->get(); + if (headers) { + qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); + if (id && id->convertsTo()) { + std::string tmp = id->get(); + if (!tmp.empty()) // empty group is reserved + group = tmp; + } + } + + if (cachedGroup && group == lastGroup) { + hits++; + lastMsg = thisMsg; + return *cachedGroup; + } + + misses++; + + GroupState& found = messageGroups[group]; + if (found.group.empty()) + found.group = group; // new group, assign name + lastMsg = thisMsg; + lastGroup = group; + cachedGroup = &found; + return found; } @@ -57,15 +105,13 @@ void MessageGroupManager::enqueued( cons { // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupState &state(messageGroups[group]); + GroupState& state = findGroup(qm); state.members.push_back(qm.position); uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << - ": added message to group id=" << group << " total=" << total ); + ": added message to group id=" << state.group << " total=" << total ); if (total == 1) { // newly created group, no owner - state.group = group; assert(freeGroups.find(qm.position) == freeGroups.end()); freeGroups[qm.position] = &state; } @@ -76,13 +122,11 @@ void MessageGroupManager::acquired( cons { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); + GroupState& state = findGroup(qm); + assert(state.members.size()); // there are msgs present state.acquired += 1; QPID_LOG( trace, "group queue " << qName << - ": acquired message in group id=" << group << " acquired=" << state.acquired ); + ": acquired message in group id=" << state.group << " acquired=" << state.acquired ); } @@ -90,19 +134,16 @@ void MessageGroupManager::requeued( cons { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); + GroupState& state = findGroup(qm); assert( state.acquired != 0 ); state.acquired -= 1; if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << gs->first); + ": consumer name=" << state.owner << " released group id=" << state.group); disown(state); } QPID_LOG( trace, "group queue " << qName << - ": requeued message to group id=" << group << " acquired=" << state.acquired ); + ": requeued message to group id=" << state.group << " acquired=" << state.acquired ); } @@ -110,10 +151,7 @@ void MessageGroupManager::dequeued( cons { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); + GroupState& state = findGroup(qm); assert( state.members.size() != 0 ); assert( state.acquired != 0 ); state.acquired -= 1; @@ -141,99 +179,55 @@ void MessageGroupManager::dequeued( cons } uint32_t total = state.members.size(); + QPID_LOG( trace, "group queue " << qName << + ": dequeued message from group id=" << state.group << " total=" << total ); + if (total == 0) { - QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); - messageGroups.erase( gs ); + QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << state.group); + if (cachedGroup == &state) { + cachedGroup = 0; + } + std::string key(state.group); + messageGroups.erase( key ); } else if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << gs->first); + ": consumer name=" << state.owner << " released group id=" << state.group); disown(state); } else if (reFreeNeeded) { disown(state); } - QPID_LOG( trace, "group queue " << qName << - ": dequeued message from group id=" << group << " total=" << total ); -} - -void MessageGroupManager::consumerAdded( const Consumer& /*c*/ ) -{ -#if 0 - // allow a re-subscribing consumer - if (consumers.find(c.getName()) == consumers.end()) { - consumers[c.getName()] = 0; // no groups owned yet - QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() ); - } else { - QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() ); - } -#endif } -void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ ) +MessageGroupManager::~MessageGroupManager() { -#if 0 - const std::string& name(c.getName()); - Consumers::iterator consumer = consumers.find(name); - assert(consumer != consumers.end()); - size_t count = consumer->second; - - for (GroupMap::iterator gs = messageGroups.begin(); - count && gs != messageGroups.end(); ++gs) { - - GroupState& state( gs->second ); - if (state.owner == name) { - if (state.acquired == 0) { - --count; - disown(state); - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << name << " released group id=" << gs->first); - } - } - } - if (count == 0) { - consumers.erase( consumer ); - QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name ); - } else { - // don't release groups with outstanding acquired msgs - consumer may re-subscribe! - QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages."); - } -#endif + QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses ); } - - bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { if (messages.empty()) return false; + next.position = c->position; if (!freeGroups.empty()) { - framing::SequenceNumber nextFree = freeGroups.begin()->first; - if (nextFree < c->position) { // next free group's msg is older than current position - bool ok = messages.find(nextFree, next); - (void) ok; assert( ok ); - } else { - if (!messages.next( c->position, next )) - return false; // shouldn't happen - should find nextFree - } - } else { // no free groups available -#if 0 - if (consumers[c->getName()] == 0) { // and none currently owned - return false; // so nothing available to consume - } -#endif - if (!messages.next( c->position, next )) - return false; - } - - do { - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - std::string group( getGroupId( next ) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - if (!state.owned() || state.owner == c->getName()) { + const framing::SequenceNumber& nextFree = freeGroups.begin()->first; + if (nextFree < next.position) { // a free message is older than current + next.position = nextFree; + --next.position; + } + } + + while (messages.next( next.position, next )) { + GroupState& group = findGroup(next); + if (!group.owned()) { + if (group.members.front() == next.position) { // only take from head! + return true; + } + QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group + << "'s head message still pending. pos=" << group.members.front()); + } else if (group.owner == c->getName()) { return true; } - } while (messages.next( next.position, next )); + } return false; } @@ -241,15 +235,12 @@ bool MessageGroupManager::nextConsumable bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); + GroupState& state = findGroup(qm); if (!state.owned()) { own( state, consumer ); QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << consumer << " has acquired group id=" << gs->first); + ": consumer name=" << consumer << " has acquired group id=" << state.group); return true; } return state.owner == consumer; @@ -389,8 +380,8 @@ void MessageGroupManager::setState(const { using namespace qpid::framing; messageGroups.clear(); - //consumers.clear(); freeGroups.clear(); + cachedGroup = 0; framing::Array groupState(TYPE_CODE_MAP); @@ -430,10 +421,7 @@ void MessageGroupManager::setState(const for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) state.members.push_back((*p)->getIntegerValue()); messageGroups[state.group] = state; - if (state.owned()) - //consumers[state.owner]++; - ; - else { + if (!state.owned()) { assert(state.members.size()); freeGroups[state.members.front()] = &messageGroups[state.group]; } Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1197661&r1=1197660&r2=1197661&view=diff ============================================================================== --- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original) +++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Fri Nov 4 16:56:47 2011 @@ -26,7 +26,7 @@ #include "qpid/broker/StatefulQueueObserver.h" #include "qpid/broker/MessageDistributor.h" - +#include "qpid/sys/unordered_map.h" namespace qpid { namespace broker { @@ -44,22 +44,21 @@ class MessageGroupManager : public State const std::string qName; // name of parent queue (for logs) struct GroupState { + // note: update getState()/setState() when changing this object's state implementation typedef std::deque PositionFifo; std::string group; // group identifier std::string owner; // consumer with outstanding acquired messages uint32_t acquired; // count of outstanding acquired messages - //uint32_t total; // count of enqueued messages in this group PositionFifo members; // msgs belonging to this group GroupState() : acquired(0) {} bool owned() const {return !owner.empty();} }; - typedef std::map GroupMap; - //typedef std::map Consumers; // count of owned groups + + typedef sys::unordered_map GroupMap; typedef std::map GroupFifo; - // note: update getState()/setState() when changing this object's state implementation GroupMap messageGroups; // index: group name GroupFifo freeGroups; // ordered by oldest free msg //Consumers consumers; // index: consumer name @@ -68,28 +67,15 @@ class MessageGroupManager : public State static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers static const std::string qpidMessageGroupTimestamp; - const std::string getGroupId( const QueuedMessage& qm ) const; - void unFree( const GroupState& state ) - { - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); - assert( pos != freeGroups.end() && pos->second == &state ); - freeGroups.erase( pos ); - } - void own( GroupState& state, const std::string& owner ) - { - state.owner = owner; - //consumers[state.owner]++; - unFree( state ); - } - void disown( GroupState& state ) - { - //assert(consumers[state.owner]); - //consumers[state.owner]--; - state.owner.clear(); - assert(state.members.size()); - assert(freeGroups.find(state.members.front()) == freeGroups.end()); - freeGroups[state.members.front()] = &state; - } + GroupState& findGroup( const QueuedMessage& qm ); + unsigned long hits, misses; // for debug + uint32_t lastMsg; + std::string lastGroup; + GroupState *cachedGroup; + + void unFree( const GroupState& state ); + void own( GroupState& state, const std::string& owner ); + void disown( GroupState& state ); public: @@ -101,13 +87,18 @@ class MessageGroupManager : public State MessageGroupManager(const std::string& header, const std::string& _qName, Messages& container, unsigned int _timestamp=0 ) : StatefulQueueObserver(std::string("MessageGroupManager:") + header), - groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {} + groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName), + hits(0), misses(0), + lastMsg(0), cachedGroup(0) {} + virtual ~MessageGroupManager(); + + // QueueObserver iface void enqueued( const QueuedMessage& qm ); void acquired( const QueuedMessage& qm ); void requeued( const QueuedMessage& qm ); void dequeued( const QueuedMessage& qm ); - void consumerAdded( const Consumer& ); - void consumerRemoved( const Consumer& ); + void consumerAdded( const Consumer& ) {}; + void consumerRemoved( const Consumer& ) {}; void getState(qpid::framing::FieldTable& state ) const; void setState(const qpid::framing::FieldTable&); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org