qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1166299 - in /qpid/branches/qpid-3346/qpid/cpp/src: Makefile.am qpid/broker/MessageAllocator.cpp qpid/broker/MessageAllocator.h qpid/broker/MessageGroupManager.cpp qpid/broker/MessageGroupManager.h qpid/broker/Queue.cpp
Date Wed, 07 Sep 2011 18:23:01 GMT
Author: kgiusti
Date: Wed Sep  7 18:23:01 2011
New Revision: 1166299

URL: http://svn.apache.org/viewvc?rev=1166299&view=rev
Log:
QPID-3346: move message group code into its own files.

Added:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp

Modified: qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am?rev=1166299&r1=1166298&r2=1166299&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/Makefile.am Wed Sep  7 18:23:01 2011
@@ -671,6 +671,10 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/TxPublish.h \
   qpid/broker/Vhost.cpp \
   qpid/broker/Vhost.h \
+  qpid/broker/MessageAllocator.h \
+  qpid/broker/MessageAllocator.cpp \
+  qpid/broker/MessageGroupManager.cpp \
+  qpid/broker/MessageGroupManager.h \
   qpid/management/ManagementAgent.cpp \
   qpid/management/ManagementAgent.h \
   qpid/management/ManagementDirectExchange.cpp \

Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp?rev=1166299&view=auto
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp (added)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp Wed Sep  7 18:23:01
2011
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Used by queues to allocate the next "most desirable" message to a consuming client */
+
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageAllocator.h"
+
+using namespace qpid::broker;
+
+bool MessageAllocator::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage&
next,
+                                              const sys::Mutex::ScopedLock&)
+{
+    Messages& messages(queue->getMessages());
+    if (!messages.empty()) {
+        next = messages.front();    // by default, consume oldest msg
+        return true;
+    }
+    return false;
+}
+
+bool MessageAllocator::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage&
next,
+                                             const sys::Mutex::ScopedLock&)
+{
+    Messages& messages(queue->getMessages());
+    if (!messages.empty() && messages.next(c->position, next))
+        return true;
+    return false;
+}
+
+
+bool MessageAllocator::acquirable( const std::string&,
+                                   const QueuedMessage&,
+                                   const sys::Mutex::ScopedLock&)
+{
+    return true;
+}
+
+void MessageAllocator::query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&)
const
+{
+}
+

Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h?rev=1166299&view=auto
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h (added)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.h Wed Sep  7 18:23:01
2011
@@ -0,0 +1,79 @@
+#ifndef _broker_MessageAllocator_h
+#define _broker_MessageAllocator_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Used by queues to allocate the next "most desirable" message to a consuming client */
+
+
+#include "qpid/broker/Consumer.h"
+
+namespace qpid {
+namespace broker {
+
+class Queue;
+class QueuedMessage;
+
+class MessageAllocator
+{
+ protected:
+    Queue *queue;
+ public:
+    MessageAllocator( Queue *q ) : queue(q) {}
+    virtual ~MessageAllocator() {};
+
+    // Note: all methods taking a mutex assume the caller is holding the
+    // Queue::messageLock during the method call.
+
+    /** Determine the next message available for consumption by the consumer
+     * @param next set to the next message that the consumer may acquire.
+     * @return true if message is available
+     */
+    virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
+                                        QueuedMessage& next,
+                                        const sys::Mutex::ScopedLock& lock);
+
+    /** Determine the next message available for browsing by the consumer
+     * @param next set to the next message that the consumer may browse.
+     * @return true if a message is available
+     */
+    virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
+                                       QueuedMessage& next,
+                                       const sys::Mutex::ScopedLock& lock);
+
+    /** check if a message previously returned via next*Message() may be acquired.
+     * @param consumer name of consumer that is attempting to acquire the message
+     * @param qm the message to be acquired
+     * @param messageLock - ensures caller is holding it!
+     * @return true if acquire is permitted, false if acquire is no longer permitted.
+     */
+    virtual bool acquirable( const std::string&,
+                             const QueuedMessage&,
+                             const sys::Mutex::ScopedLock&);
+
+    /** hook to add any interesting management state to the status map */
+    virtual void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&)
const;
+};
+
+}}
+
+#endif

Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1166299&view=auto
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (added)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Wed Sep  7 18:23:01
2011
@@ -0,0 +1,307 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FieldTable.h"
+#include "qpid/types/Variant.h"
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageGroupManager.h"
+
+using namespace qpid::broker;
+
+namespace {
+    const std::string GroupQueryKey("qpid.message_group_queue");
+    const std::string GroupHeaderKey("group_header_key");
+    const std::string GroupStateKey("group_state");
+    const std::string GroupIdKey("group_id");
+    const std::string GroupMsgCount("msg_count");
+    const std::string GroupTimestamp("timestamp");
+    const std::string GroupConsumer("consumer");
+}
+
+
+const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
+const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group");     /**
@todo KAG: make configurable in Broker options */
+
+
+const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
+{
+    const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
+    if (!headers) return qpidMessageGroupDefault;
+    qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
+    if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault;
+    return id->get<std::string>();
+}
+
+
+void MessageGroupManager::enqueued( const QueuedMessage& qm )
+{
+    // @todo KAG optimization - store reference to group state in QueuedMessage
+    // issue: const-ness??
+    std::string group( getGroupId(qm) );
+    GroupState &state(messageGroups[group]);
+    state.members.push_back(qm.position);
+    uint32_t total = state.members.size();
+    QPID_LOG( trace, "group queue " << queue->getName() <<
+              ": added message to group id=" << group << " total=" << total
);
+    if (total == 1) {
+        // newly created group, no owner
+        state.group = group;
+#ifdef NDEBUG
+        freeGroups[qm.position] = &state;
+#else
+        bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second;
+        (void) unique; assert(unique);
+#endif
+    }
+}
+
+
+void MessageGroupManager::acquired( const QueuedMessage& qm )
+{
+    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
+    // issue: const-ness??
+    std::string group( getGroupId(qm) );
+    GroupMap::iterator gs = messageGroups.find( group );
+    assert( gs != messageGroups.end() );
+    GroupState& state( gs->second );
+    state.acquired += 1;
+    QPID_LOG( trace, "group queue " << queue->getName() <<
+              ": acquired message in group id=" << group << " acquired=" <<
state.acquired );
+}
+
+
+void MessageGroupManager::requeued( const QueuedMessage& qm )
+{
+    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
+    // issue: const-ness??
+    // @todo KAG BUG - how to ensure requeue happens in the correct order?
+    // @todo KAG BUG - if requeue is not in correct order - what do we do?  throw?
+    std::string group( getGroupId(qm) );
+    GroupMap::iterator gs = messageGroups.find( group );
+    assert( gs != messageGroups.end() );
+    GroupState& state( gs->second );
+    assert( state.acquired != 0 );
+    state.acquired -= 1;
+    if (state.acquired == 0 && state.owned()) {
+        QPID_LOG( trace, "group queue " << queue->getName() <<
+                  ": consumer name=" << state.owner << " released group id="
<< gs->first);
+        disown(state);
+    }
+    QPID_LOG( trace, "group queue " << queue->getName() <<
+              ": requeued message to group id=" << group << " acquired=" <<
state.acquired );
+}
+
+
+void MessageGroupManager::dequeued( const QueuedMessage& qm )
+{
+    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
+    // issue: const-ness??
+    std::string group( getGroupId(qm) );
+    GroupMap::iterator gs = messageGroups.find( group );
+    assert( gs != messageGroups.end() );
+    GroupState& state( gs->second );
+    assert( state.members.size() != 0 );
+
+    // likely to be at or near begin() if dequeued in order
+    {
+        GroupState::PositionFifo::iterator pos = state.members.begin();
+        GroupState::PositionFifo::iterator end = state.members.end();
+        while (pos != end) {
+            if (*pos == qm.position) {
+                state.members.erase(pos);
+                break;
+            }
+            ++pos;
+        }
+    }
+
+    assert( state.acquired != 0 );
+    state.acquired -= 1;
+    uint32_t total = state.members.size();
+    if (total == 0) {
+        if (!state.owned()) {  // unlikely, but need to remove from the free list before
erase
+            unFree( state );
+        }
+        QPID_LOG( trace, "group queue " << queue->getName() << ": deleting
group id=" << gs->first);
+        messageGroups.erase( gs );
+    } else {
+        if (state.acquired == 0 && state.owned()) {
+            QPID_LOG( trace, "group queue " << queue->getName() <<
+                      ": consumer name=" << state.owner << " released group id="
<< gs->first);
+            disown(state);
+        }
+    }
+    QPID_LOG( trace, "group queue " << queue->getName() <<
+              ": dequeued message from group id=" << group << " total=" <<
total );
+}
+
+void MessageGroupManager::consumerAdded( const Consumer& c )
+{
+    assert(consumers.find(c.getName()) == consumers.end());
+    consumers[c.getName()] = 0;     // no groups owned yet
+    QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer,
name=" << c.getName() );
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& c )
+{
+    const std::string& name(c.getName());
+    Consumers::iterator consumer = consumers.find(name);
+    assert(consumer != consumers.end());
+    size_t count = consumer->second;
+
+    for (GroupMap::iterator gs = messageGroups.begin();
+         count && gs != messageGroups.end(); ++gs) {
+
+        GroupState& state( gs->second );
+        if (state.owner == name) {
+            --count;
+            disown(state);
+            QPID_LOG( trace, "group queue " << queue->getName() <<
+                      ": consumer name=" << name << " released group id=" <<
gs->first);
+        }
+    }
+    consumers.erase( consumer );
+    QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer
name=" << name );
+}
+
+
+bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage&
next,
+                                                 const qpid::sys::Mutex::ScopedLock&
)
+{
+    Messages& messages(queue->getMessages());
+
+    if (messages.empty())
+        return false;
+
+    if (!freeGroups.empty()) {
+        framing::SequenceNumber nextFree = freeGroups.begin()->first;
+        if (nextFree < c->position) {  // next free group's msg is older than current
position
+            bool ok = messages.find(nextFree, next);
+            (void) ok; assert( ok );
+        } else {
+            if (!messages.next( c->position, next ))
+                return false;           // shouldn't happen - should find nextFree
+        }
+    } else {  // no free groups available
+        if (consumers[c->getName()] == 0) {  // and none currently owned
+            return false;       // so nothing available to consume
+        }
+        if (!messages.next( c->position, next ))
+            return false;
+    }
+
+    do {
+        // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
+        std::string group( getGroupId( next ) );
+        GroupMap::iterator gs = messageGroups.find( group );
+        assert( gs != messageGroups.end() );
+        GroupState& state( gs->second );
+        if (!state.owned() || state.owner == c->getName()) {
+            return true;
+        }
+    } while (messages.next( next.position, next ));
+    return false;
+}
+
+
+bool MessageGroupManager::acquirable(const std::string& consumer, const QueuedMessage&
qm,
+                                     const qpid::sys::Mutex::ScopedLock&)
+{
+    // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+    std::string group( getGroupId(qm) );
+    GroupMap::iterator gs = messageGroups.find( group );
+    assert( gs != messageGroups.end() );
+    GroupState& state( gs->second );
+
+    if (!state.owned()) {
+        own( state, consumer );
+        QPID_LOG( trace, "group queue " << queue->getName() <<
+                  ": consumer name=" << consumer << " has acquired group id="
<< gs->first);
+        return true;
+    }
+    return state.owner == consumer;
+}
+
+
+void MessageGroupManager::query(qpid::types::Variant::Map& status,
+                                const qpid::sys::Mutex::ScopedLock&) const
+{
+    /** Add a description of the current state of the message groups for this queue.
+        FORMAT:
+        { "qpid.message_group_queue":
+            { "group_header_key" : "<KEY>",
+              "group_state" :
+                   [ { "group_id"  : "<name>",
+                       "msg_count" : <int>,
+                       "timestamp" : <absTime>,
+                       "consumer"  : <consumer name> },
+                     {...} // one for each known group
+                   ]
+            }
+        }
+    **/
+
+    assert(status.find(GroupQueryKey) == status.end());
+    qpid::types::Variant::Map state;
+    qpid::types::Variant::List groups;
+
+    state[GroupHeaderKey] = groupIdHeader;
+    for (GroupMap::const_iterator g = messageGroups.begin();
+         g != messageGroups.end(); ++g) {
+        qpid::types::Variant::Map info;
+        info[GroupIdKey] = g->first;
+        info[GroupMsgCount] = g->second.members.size();
+        info[GroupTimestamp] = 0;   /** @todo KAG - NEED HEAD MSG TIMESTAMP */
+        info[GroupConsumer] = g->second.owner;
+        groups.push_back(info);
+    }
+    state[GroupStateKey] = groups;
+    status[GroupQueryKey] = state;
+}
+
+
+boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
+                                                                    const qpid::framing::FieldTable&
settings )
+{
+    boost::shared_ptr<MessageGroupManager> empty;
+
+    if (settings.isSet(qpidMessageGroupKey)) {
+
+        std::string headerKey = settings.getAsString(qpidMessageGroupKey);
+        if (headerKey.empty()) {
+            QPID_LOG( error, "A Message Group header key must be configured, queue=" <<
q->getName());
+            return empty;
+        }
+        unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
+
+        boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey,
q, timestamp ) );
+
+        q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) );
+
+        QPID_LOG( debug, "Configured Queue '" << q->getName() <<
+                  "' for message grouping using header key '" << headerKey <<
"'" <<
+                  " (timestamp=" << timestamp << ")");
+        return manager;
+    }
+    return empty;
+}

Added: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1166299&view=auto
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (added)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Wed Sep  7 18:23:01
2011
@@ -0,0 +1,116 @@
+#ifndef _broker_MessageGroupManager_h
+#define _broker_MessageGroupManager_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* for managing message grouping on Queues */
+
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/MessageAllocator.h"
+
+
+namespace qpid {
+namespace broker {
+
+class QueueObserver;
+class MessageAllocator;
+
+class MessageGroupManager : public QueueObserver, public MessageAllocator
+{
+    const std::string groupIdHeader;    // msg header holding group identifier
+    const unsigned int timestamp;       // mark messages with timestamp if set
+
+    struct GroupState {
+        typedef std::list<framing::SequenceNumber> PositionFifo;
+
+        std::string group;  // group identifier
+        std::string owner;  // consumer with outstanding acquired messages
+        uint32_t acquired;  // count of outstanding acquired messages
+        //uint32_t total;     // count of enqueued messages in this group
+        PositionFifo members;   // msgs belonging to this group
+
+        GroupState() : acquired(0) {}
+        bool owned() const {return !owner.empty();}
+    };
+    typedef std::map<std::string, struct GroupState> GroupMap;
+    typedef std::map<std::string, uint32_t> Consumers;  // count of owned groups
+    typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+
+    GroupMap messageGroups; // index: group name
+    GroupFifo freeGroups; // ordered by oldest free msg
+    Consumers consumers;    // index: consumer name
+
+    static const std::string qpidMessageGroupKey;
+    static const std::string qpidMessageGroupTimestamp;
+    static const std::string qpidMessageGroupDefault;
+
+    const std::string getGroupId( const QueuedMessage& qm ) const;
+    void unFree( const GroupState& state )
+    {
+        GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+        assert( pos != freeGroups.end() && pos->second == &state );
+        freeGroups.erase( pos );
+    }
+    void own( GroupState& state, const std::string& owner )
+    {
+        state.owner = owner;
+        consumers[state.owner]++;
+        unFree( state );
+    }
+    void disown( GroupState& state )
+    {
+        assert(consumers[state.owner]);
+        consumers[state.owner]--;
+        state.owner.clear();
+        assert(state.members.size());
+#ifdef NDEBUG
+        freeGroups[state.members.front()] = &state;
+#else
+        bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second;
+        (void) unique; assert(unique);
+#endif
+    }
+
+ public:
+
+    static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable&
settings );
+
+    MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0
)
+        : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp)
{}
+    void enqueued( const QueuedMessage& qm );
+    void acquired( const QueuedMessage& qm );
+    void requeued( const QueuedMessage& qm );
+    void dequeued( const QueuedMessage& qm );
+    void consumerAdded( const Consumer& );
+    void consumerRemoved( const Consumer& );
+    bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
+                                const sys::Mutex::ScopedLock&);
+    // uses default nextBrowsableMessage()
+    bool acquirable(const std::string& consumer, const QueuedMessage& msg,
+                    const sys::Mutex::ScopedLock&);
+    void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const;
+    bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+};
+
+}}
+
+#endif

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1166299&r1=1166298&r2=1166299&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Wed Sep  7 18:23:01 2011
@@ -33,6 +33,8 @@
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/MessageAllocator.h"
+#include "qpid/broker/MessageGroupManager.h"
 
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
@@ -88,152 +90,6 @@ const int ENQUEUE_ONLY=1;
 const int ENQUEUE_AND_DEQUEUE=2;
 }
 
-
-// KAG TBD: find me a home....
-namespace qpid {
-namespace broker {
-
-class MessageAllocator
-{
- protected:
-    Queue *queue;
- public:
-    MessageAllocator( Queue *q ) : queue(q) {}
-    virtual ~MessageAllocator() {};
-
-    // Note: all methods taking a mutex assume the caller is holding the
-    // Queue::messageLock during the method call.
-
-    /** Determine the next message available for consumption by the consumer
-     * @param next set to the next message that the consumer may acquire.
-     * @return true if message is available
-     */
-    virtual bool nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next,
-                                        const Mutex::ScopedLock&)
-    {
-        Messages& messages(queue->getMessages());
-        if (!messages.empty()) {
-            next = messages.front();    // by default, consume oldest msg
-            return true;
-        }
-        return false;
-    }
-    /** Determine the next message available for browsing by the consumer
-     * @param next set to the next message that the consumer may browse.
-     * @return true if a message is available
-     */
-    virtual bool nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
-                                       const Mutex::ScopedLock&)
-    {
-        Messages& messages(queue->getMessages());
-        if (!messages.empty() && messages.next(c->position, next))
-            return true;
-        return false;
-    }
-    /** acquire a message previously returned via next*Message().
-     * @param consumer name of consumer that is attempting to acquire the message
-     * @param qm the message to be acquired
-     * @param messageLock - ensures caller is holding it!
-     * @returns true if acquire is successful, false if acquire failed.
-     */
-    virtual bool acquireMessage( const std::string&, const QueuedMessage&,
-                                 const Mutex::ScopedLock&)
-    {
-        return true;
-    }
-
-    /** hook to add any interesting management state to the status map */
-    virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const
{};
-};
-
-
-
-class MessageGroupManager : public QueueObserver, public MessageAllocator
-{
-    const std::string groupIdHeader;    // msg header holding group identifier
-    const unsigned int timestamp;       // mark messages with timestamp if set
-
-    struct GroupState {
-        typedef std::list<framing::SequenceNumber> PositionFifo;
-
-        std::string group;  // group identifier
-        std::string owner;  // consumer with outstanding acquired messages
-        uint32_t acquired;  // count of outstanding acquired messages
-        //uint32_t total;     // count of enqueued messages in this group
-        PositionFifo members;   // msgs belonging to this group
-
-        GroupState() : acquired(0) {}
-        bool owned() const {return !owner.empty();}
-    };
-    typedef std::map<std::string, struct GroupState> GroupMap;
-    typedef std::map<std::string, uint32_t> Consumers;  // count of owned groups
-    typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
-
-    GroupMap messageGroups; // index: group name
-    GroupFifo freeGroups; // ordered by oldest free msg
-    Consumers consumers;    // index: consumer name
-
-    static const std::string qpidMessageGroupKey;
-    static const std::string qpidMessageGroupTimestamp;
-    static const std::string qpidMessageGroupDefault;
-
-    const std::string getGroupId( const QueuedMessage& qm ) const;
-    void unFree( const GroupState& state )
-    {
-        GroupFifo::iterator pos = freeGroups.find( state.members.front() );
-        assert( pos != freeGroups.end() && pos->second == &state );
-        freeGroups.erase( pos );
-    }
-    void own( GroupState& state, const std::string& owner )
-    {
-        state.owner = owner;
-        consumers[state.owner]++;
-        unFree( state );
-    }
-    void disown( GroupState& state )
-    {
-        assert(consumers[state.owner]);
-        consumers[state.owner]--;
-        state.owner.clear();
-        assert(state.members.size());
-#ifdef NDEBUG
-        freeGroups[state.members.front()] = &state;
-#else
-        bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second;
-        (void) unique; assert(unique);
-#endif
-    }
-
- public:
-
-    static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable&
settings );
-
-    MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0
)
-        : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp)
{}
-    void enqueued( const QueuedMessage& qm );
-    void acquired( const QueuedMessage& qm );
-    void requeued( const QueuedMessage& qm );
-    void dequeued( const QueuedMessage& qm );
-    void consumerAdded( const Consumer& );
-    void consumerRemoved( const Consumer& );
-    bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
-                                const Mutex::ScopedLock&);
-    // uses default nextBrowsableMessage()
-    bool acquireMessage(const std::string& consumer, const QueuedMessage& msg,
-                        const Mutex::ScopedLock&);
-    void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const;
-    bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
-};
-
-const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
-const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
-const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group");     /**
@todo KAG: make configurable in Broker options */
-
-}}
-// KAG TBD: END find me a home....
-
-
-
 Queue::Queue(const string& _name, bool _autodelete,
              MessageStore* const _store,
              const OwnershipToken* const _owner,
@@ -400,7 +256,7 @@ bool Queue::acquire(const QueuedMessage&
     assertClusterSafe();
     QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
 
-    if (!allocator->acquireMessage( consumer, msg, locker )) {
+    if (!allocator->acquirable( consumer, msg, locker )) {
         QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position <<
" from '" << name);
         return false;
     }
@@ -447,7 +303,6 @@ bool Queue::getNextMessage(QueuedMessage
 
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr&
c)
 {
-
     while (true) {
         Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
@@ -471,7 +326,7 @@ Queue::ConsumeCode Queue::consumeNextMes
 
         if (c->filter(msg.payload)) {
             if (c->accept(msg.payload)) {
-                bool ok = allocator->acquireMessage( c->getName(), msg, locker ); 
// inform allocator
+                bool ok = allocator->acquirable( c->getName(), msg, locker );  // inform
allocator
                 (void) ok; assert(ok);
                 ok = acquire( msg.position, msg );
                 (void) ok; assert(ok);
@@ -1071,6 +926,7 @@ void Queue::create(const FieldTable& _se
     configureImpl(_settings);
 }
 
+
 int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string&
key)
 {
     qpid::framing::FieldTable::ValuePtr v = settings.get(key);
@@ -1584,287 +1440,3 @@ void Queue::UsageBarrier::destroy()
     parent.deleted = true;
     while (count) parent.messageLock.wait();
 }
-
-
-// KAG TBD: flesh out...
-
-
-
-
-const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
-{
-    const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
-    if (!headers) return qpidMessageGroupDefault;
-    FieldTable::ValuePtr id = headers->get( groupIdHeader );
-    if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault;
-    return id->get<std::string>();
-}
-
-
-void MessageGroupManager::enqueued( const QueuedMessage& qm )
-{
-    // @todo KAG optimization - store reference to group state in QueuedMessage
-    // issue: const-ness??
-    std::string group( getGroupId(qm) );
-    GroupState &state(messageGroups[group]);
-    state.members.push_back(qm.position);
-    uint32_t total = state.members.size();
-    QPID_LOG( trace, "group queue " << queue->getName() <<
-              ": added message to group id=" << group << " total=" << total
);
-    if (total == 1) {
-        // newly created group, no owner
-        state.group = group;
-#ifdef NDEBUG
-        freeGroups[qm.position] = &state;
-#else
-        bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second;
-        (void) unique; assert(unique);
-#endif
-    }
-}
-
-
-void MessageGroupManager::acquired( const QueuedMessage& qm )
-{
-    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
-    // issue: const-ness??
-    std::string group( getGroupId(qm) );
-    GroupMap::iterator gs = messageGroups.find( group );
-    assert( gs != messageGroups.end() );
-    GroupState& state( gs->second );
-    state.acquired += 1;
-    QPID_LOG( trace, "group queue " << queue->getName() <<
-              ": acquired message in group id=" << group << " acquired=" <<
state.acquired );
-}
-
-
-void MessageGroupManager::requeued( const QueuedMessage& qm )
-{
-    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
-    // issue: const-ness??
-    // @todo KAG BUG - how to ensure requeue happens in the correct order?
-    // @todo KAG BUG - if requeue is not in correct order - what do we do?  throw?
-    std::string group( getGroupId(qm) );
-    GroupMap::iterator gs = messageGroups.find( group );
-    assert( gs != messageGroups.end() );
-    GroupState& state( gs->second );
-    assert( state.acquired != 0 );
-    state.acquired -= 1;
-    if (state.acquired == 0 && state.owned()) {
-        QPID_LOG( trace, "group queue " << queue->getName() <<
-                  ": consumer name=" << state.owner << " released group id="
<< gs->first);
-        disown(state);
-    }
-    QPID_LOG( trace, "group queue " << queue->getName() <<
-              ": requeued message to group id=" << group << " acquired=" <<
state.acquired );
-}
-
-
-void MessageGroupManager::dequeued( const QueuedMessage& qm )
-{
-    // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
-    // issue: const-ness??
-    std::string group( getGroupId(qm) );
-    GroupMap::iterator gs = messageGroups.find( group );
-    assert( gs != messageGroups.end() );
-    GroupState& state( gs->second );
-    assert( state.members.size() != 0 );
-
-    // likely to be at or near begin() if dequeued in order
-    {
-        GroupState::PositionFifo::iterator pos = state.members.begin();
-        GroupState::PositionFifo::iterator end = state.members.end();
-        while (pos != end) {
-            if (*pos == qm.position) {
-                state.members.erase(pos);
-                break;
-            }
-            ++pos;
-        }
-    }
-
-    assert( state.acquired != 0 );
-    state.acquired -= 1;
-    uint32_t total = state.members.size();
-    if (total == 0) {
-        if (!state.owned()) {  // unlikely, but need to remove from the free list before
erase
-            unFree( state );
-        }
-        QPID_LOG( trace, "group queue " << queue->getName() << ": deleting
group id=" << gs->first);
-        messageGroups.erase( gs );
-    } else {
-        if (state.acquired == 0 && state.owned()) {
-            QPID_LOG( trace, "group queue " << queue->getName() <<
-                      ": consumer name=" << state.owner << " released group id="
<< gs->first);
-            disown(state);
-        }
-    }
-    QPID_LOG( trace, "group queue " << queue->getName() <<
-              ": dequeued message from group id=" << group << " total=" <<
total );
-}
-
-void MessageGroupManager::consumerAdded( const Consumer& c )
-{
-    assert(consumers.find(c.getName()) == consumers.end());
-    consumers[c.getName()] = 0;     // no groups owned yet
-    QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer,
name=" << c.getName() );
-}
-
-void MessageGroupManager::consumerRemoved( const Consumer& c )
-{
-    const std::string& name(c.getName());
-    Consumers::iterator consumer = consumers.find(name);
-    assert(consumer != consumers.end());
-    size_t count = consumer->second;
-
-    for (GroupMap::iterator gs = messageGroups.begin();
-         count && gs != messageGroups.end(); ++gs) {
-
-        GroupState& state( gs->second );
-        if (state.owner == name) {
-            --count;
-            disown(state);
-            QPID_LOG( trace, "group queue " << queue->getName() <<
-                      ": consumer name=" << name << " released group id=" <<
gs->first);
-        }
-    }
-    consumers.erase( consumer );
-    QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer
name=" << name );
-}
-
-
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage&
next,
-                                                 const Mutex::ScopedLock& )
-{
-    Messages& messages(queue->getMessages());
-
-    if (messages.empty())
-        return false;
-
-    if (!freeGroups.empty()) {
-        framing::SequenceNumber nextFree = freeGroups.begin()->first;
-        if (nextFree < c->position) {  // next free group's msg is older than current
position
-            bool ok = messages.find(nextFree, next);
-            (void) ok; assert( ok );
-        } else {
-            if (!messages.next( c->position, next ))
-                return false;           // shouldn't happen - should find nextFree
-        }
-    } else {  // no free groups available
-        if (consumers[c->getName()] == 0) {  // and none currently owned
-            return false;       // so nothing available to consume
-        }
-        if (!messages.next( c->position, next ))
-            return false;
-    }
-
-    do {
-        // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
-        std::string group( getGroupId( next ) );
-        GroupMap::iterator gs = messageGroups.find( group );
-        assert( gs != messageGroups.end() );
-        GroupState& state( gs->second );
-        if (!state.owned() || state.owner == c->getName()) {
-            return true;
-        }
-    } while (messages.next( next.position, next ));
-    return false;
-}
-
-
-bool MessageGroupManager::acquireMessage(const std::string& consumer, const QueuedMessage&
qm,
-                                         const Mutex::ScopedLock&)
-{
-    // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
-    std::string group( getGroupId(qm) );
-    GroupMap::iterator gs = messageGroups.find( group );
-    assert( gs != messageGroups.end() );
-    GroupState& state( gs->second );
-
-    if (!state.owned()) {
-        own( state, consumer );
-        QPID_LOG( trace, "group queue " << queue->getName() <<
-                  ": consumer name=" << consumer << " has acquired group id="
<< gs->first);
-        return true;
-    }
-    return state.owner == consumer;
-}
-
-namespace {
-    const std::string GroupQueryKey("qpid.message_group_queue");
-    const std::string GroupHeaderKey("group_header_key");
-    const std::string GroupStateKey("group_state");
-    const std::string GroupIdKey("group_id");
-    const std::string GroupMsgCount("msg_count");
-    const std::string GroupTimestamp("timestamp");
-    const std::string GroupConsumer("consumer");
-}
-
-void MessageGroupManager::query(qpid::types::Variant::Map& status,
-                                const Mutex::ScopedLock&) const
-{
-    /** Add a description of the current state of the message groups for this queue.
-        FORMAT:
-        { "qpid.message_group_queue":
-            { "group_header_key" : "<KEY>",
-              "group_state" :
-                   [ { "group_id"  : "<name>",
-                       "msg_count" : <int>,
-                       "timestamp" : <absTime>,
-                       "consumer"  : <consumer name> },
-                     {...} // one for each known group
-                   ]
-            }
-        }
-    **/
-
-    assert(status.find(GroupQueryKey) == status.end());
-    qpid::types::Variant::Map state;
-    qpid::types::Variant::List groups;
-
-    state[GroupHeaderKey] = groupIdHeader;
-    for (GroupMap::const_iterator g = messageGroups.begin();
-         g != messageGroups.end(); ++g) {
-        qpid::types::Variant::Map info;
-        info[GroupIdKey] = g->first;
-        info[GroupMsgCount] = g->second.members.size();
-        info[GroupTimestamp] = 0;   /** @todo KAG - NEED HEAD MSG TIMESTAMP */
-        info[GroupConsumer] = g->second.owner;
-        groups.push_back(info);
-    }
-    state[GroupStateKey] = groups;
-    status[GroupQueryKey] = state;
-}
-
-
-boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
-                                                                    const qpid::framing::FieldTable&
settings )
-{
-    boost::shared_ptr<MessageGroupManager> empty;
-
-    if (settings.isSet(qpidMessageGroupKey)) {
-
-        std::string headerKey = settings.getAsString(qpidMessageGroupKey);
-        if (headerKey.empty()) {
-            QPID_LOG( error, "A Message Group header key must be configured, queue=" <<
q->getName());
-            return empty;
-        }
-        unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
-
-        boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey,
q, timestamp ) );
-
-        q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) );
-
-        QPID_LOG( debug, "Configured Queue '" << q->getName() <<
-                  "' for message grouping using header key '" << headerKey <<
"'" <<
-                  " (timestamp=" << timestamp << ")");
-        return manager;
-    }
-    return empty;
-}
-
-
-
-
-
-



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message