qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1069322 [1/2] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/tests/ cpp/xml/ tests/src/py/qpid_tests/broker_0_10/
Date Thu, 10 Feb 2011 10:12:42 GMT
Author: gsim
Date: Thu Feb 10 10:12:41 2011
New Revision: 1069322

URL: http://svn.apache.org/viewvc?rev=1069322&view=rev
Log:
QPID-529: Priority queue implementation
QPID-2104: LVQ enhancement

These both required some refactoring of the Queue class to allow cleaner implementation of different types of behaviour. The in-memory storage of messages is now abstracted out behind an interface specified by qpid::broker::Messages which qpid::broker::Queue uses. Different implementations of that are available for the standard FIFO queue, priority queues and LVQ (I have also separated out the 'legacy' implementation of LVQ from the new version driven by QPID-2104).


Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Thu Feb 10 10:12:41 2011
@@ -956,6 +956,11 @@ set (qpidbroker_SOURCES
      qpid/broker/Broker.cpp
      qpid/broker/Exchange.cpp
      qpid/broker/ExpiryPolicy.cpp
+     qpid/broker/Fairshare.cpp
+     qpid/broker/LegacyLVQ.cpp
+     qpid/broker/MessageDeque.cpp
+     qpid/broker/MessageMap.cpp
+     qpid/broker/PriorityQueue.cpp
      qpid/broker/Queue.cpp
      qpid/broker/QueueCleaner.cpp
      qpid/broker/QueueListeners.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Thu Feb 10 10:12:41 2011
@@ -548,6 +548,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/ExchangeRegistry.h \
   qpid/broker/ExpiryPolicy.cpp \
   qpid/broker/ExpiryPolicy.h \
+  qpid/broker/Fairshare.h \
+  qpid/broker/Fairshare.cpp \
   qpid/broker/FanOutExchange.cpp \
   qpid/broker/FanOutExchange.h \
   qpid/broker/FedOps.h \
@@ -556,6 +558,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/HeadersExchange.h \
   qpid/broker/IncompleteMessageList.cpp \
   qpid/broker/IncompleteMessageList.h \
+  qpid/broker/LegacyLVQ.h \
+  qpid/broker/LegacyLVQ.cpp \
   qpid/broker/Link.cpp \
   qpid/broker/Link.h \
   qpid/broker/LinkRegistry.cpp \
@@ -566,9 +570,16 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/MessageAdapter.h \
   qpid/broker/MessageBuilder.cpp \
   qpid/broker/MessageBuilder.h \
+  qpid/broker/MessageDeque.h \
+  qpid/broker/MessageDeque.cpp \
+  qpid/broker/MessageMap.h \
+  qpid/broker/MessageMap.cpp \
+  qpid/broker/Messages.h \
   qpid/broker/MessageStore.h \
   qpid/broker/MessageStoreModule.cpp \
   qpid/broker/MessageStoreModule.h \
+  qpid/broker/PriorityQueue.h \
+  qpid/broker/PriorityQueue.cpp \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NameGenerator.h \
   qpid/broker/NullMessageStore.cpp \

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp Thu Feb 10 10:12:41 2011
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Fairshare.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+
+Fairshare::Fairshare(size_t levels, uint limit) :
+    PriorityQueue(levels),
+    limits(levels, limit), priority(levels-1), count(0) {}
+
+
+void Fairshare::setLimit(size_t level, uint limit)
+{
+    limits[level] = limit;
+}
+
+bool Fairshare::limitReached()
+{
+    uint l = limits[priority];
+    return l && ++count > l;
+}
+
+uint Fairshare::currentLevel()
+{
+    if (limitReached()) {
+        return nextLevel();
+    } else {
+        return priority;
+    }
+}
+
+uint Fairshare::nextLevel()
+{
+    count = 1;
+    if (priority) --priority;
+    else priority = levels-1;
+    return priority;
+}
+
+bool Fairshare::isNull()
+{
+    for (int i = 0; i < levels; i++) if (limits[i]) return false;
+    return true;
+}
+
+bool Fairshare::getState(uint& p, uint& c) const
+{
+    p = priority;
+    c = count;
+    return true;
+}
+
+bool Fairshare::setState(uint p, uint c)
+{
+    priority = p;
+    count = c;
+    return true;
+}
+
+bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
+{
+    const uint start = p = currentLevel();
+    do {
+        if (!messages[p].empty()) return true;
+    } while ((p = nextLevel()) != start);
+    return false;
+}
+
+
+
+bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
+{
+    const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
+    return fairshare && fairshare->getState(priority, count);
+}
+
+bool Fairshare::setState(Messages& m, uint priority, uint count)
+{
+    Fairshare* fairshare = dynamic_cast<Fairshare*>(&m);
+    return fairshare && fairshare->setState(priority, count);
+}
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+    qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+    if (!v) {
+        return 0;
+    } else if (v->convertsTo<int>()) {
+        return v->get<int>();
+    } else if (v->convertsTo<std::string>()){
+        std::string s = v->get<std::string>();
+        try { 
+            return boost::lexical_cast<int>(s); 
+        } catch(const boost::bad_lexical_cast&) {
+            QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+            return 0;
+        }
+    } else {
+        QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+        return 0;
+    }
+}
+
+int getSetting(const qpid::framing::FieldTable& settings, const std::string& key, int minvalue, int maxvalue)
+{
+    return std::max(minvalue,std::min(getIntegerSetting(settings, key), maxvalue));
+}
+
+std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings)
+{
+    std::auto_ptr<Messages> result;
+    size_t levels = getSetting(settings, "x-qpid-priorities", 1, 100);
+    if (levels) {
+        uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare");
+        std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
+        for (uint i = 0; i < levels; i++) {
+            std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str();
+            if(settings.isSet(key)) {
+                fairshare->setLimit(i, getIntegerSetting(settings, key));
+            }
+        }
+        
+        if (fairshare->isNull()) {
+            result = std::auto_ptr<Messages>(new PriorityQueue(levels));
+        } else {
+            result = fairshare;
+        }
+    }
+    return result;
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h Thu Feb 10 10:12:41 2011
@@ -0,0 +1,61 @@
+#ifndef QPID_BROKER_FAIRSHARE_H
+#define QPID_BROKER_FAIRSHARE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PriorityQueue.h"
+
+namespace qpid {
+namespace framing {
+class FieldTable;
+}
+namespace broker {
+
+/**
+ * Modifies a basic prioirty queue by limiting the number of messages
+ * from each priority level that are dispatched before allowing
+ * dispatch from the next level.
+ */
+class Fairshare : public PriorityQueue
+{
+  public:
+    Fairshare(size_t levels, uint limit);
+    bool getState(uint& priority, uint& count) const;
+    bool setState(uint priority, uint count);
+    void setLimit(size_t level, uint limit);
+    static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
+    static bool getState(const Messages&, uint& priority, uint& count);
+    static bool setState(Messages&, uint priority, uint count);
+  private:
+    std::vector<uint> limits;
+    
+    uint priority;
+    uint count;
+    
+    uint currentLevel();
+    uint nextLevel();
+    bool isNull();
+    bool limitReached();
+    bool findFrontLevel(uint& p, PriorityLevels&);
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_FAIRSHARE_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp Thu Feb 10 10:12:41 2011
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+
+LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
+
+void LegacyLVQ::setNoBrowse(bool b)
+{ 
+    noBrowse = b;
+}
+
+bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    Ordering::iterator i = messages.find(position);
+    if (i != messages.end() && i->second.payload == message.payload) {
+        message = i->second;
+        erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    if (MessageMap::next(position, message)) {
+        if (!noBrowse) index.erase(getKey(message));
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
+{
+    //Hack to disable LVQ behaviour on cluster update:
+    if (broker && broker->isClusterUpdatee()) {
+        messages[added.position] = added;
+        return false;
+    } else {
+        return MessageMap::push(added, removed);
+    }
+}
+
+const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
+{ 
+    //add the new message into the original position of the replaced message
+    Ordering::iterator i = messages.find(original.position);
+    i->second = update;
+    i->second.position = original.position;
+    return i->second;
+}
+
+void LegacyLVQ::removeIf(Predicate p)
+{
+    //Note: This method is currently called periodically on the timer
+    //thread to expire messages. In a clustered broker this means that
+    //the purging does not occur on the cluster event dispatch thread
+    //and consequently that is not totally ordered w.r.t other events
+    //(including publication of messages). The cluster does ensure
+    //that the actual expiration of messages (as distinct from the
+    //removing of those expired messages from the queue) *is*
+    //consistently ordered w.r.t. cluster events. This means that
+    //delivery of messages is in general consistent across the cluster
+    //inspite of any non-determinism in the triggering of a
+    //purge. However at present purging a last value queue (of the
+    //legacy sort) could potentially cause inconsistencies in the
+    //cluster (as the order w.r.t publications can affect the order in
+    //which messages appear in the queue). Consequently periodic
+    //purging of an LVQ is not enabled if the broker is clustered
+    //(expired messages will be removed on delivery and consolidated
+    //by key as part of normal LVQ operation).
+
+    //TODO: Is there a neater way to check whether broker is
+    //clustered? Here we assume that if the clustered timer is the
+    //same as the regular timer, we are not clustered:
+    if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer()))
+        MessageMap::removeIf(p);
+}
+
+std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current, 
+                                                   const std::string& key, bool noBrowse, Broker* broker)
+{
+    LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get());
+    if (lvq) { 
+        lvq->setNoBrowse(noBrowse);
+        return current;
+    } else {
+        return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker));
+    }
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h Thu Feb 10 10:12:41 2011
@@ -0,0 +1,59 @@
+#ifndef QPID_BROKER_LEGACYLVQ_H
+#define QPID_BROKER_LEGACYLVQ_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageMap.h"
+#include <memory>
+
+namespace qpid {
+namespace broker {
+class Broker;
+
+/**
+ * This class encapsulates the behaviour of the old style LVQ where a
+ * message replacing another messages for the given key will use the
+ * position in the queue of the previous message. This however causes
+ * problems for browsing. Either browsers stop the coalescing of
+ * messages by key (default) or they may mis updates (if the no-browse
+ * option is specified).
+ */
+class LegacyLVQ : public MessageMap
+{
+  public:
+    LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
+    bool remove(const framing::SequenceNumber&, QueuedMessage&);
+    bool next(const framing::SequenceNumber&, QueuedMessage&);
+    bool push(const QueuedMessage& added, QueuedMessage& removed);
+    void removeIf(Predicate);
+    void setNoBrowse(bool);
+    static std::auto_ptr<Messages> updateOrReplace(std::auto_ptr<Messages> current, 
+                                                   const std::string& key, bool noBrowse,
+                                                   Broker* broker);
+  protected:
+    bool noBrowse;
+    Broker* broker;
+
+    const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_LEGACYLVQ_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Feb 10 10:12:41 2011
@@ -399,22 +399,6 @@ bool Message::hasExpired()
     return expiryPolicy && expiryPolicy->hasExpired(*this);
 }
 
-boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
-{
-    sys::Mutex::ScopedLock l(lock);
-    Replacement::iterator i = replacement.find(qfor);
-    if (i != replacement.end()){
-        return i->second;
-    }           
-    return empty;
-}
-
-void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
-{
-    sys::Mutex::ScopedLock l(lock);
-    replacement[qfor] = msg;
-}
-
 namespace {
 struct ScopedSet {
     sys::Monitor& lock;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Feb 10 10:12:41 2011
@@ -153,8 +153,6 @@ public:
        void forcePersistent();
        bool isForcedPersistent();
     
-    boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
-    void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
 
     /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
     void setEnqueueCompleteCallback(MessageCallback& cb);
@@ -167,8 +165,6 @@ public:
     uint8_t getPriority() const;
 
   private:
-    typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
-
     MessageAdapter& getAdapter() const;
     void allEnqueuesComplete();
     void allDequeuesComplete();
@@ -188,7 +184,6 @@ public:
 
     static TransferAdapter TRANSFER;
 
-    mutable Replacement replacement;
     mutable boost::intrusive_ptr<Message> empty;
 
     sys::Monitor callbackLock;

Added: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp Thu Feb 10 10:12:41 2011
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+
+size_t MessageDeque::size()
+{
+    return messages.size();
+}
+
+bool MessageDeque::empty()
+{
+    return messages.empty();
+}
+
+void MessageDeque::reinsert(const QueuedMessage& message)
+{
+    messages.insert(lower_bound(messages.begin(), messages.end(), message), message);
+}
+
+MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position)
+{
+    if (!messages.empty()) {
+        QueuedMessage comp;
+        comp.position = position;
+        unsigned long diff = position.getValue() - messages.front().position.getValue();
+        long maxEnd = diff < messages.size()? diff : messages.size();        
+        return lower_bound(messages.begin(),messages.begin()+maxEnd,comp);
+    } else {
+        return messages.end();
+    }
+}
+
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+{
+    Deque::iterator i = seek(position);
+    if (i != messages.end() && i->position == position) {
+        message = *i;
+        if (remove) messages.erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    return find(position, message, true);
+}
+
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    return find(position, message, false);
+}
+
+bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    if (messages.empty()) {
+        return false;
+    } else if (position < front().position) {
+        message = front();
+        return true;
+    } else {
+        Deque::iterator i = seek(position+1);
+        if (i != messages.end()) {
+            message = *i;
+            return true;
+        } else {
+            return false;
+        }
+    }
+}
+
+QueuedMessage& MessageDeque::front()
+{
+    return messages.front();
+}
+
+void MessageDeque::pop()
+{
+    if (!messages.empty()) {
+        messages.pop_front();
+    }
+}
+
+bool MessageDeque::pop(QueuedMessage& out)
+{
+    if (messages.empty()) {
+        return false;
+    } else {
+        out = front();
+        messages.pop_front();
+        return true;
+    }
+}
+
+bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
+{
+    messages.push_back(added);
+    return false;//adding a message never causes one to be removed for deque
+}
+
+void MessageDeque::foreach(Functor f)
+{
+    std::for_each(messages.begin(), messages.end(), f);
+}
+
+void MessageDeque::removeIf(Predicate p)
+{
+    for (Deque::iterator i = messages.begin(); i != messages.end();) {
+        if (p(*i)) {
+            i = messages.erase(i);
+        } else {
+            ++i;
+        }
+    }
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h Thu Feb 10 10:12:41 2011
@@ -0,0 +1,61 @@
+#ifndef QPID_BROKER_MESSAGEDEQUE_H
+#define QPID_BROKER_MESSAGEDEQUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include <deque>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Provides the standard FIFO queue behaviour.
+ */
+class MessageDeque : public Messages
+{
+  public:
+    size_t size();
+    bool empty();
+
+    void reinsert(const QueuedMessage&);
+    bool remove(const framing::SequenceNumber&, QueuedMessage&);
+    bool find(const framing::SequenceNumber&, QueuedMessage&);
+    bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+    QueuedMessage& front();
+    void pop();
+    bool pop(QueuedMessage&);
+    bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+    void foreach(Functor);
+    void removeIf(Predicate);
+
+  private:
+    typedef std::deque<QueuedMessage> Deque;
+    Deque messages;
+
+    Deque::iterator seek(const framing::SequenceNumber&);
+    bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_MESSAGEDEQUE_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp Thu Feb 10 10:12:41 2011
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageMap.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+namespace {
+const std::string EMPTY;
+}
+
+std::string MessageMap::getKey(const QueuedMessage& message)
+{
+    const framing::FieldTable* ft = message.payload->getApplicationHeaders();
+    if (ft) return ft->getAsString(key);
+    else return EMPTY;
+}
+
+size_t MessageMap::size()
+{
+    return messages.size();
+}
+
+bool MessageMap::empty()
+{
+    return messages.empty();
+}
+
+void MessageMap::reinsert(const QueuedMessage& message)
+{
+    std::string key = getKey(message);
+    Index::iterator i = index.find(key);
+    if (i == index.end()) {
+        index[key] = message;
+        messages[message.position] = message;
+    } //else message has already been replaced
+}
+
+bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    Ordering::iterator i = messages.find(position);
+    if (i != messages.end()) {
+        message = i->second;
+        erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    Ordering::iterator i = messages.find(position);
+    if (i != messages.end()) {
+        message = i->second;
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    if (!messages.empty() && position < front().position) {
+        message = front();
+        return true;
+    } else {
+        Ordering::iterator i = messages.lower_bound(position+1);
+        if (i != messages.end()) {
+            message = i->second;
+            return true;
+        } else {
+            return false;
+        }
+    }
+}
+
+QueuedMessage& MessageMap::front()
+{
+    return messages.begin()->second;
+}
+
+void MessageMap::pop()
+{
+    QueuedMessage dummy;
+    pop(dummy);
+}
+
+bool MessageMap::pop(QueuedMessage& out)
+{
+    Ordering::iterator i = messages.begin();
+    if (i != messages.end()) {
+        out = i->second;
+        erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
+{
+    messages.erase(original.position);
+    messages[update.position] = update;
+    return update;
+}
+
+bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
+{
+    std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added));
+    if (result.second) {
+        //there was no previous message for this key; nothing needs to
+        //be removed, just add the message into its correct position
+        messages[added.position] = added;
+        return false;
+    } else {
+        //there is already a message with that key which needs to be replaced
+        removed = result.first->second;
+        result.first->second = replace(result.first->second, added);
+        return true;
+    }
+}
+
+void MessageMap::foreach(Functor f)
+{
+    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+        f(i->second);
+    }
+}
+
+void MessageMap::removeIf(Predicate p)
+{
+    for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
+        if (p(i->second)) {
+            erase(i);
+        }
+    }
+}
+
+void MessageMap::erase(Ordering::iterator i)
+{
+    index.erase(getKey(i->second));
+    messages.erase(i);
+}
+
+MessageMap::MessageMap(const std::string& k) : key(k) {}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h Thu Feb 10 10:12:41 2011
@@ -0,0 +1,72 @@
+#ifndef QPID_BROKER_MESSAGEMAP_H
+#define QPID_BROKER_MESSAGEMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include "qpid/framing/SequenceNumber.h"
+#include <map>
+#include <string>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Provides a last value queue behaviour, whereby a messages replace
+ * any previous message with the same value for a defined property
+ * (i.e. the key).
+ */
+class MessageMap : public Messages
+{
+  public:
+    MessageMap(const std::string& key);
+    virtual ~MessageMap() {}
+
+    size_t size();
+    bool empty();
+
+    void reinsert(const QueuedMessage&);
+    virtual bool remove(const framing::SequenceNumber&, QueuedMessage&);
+    bool find(const framing::SequenceNumber&, QueuedMessage&);
+    virtual bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+    QueuedMessage& front();
+    void pop();
+    bool pop(QueuedMessage&);
+    virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+    void foreach(Functor);
+    virtual void removeIf(Predicate);
+
+  protected:
+    typedef std::map<std::string, QueuedMessage> Index;
+    typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering;
+    const std::string key;
+    Index index;
+    Ordering messages;
+
+    std::string getKey(const QueuedMessage&);
+    virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+    void erase(Ordering::iterator);
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_MESSAGEMAP_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h Thu Feb 10 10:12:41 2011
@@ -0,0 +1,117 @@
+#ifndef QPID_BROKER_MESSAGES_H
+#define QPID_BROKER_MESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace framing {
+class SequenceNumber;
+}
+namespace broker {
+struct QueuedMessage;
+
+/**
+ * This interface abstracts out the access to the messages held for
+ * delivery by a Queue instance.
+ */
+class Messages
+{
+  public:
+    typedef boost::function1<void, QueuedMessage&> Functor;
+    typedef boost::function1<bool, QueuedMessage&> Predicate;
+
+    virtual ~Messages() {}
+    /**
+     * @return the number of messages available for delivery.
+     */
+    virtual size_t size() = 0;
+    /**
+     * @return true if there are no messages for delivery, false otherwise
+     */
+    virtual bool empty() = 0;
+
+    /**
+     * Re-inserts a message back into its original position - used
+     * when requeing released messages.
+     */
+    virtual void reinsert(const QueuedMessage&) = 0;
+    /**
+     * Remove the message at the specified position, returning true if
+     * found, false otherwise. The removed message is passed back via
+     * the second parameter.
+     */
+    virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0;
+    /**
+     * Find the message at the specified position, returning true if
+     * found, false otherwise. The matched message is passed back via
+     * the second parameter.
+     */
+    virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0;
+    /**
+     * Return the next message to be given to a browsing subscrption
+     * that has reached the specified poisition. The next messages is
+     * passed back via the second parameter.
+     *
+     * @return true if there is another message, false otherwise.
+     */
+    virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
+
+    /**
+     * Note: Caller is responsible for ensuring that there is a front
+     * (e.g. empty() returns false)
+     *
+     * @return the next message to be delivered
+     */
+    virtual QueuedMessage& front() = 0;
+    /**
+     * Removes the front message
+     */
+    virtual void pop() = 0;
+    /**
+     * @return true if there is a mesage to be delivered - in which
+     * case that message will be returned via the parameter and
+     * removed - otherwise false.
+     */
+    virtual bool pop(QueuedMessage&) = 0;
+    /**
+     * Pushes a message to the back of the 'queue'. For some types of
+     * queue this may cause another message to be removed; if that is
+     * the case the method will return true and the removed message
+     * will be passed out via the second parameter.
+     */
+    virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
+
+    /**
+     * Apply the functor to each message held
+     */
+    virtual void foreach(Functor) = 0;
+    /**
+     * Remove every message held that for which the specified
+     * predicate returns true
+     */
+    virtual void removeIf(Predicate) = 0;
+  private:
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_MESSAGES_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Thu Feb 10 10:12:41 2011
@@ -0,0 +1,211 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+PriorityQueue::PriorityQueue(int l) : 
+    levels(l),
+    messages(levels, Deque()),
+    frontLevel(0), haveFront(false), cached(false) {}
+
+size_t PriorityQueue::size()
+{
+    size_t total(0);
+    for (int i = 0; i < levels; ++i) {
+        total += messages[i].size();
+    }
+    return total;
+}
+
+bool PriorityQueue::empty()
+{
+    for (int i = 0; i < levels; ++i) {
+        if (!messages[i].empty()) return false;
+    }
+    return true;
+}
+
+void PriorityQueue::reinsert(const QueuedMessage& message)
+{
+    uint p = getPriorityLevel(message);
+    messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
+    clearCache();
+}
+
+bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+{
+    QueuedMessage comp;
+    comp.position = position;
+    for (int i = 0; i < levels; ++i) {
+        if (!messages[i].empty()) {
+            unsigned long diff = position.getValue() - messages[i].front().position.getValue();
+            long maxEnd = diff < messages[i].size() ? diff : messages[i].size();        
+            Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
+            if (l != messages[i].end() && l->position == position) {
+                message = *l;
+                if (remove) {
+                    messages[i].erase(l);
+                    clearCache();
+                }
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
+bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    return find(position, message, true);
+}
+
+bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    return find(position, message, false);
+}
+
+bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+    QueuedMessage match;
+    match.position = position+1;
+    Deque::iterator lowest;
+    bool found = false;
+    for (int i = 0; i < levels; ++i) {
+        Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); 
+        if (m != messages[i].end()) {
+            if (m->position == match.position) {
+                message = *m;
+                return true;
+            } else if (!found || m->position < lowest->position) {
+                lowest = m;
+                found = true;
+            }
+        }
+    }
+    if (found) {
+        message = *lowest;
+    }
+    return found;
+}
+
+QueuedMessage& PriorityQueue::front()
+{
+    if (checkFront()) {
+        return messages[frontLevel].front();
+    } else {
+        throw qpid::framing::InternalErrorException(QPID_MSG("No message available"));
+    }
+}
+
+bool PriorityQueue::pop(QueuedMessage& message)
+{
+    if (checkFront()) {
+        message = messages[frontLevel].front();
+        messages[frontLevel].pop_front();
+        clearCache();
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void PriorityQueue::pop()
+{
+    QueuedMessage dummy;
+    pop(dummy);
+}
+
+bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
+{
+    messages[getPriorityLevel(added)].push_back(added);
+    clearCache();
+    return false;//adding a message never causes one to be removed for deque
+}
+
+void PriorityQueue::foreach(Functor f)
+{
+    for (int i = 0; i < levels; ++i) {
+        std::for_each(messages[i].begin(), messages[i].end(), f);
+    }
+}
+
+void PriorityQueue::removeIf(Predicate p)
+{
+    for (int priority = 0; priority < levels; ++priority) {
+        for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
+            if (p(*i)) {
+                i = messages[priority].erase(i);
+                clearCache();
+            } else {
+                ++i;
+            }
+        }
+    }
+}
+
+uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
+{
+    uint priority = m.payload->getPriority();
+    //Use AMQP 0-10 approach to mapping priorities to a fixed level
+    //(see rule priority-level-implementation)
+    const uint firstLevel = 5 - std::min(5.0, ceil((double) levels/2.0));
+    if (priority <= firstLevel) return 0;
+    return std::min(priority - firstLevel, (uint)levels-1);
+}
+
+void PriorityQueue::clearCache()
+{
+    cached = false;
+}
+
+bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+{
+    for (int p = levels-1; p >= 0; --p) {
+        if (!m[p].empty()) {
+            l = p;
+            return true;
+        }
+    }
+    return false;
+}
+
+bool PriorityQueue::checkFront()
+{
+    if (!cached) {
+        haveFront = findFrontLevel(frontLevel, messages);
+        cached = true;
+    }
+    return haveFront;
+}
+
+uint PriorityQueue::getPriority(const QueuedMessage& message)
+{
+    const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages()));
+    if (queue) return queue->getPriorityLevel(message);
+    else return 0;
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h Thu Feb 10 10:12:41 2011
@@ -0,0 +1,77 @@
+#ifndef QPID_BROKER_PRIORITYQUEUE_H
+#define QPID_BROKER_PRIORITYQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include <deque>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Basic priority queue with a configurable number of recognised
+ * priority levels. This is implemented as a separate deque per
+ * priority level. Browsing is FIFO not priority order.
+ */
+class PriorityQueue : public Messages
+{
+  public:
+    PriorityQueue(int levels);
+    virtual ~PriorityQueue() {}
+    size_t size();
+    bool empty();
+
+    void reinsert(const QueuedMessage&);
+    bool remove(const framing::SequenceNumber&, QueuedMessage&);
+    bool find(const framing::SequenceNumber&, QueuedMessage&);
+    bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+    QueuedMessage& front();
+    void pop();
+    bool pop(QueuedMessage&);
+    bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+    void foreach(Functor);
+    void removeIf(Predicate);
+    static uint getPriority(const QueuedMessage&);
+  protected:
+    typedef std::deque<QueuedMessage> Deque;
+    typedef std::vector<Deque> PriorityLevels;
+    virtual bool findFrontLevel(uint& p, PriorityLevels&);
+
+    const int levels;
+  private:
+    PriorityLevels messages;
+    uint frontLevel;
+    bool haveFront;
+    bool cached;
+    
+    bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+    uint getPriorityLevel(const QueuedMessage&) const;
+    void clearCache();
+    bool checkFront();
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_PRIORITYQUEUE_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 10 10:12:41 2011
@@ -23,7 +23,11 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
 #include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
@@ -66,6 +70,7 @@ const std::string qpidMaxCount("qpid.max
 const std::string qpidNoLocal("no-local");
 const std::string qpidTraceIdentity("qpid.trace.id");
 const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
 const std::string qpidLastValueQueue("qpid.last_value_queue");
 const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
 const std::string qpidPersistLastNode("qpid.persist_last_node");
@@ -92,10 +97,9 @@ Queue::Queue(const string& _name, bool _
     consumerCount(0),
     exclusive(0),
     noLocal(false),
-    lastValueQueue(false),
-    lastValueQueueNoBrowse(false),
     persistLastNode(false),
     inLastNodeFailure(false),
+    messages(new MessageDeque()),
     persistenceId(0),
     policyExceeded(false),
     mgmtObject(0),
@@ -212,7 +216,7 @@ void Queue::requeue(const QueuedMessage&
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
         msg.payload->enqueueComplete(); // mark the message as enqueued
-        messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg);
+        messages->reinsert(msg);
         listeners.populate(copy);
 
         // for persistLastNode - don't force a message twice to disk, but force it if no force before 
@@ -227,57 +231,23 @@ void Queue::requeue(const QueuedMessage&
     copy.notify();
 }
 
-void Queue::clearLVQIndex(const QueuedMessage& msg){
-    assertClusterSafe();
-    const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
-    if (lastValueQueue && ft){
-        string key = ft->getAsString(qpidVQMatchProperty);
-        lvq.erase(key);
-    }
-}
-
 bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) 
 {
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    
-    Messages::iterator i = findAt(position); 
-    if (i != messages.end() ) {
-        message = *i;
-        if (lastValueQueue) {
-            clearLVQIndex(*i);
-        }
-        QPID_LOG(debug,
-                 "Acquired message at " << i->position << " from " << name);
-        messages.erase(i);
+    if (messages->remove(position, message)) {
+        QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         return true;
-    } 
-    QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
-    return false;
+    } else {
+        QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+        return false;
+    }
 }
 
 bool Queue::acquire(const QueuedMessage& msg) {
-    Mutex::ScopedLock locker(messageLock);
-    assertClusterSafe();
-
-    QPID_LOG(debug, "attempting to acquire " << msg.position);
-    Messages::iterator i = findAt(msg.position); 
-    if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
-        (!lastValueQueue ||
-        (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
-        )  {
-
-        clearLVQIndex(msg);
-        QPID_LOG(debug,
-                 "Match found, acquire succeeded: " <<
-                 i->position << " == " << msg.position);
-        messages.erase(i);
-        return true;
-    } 
-    
-    QPID_LOG(debug, "Acquire failed for " << msg.position);
-    return false;
+    QueuedMessage copy = msg;
+    return acquireMessageAt(msg.position, copy);
 }
 
 void Queue::notifyListener()
@@ -286,7 +256,7 @@ void Queue::notifyListener()
     QueueListeners::NotificationSet set;
     {
         Mutex::ScopedLock locker(messageLock);
-        if (messages.size()) {
+        if (messages->size()) {
             listeners.populate(set);
         }
     }
@@ -315,12 +285,12 @@ Queue::ConsumeCode Queue::consumeNextMes
 {
     while (true) {
         Mutex::ScopedLock locker(messageLock);
-        if (messages.empty()) { 
+        if (messages->empty()) { 
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
         } else {
-            QueuedMessage msg = getFront();
+            QueuedMessage msg = messages->front();
             if (msg.payload->hasExpired()) {
                 QPID_LOG(debug, "Message expired from queue '" << name << "'");
                 popAndDequeue();
@@ -330,7 +300,7 @@ Queue::ConsumeCode Queue::consumeNextMes
             if (c->filter(msg.payload)) {
                 if (c->accept(msg.payload)) {            
                     m = msg;
-                    popMsg(msg);
+                    pop();
                     return CONSUMED;
                 } else {
                     //message(s) are available but consumer hasn't got enough credit
@@ -356,11 +326,6 @@ bool Queue::browseNextMessage(QueuedMess
                 //consumer wants the message
                 c->position = msg.position;
                 m = msg;
-                if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
-                if (lastValueQueue) {
-                    boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
-                    if (replacement.get()) m.payload = replacement;
-                }
                 return true;
             } else {
                 //browser hasn't got enough credit for the message
@@ -382,7 +347,7 @@ void Queue::removeListener(Consumer::sha
     {
         Mutex::ScopedLock locker(messageLock);
         listeners.removeListener(c);
-        if (messages.size()) {
+        if (messages->size()) {
             listeners.populate(set);
         }
     }
@@ -403,52 +368,20 @@ bool Queue::dispatch(Consumer::shared_pt
 // Find the next message 
 bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
     Mutex::ScopedLock locker(messageLock);
-    if (!messages.empty() && messages.back().position > c->position) {
-        if (c->position < getFront().position) {
-            msg = getFront();
-            return true;
-        } else {        
-            Messages::iterator pos = findAt(c->position);
-            if (pos != messages.end() && pos+1 != messages.end()) {
-                msg = *(pos+1);
-                return true;
-            }
-        }
+    if (messages->next(c->position, msg)) {
+        return true;
+    } else {
+        listeners.addListener(c);
+        return false;
     }
-    listeners.addListener(c);
-    return false;
 }
 
-Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
-
-    if(!messages.empty()){
-        QueuedMessage compM;
-        compM.position = pos;
-        unsigned long diff = pos.getValue() - messages.front().position.getValue();
-        long maxEnd = diff < messages.size()? diff : messages.size();
-
-        Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); 
-        if (i!= messages.end() && i->position == pos)
-            return i;
-    }    
-    return messages.end(); // no match found.
-}
-
-
 QueuedMessage Queue::find(SequenceNumber pos) const {
 
     Mutex::ScopedLock locker(messageLock);
-    if(!messages.empty()){
-        QueuedMessage compM;
-        compM.position = pos;
-        unsigned long diff = pos.getValue() - messages.front().position.getValue();
-        long maxEnd = diff < messages.size()? diff : messages.size();
-
-        Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); 
-        if (i != messages.end())
-            return *i;
-    }
-    return QueuedMessage();
+    QueuedMessage msg;
+    messages->find(pos, msg);
+    return msg;
 }
 
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -482,12 +415,18 @@ void Queue::cancel(Consumer::shared_ptr 
 QueuedMessage Queue::get(){
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
+    messages->pop(msg);
+    return msg;
+}
 
-    if(!messages.empty()){
-        msg = getFront();
-        popMsg(msg);
+bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+{
+    if (message.payload->hasExpired()) {
+        expired.push_back(message);
+        return true;
+    } else {
+        return false;
     }
-    return msg;
 }
 
 void Queue::purgeExpired()
@@ -496,37 +435,11 @@ void Queue::purgeExpired()
     //bother explicitly expiring if the rate of dequeues since last
     //attempt is less than one per second.  
 
-    //Note: This method is currently called periodically on the timer
-    //thread. In a clustered broker this means that the purging does
-    //not occur on the cluster event dispatch thread and consequently
-    //that is not totally ordered w.r.t other events (including
-    //publication of messages). However the cluster does ensure that
-    //the actual expiration of messages (as distinct from the removing
-    //of those expired messages from the queue) *is* consistently
-    //ordered w.r.t. cluster events. This means that delivery of
-    //messages is in general consistent across the cluster inspite of
-    //any non-determinism in the triggering of a purge. However at
-    //present purging a last value queue could potentially cause
-    //inconsistencies in the cluster (as the order w.r.t publications
-    //can affect the order in which messages appear in the
-    //queue). Consequently periodic purging of an LVQ is not enabled
-    //(expired messages will be removed on delivery and consolidated
-    //by key as part of normal LVQ operation).
-
-    if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
-        Messages expired;
+    if (dequeueTracker.sampleRatePerSecond() < 1) {
+        std::deque<QueuedMessage> expired;
         {
             Mutex::ScopedLock locker(messageLock);
-            for (Messages::iterator i = messages.begin(); i != messages.end();) {
-                //Re-introduce management of LVQ-specific state here
-                //if purging is renabled for that case (see note above)
-                if (i->payload->hasExpired()) {
-                    expired.push_back(*i);
-                    i = messages.erase(i);
-                } else {
-                    ++i;
-                }
-            }
+            messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
         }
         for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
@@ -552,13 +465,13 @@ uint32_t Queue::purge(const uint32_t pur
 
     uint32_t count = 0;
     // Either purge them all or just the some (purge_count) while the queue isn't empty.
-    while((!purge_request || purge_count--) && !messages.empty()) {
+    while((!purge_request || purge_count--) && !messages->empty()) {
         if (dest.get()) {
             //
             // If there is a destination exchange, stage the messages onto a reroute queue
             // so they don't wind up getting purged more than once.
             //
-            DeliverableMessage msg(getFront().payload);
+            DeliverableMessage msg(messages->front().payload);
             rerouteQueue.push_back(msg);
         }
         popAndDequeue();
@@ -584,64 +497,37 @@ uint32_t Queue::move(const Queue::shared
     uint32_t move_count = qty; // only comes into play if  qty >0 
     uint32_t count = 0; // count how many were moved for returning
 
-    while((!qty || move_count--) && !messages.empty()) {
-        QueuedMessage qmsg = getFront();
+    while((!qty || move_count--) && !messages->empty()) {
+        QueuedMessage qmsg = messages->front();
         boost::intrusive_ptr<Message> msg = qmsg.payload;
         destq->deliver(msg); // deliver message to the destination queue
-        popMsg(qmsg);
+        pop();
         dequeue(0, qmsg);
         count++;
     }
     return count;
 }
 
-void Queue::popMsg(QueuedMessage& qmsg)
+void Queue::pop()
 {
     assertClusterSafe();
-    const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
-    if (lastValueQueue && ft){
-        string key = ft->getAsString(qpidVQMatchProperty);
-        lvq.erase(key);
-    }
-    messages.pop_front();
+    messages->pop();
     ++dequeueTracker;
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
+    QueuedMessage removed;
+    bool dequeueRequired = false;
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
         if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
          
-        LVQ::iterator i;
-        const framing::FieldTable* ft = msg->getApplicationHeaders();
-        if (lastValueQueue && ft){
-            string key = ft->getAsString(qpidVQMatchProperty);
-
-            i = lvq.find(key);
-            if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
-                messages.push_back(qm);
-                listeners.populate(copy);
-                lvq[key] = msg; 
-            }else {
-                boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
-                if (!old) old = i->second;
-                i->second->setReplacementMessage(msg,this);
-                if (isRecovery) {
-                    //can't issue new requests for the store until
-                    //recovery is complete
-                    pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
-                } else {
-                    Mutex::ScopedUnlock u(messageLock);   
-                    dequeue(0, QueuedMessage(qm.queue, old, qm.position));
-                }
-            }           
-        }else {
-            messages.push_back(qm);
-            listeners.populate(copy);
-        }
+        dequeueRequired = messages->push(qm, removed);
+        listeners.populate(copy);
+
         if (eventMode) {
             if (eventMgr) eventMgr->enqueued(qm);
             else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
@@ -651,32 +537,20 @@ void Queue::push(boost::intrusive_ptr<Me
         }
     }
     copy.notify();
-}
-
-QueuedMessage Queue::getFront()
-{
-    QueuedMessage msg = messages.front();
-    if (lastValueQueue) {
-        boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
-        if (replacement.get()) msg.payload = replacement;
+    if (dequeueRequired) {
+        if (isRecovery) {
+            //can't issue new requests for the store until
+            //recovery is complete
+            pendingDequeues.push_back(removed);
+        } else {
+            dequeue(0, removed);
+        }
     }
-    return msg;
 }
 
-QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
+void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
 {
-    boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
-    if (replacement.get()) {
-        const framing::FieldTable* ft = replacement->getApplicationHeaders();
-        if (ft) {
-            string key = ft->getAsString(qpidVQMatchProperty);
-            if (lvq.find(key) != lvq.end()){
-                lvq[key] = replacement; 
-            }        
-        }
-        msg.payload = replacement;
-    }
-    return msg;
+    if (message.payload->isEnqueueComplete()) (*result)++;
 }
 
 /** function only provided for unit tests, or code not in critical message path */
@@ -684,20 +558,14 @@ uint32_t Queue::getEnqueueCompleteMessag
 {
     Mutex::ScopedLock locker(messageLock);
     uint32_t count = 0;
-    for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
-        //NOTE: don't need to use checkLvqReplace() here as it
-        //is only relevant for LVQ which does not support persistence
-        //so the enqueueComplete check has no effect
-        if ( i->payload->isEnqueueComplete() ) count ++;
-    }
-    
+    messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
     return count;
 }
 
 uint32_t Queue::getMessageCount() const
 {
     Mutex::ScopedLock locker(messageLock);
-    return messages.size();
+    return messages->size();
 }
 
 uint32_t Queue::getConsumerCount() const
@@ -717,21 +585,22 @@ void Queue::clearLastNodeFailure()
     inLastNodeFailure = false;
 }
 
+void Queue::forcePersistent(QueuedMessage& message)
+{
+    if(!message.payload->isStoredOnQueue(shared_from_this())) {
+        message.payload->forcePersistent();
+        if (message.payload->isForcedPersistent() ){
+            enqueue(0, message.payload);
+        }
+    }
+}
+
 void Queue::setLastNodeFailure()
 {
     if (persistLastNode){
         Mutex::ScopedLock locker(messageLock);
         try {
-    	    for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
-                if (lastValueQueue) checkLvqReplace(*i);
-                // don't force a message twice to disk.
-                if(!i->payload->isStoredOnQueue(shared_from_this())) {
-                    i->payload->forcePersistent();
-                    if (i->payload->isForcedPersistent() ){
-            	        enqueue(0, i->payload);
-                    }
-                }
-    	    }
+            messages->foreach(boost::bind(&Queue::forcePersistent, this, _1));
         } catch (const std::exception& e) {
             // Could not go into last node standing (for example journal not large enough)
             QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
@@ -748,7 +617,7 @@ bool Queue::enqueue(TransactionContext* 
     if (!u.acquired) return false;
 
     if (policy.get() && !suppressPolicyCheck) {
-        Messages dequeues;
+        std::deque<QueuedMessage> dequeues;
         {
             Mutex::ScopedLock locker(messageLock);
             policy->tryEnqueue(msg);
@@ -835,8 +704,8 @@ void Queue::dequeueCommitted(const Queue
  */
 void Queue::popAndDequeue()
 {
-    QueuedMessage msg = getFront();
-    popMsg(msg);
+    QueuedMessage msg = messages->front();
+    pop();
     dequeue(0, msg);
 }
 
@@ -885,13 +754,22 @@ void Queue::configure(const FieldTable& 
     noLocal = _settings.get(qpidNoLocal);
     QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
 
-    lastValueQueue= _settings.get(qpidLastValueQueue);
-    if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
-
-    lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
-    if (lastValueQueueNoBrowse){
-        QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
-        lastValueQueue = lastValueQueueNoBrowse;
+    std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
+    if (lvqKey.size()) {
+        QPID_LOG(debug, "Configured queue " <<  getName() << " as Last Value Queue with key " << lvqKey);
+        messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+    } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+        QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue with 'no-browse' on");
+        messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+    } else if (_settings.get(qpidLastValueQueue)) {
+        QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue");
+        messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+    } else {
+        std::auto_ptr<Messages> m = Fairshare::create(_settings);
+        if (m.get()) {
+            messages = m;
+            QPID_LOG(debug, "Configured queue " <<  getName() << " as priority queue.");
+        }
     }
     
     persistLastNode= _settings.get(qpidPersistLastNode);
@@ -919,8 +797,8 @@ void Queue::destroy()
 {
     if (alternateExchange.get()) {
         Mutex::ScopedLock locker(messageLock);
-        while(!messages.empty()){
-            DeliverableMessage msg(getFront().payload);
+        while(!messages->empty()){
+            DeliverableMessage msg(messages->front().payload);
             alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
                                      msg.getMessage().getApplicationHeaders());
             popAndDequeue();
@@ -1198,6 +1076,8 @@ bool Queue::isEnqueued(const QueuedMessa
 }
 
 QueueListeners& Queue::getListeners() { return listeners; }
+Messages& Queue::getMessages() { return *messages; }
+const Messages& Queue::getMessages() const { return *messages; }
 
 void Queue::checkNotDeleted()
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Feb 10 10:12:41 2011
@@ -26,6 +26,7 @@
 #include "qpid/broker/OwnershipToken.h"
 #include "qpid/broker/Consumer.h"
 #include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
 #include "qpid/broker/PersistableQueue.h"
 #include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueBindings.h"
@@ -85,10 +86,9 @@ class Queue : public boost::enable_share
         ~ScopedUse() { if (acquired) barrier.release(); }
     };
             
-    typedef std::deque<QueuedMessage> Messages;
-    typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ;
     enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
 
+
     const std::string name;
     const bool autodelete;
     MessageStore* store;
@@ -96,16 +96,13 @@ class Queue : public boost::enable_share
     uint32_t consumerCount;
     OwnershipToken* exclusive;
     bool noLocal;
-    bool lastValueQueue;
-    bool lastValueQueueNoBrowse;
     bool persistLastNode;
     bool inLastNodeFailure;
     std::string traceId;
     std::vector<std::string> traceExclude;
     QueueListeners listeners;
-    Messages messages;
-    Messages pendingDequeues;//used to avoid dequeuing during recovery
-    LVQ lvq;
+    std::auto_ptr<Messages> messages;
+    std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
     mutable qpid::sys::Mutex consumerLock;
     mutable qpid::sys::Monitor messageLock;
     mutable qpid::sys::Mutex ownershipLock;
@@ -140,11 +137,10 @@ class Queue : public boost::enable_share
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
     void dequeued(const QueuedMessage& msg);
-    void popMsg(QueuedMessage& qmsg);
+    void pop();
     void popAndDequeue();
     QueuedMessage getFront();
-    QueuedMessage& checkLvqReplace(QueuedMessage& msg);
-    void clearLVQIndex(const QueuedMessage& msg);
+    void forcePersistent(QueuedMessage& msg);
 
     inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
     {
@@ -169,7 +165,6 @@ class Queue : public boost::enable_share
         }
     }
             
-    Messages::iterator findAt(framing::SequenceNumber pos);
     void checkNotDeleted();
 
   public:
@@ -320,13 +315,7 @@ class Queue : public boost::enable_share
     /** Apply f to each Message on the queue. */
     template <class F> void eachMessage(F f) {
         sys::Mutex::ScopedLock l(messageLock);
-        if (lastValueQueue) {
-            for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
-                f(checkLvqReplace(*i));
-            }
-        } else {
-            std::for_each(messages.begin(), messages.end(), f);
-        }
+        messages->foreach(f);
     }
 
     /** Apply f to each QueueBinding on the queue */
@@ -352,6 +341,8 @@ class Queue : public boost::enable_share
 
     // For cluster update
     QueueListeners& getListeners();
+    Messages& getMessages();
+    const Messages& getMessages() const;
 
     /**
      * Reserve space in policy for an enqueued message that

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Thu Feb 10 10:12:41 2011
@@ -20,6 +20,7 @@
  */
 #include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/PriorityQueue.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -213,7 +214,10 @@ RingQueuePolicy::RingQueuePolicy(const s
 
 bool before(const QueuedMessage& a, const QueuedMessage& b)
 {
-    return a.position < b.position;
+    int priorityA = PriorityQueue::getPriority(a);
+    int priorityB = PriorityQueue::getPriority(b);
+    if (priorityA == priorityB) return a.position < b.position;
+    else return priorityA < priorityB;
 }
 
 void RingQueuePolicy::enqueued(const QueuedMessage& m)

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Feb 10 10:12:41 2011
@@ -32,6 +32,7 @@
 #include "qpid/broker/RecoveredEnqueue.h"
 #include "qpid/broker/RecoveredDequeue.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Queue.h"
@@ -548,6 +549,13 @@ void Connection::queuePosition(const str
     findQueue(qname)->setPosition(position);
 }
 
+void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
+{
+    if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
+        QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
+    }
+}
+
 void Connection::expiryId(uint64_t id) {
     cluster.getExpiryPolicy().setId(id);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Feb 10 10:12:41 2011
@@ -152,6 +152,7 @@ class Connection :
                         uint32_t credit);
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
+    void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
     void expiryId(uint64_t);
 
     void txStart();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Thu Feb 10 10:12:41 2011
@@ -32,6 +32,7 @@
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/Future.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Fairshare.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/LinkRegistry.h"
@@ -352,6 +353,10 @@ void UpdateClient::updateQueue(client::A
     q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
     q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
     ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
+    uint priority, count;
+    if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
+        ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
+    }
 }
 
 void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Feb 10 10:12:41 2011
@@ -1191,5 +1191,41 @@ QPID_AUTO_TEST_CASE(testUpdateConsumerPo
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
 }
 
+QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+
+    FieldTable arguments;
+    arguments.setInt("x-qpid-priorities", 10);
+    arguments.setInt("x-qpid-fairshare", 5);
+    c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments);
+
+    //send messages of different priorities
+    for (int i = 0; i < 20; i++) {
+        Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag);
+        msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5);
+        c0.session.messageTransfer(arg::content=msg);
+    }
+
+    //pull off a couple of the messages (first four should be the top priority messages
+    for (int i = 0; i < 4; i++) {
+        BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData());
+    }
+
+    // Add another member
+    cluster.add();
+    Client c1(cluster[1], "c1");
+
+    //pull off some more messages
+    BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData());
+    BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData());
+    BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData());
+
+    //check queue has same content on both nodes
+    BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12));
+}
+
 QPID_AUTO_TEST_SUITE_END()
 }} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Thu Feb 10 10:12:41 2011
@@ -206,6 +206,7 @@ int main(int argc, char ** argv)
                             if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
                             if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
                             if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
+                            if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl;
                             if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
                             if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl;
                             std::cout << "Properties: " << msg.getProperties() << std::endl;

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-send.cpp Thu Feb 10 10:12:41 2011
@@ -56,6 +56,7 @@ struct Options : public qpid::Options
     uint sendEos;
     bool durable;
     uint ttl;
+    uint priority;
     std::string userid;
     std::string correlationid;
     string_vector properties;
@@ -84,6 +85,7 @@ struct Options : public qpid::Options
           sendEos(0),
           durable(false),
           ttl(0),
+          priority(0),
           contentString(),
           contentSize(0),
           contentStdin(false),
@@ -110,6 +112,7 @@ struct Options : public qpid::Options
             ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
             ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
 	    ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+	    ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
             ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
             ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
             ("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -266,6 +269,9 @@ int main(int argc, char ** argv)
             if (opts.ttl) {
                 msg.setTtl(Duration(opts.ttl));
             }
+            if (opts.priority) {
+                msg.setPriority(opts.priority);
+            }
             if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto));
             if (!opts.userid.empty()) msg.setUserId(opts.userid);
             if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Thu Feb 10 10:12:41 2011
@@ -275,6 +275,13 @@
 
     <!-- Replicate encoded config objects - e.g. links and bridges. -->
     <control name="config" code="0x37"><field name="encoded" type="str32"/></control>
+
+    <!-- Set the fairshare delivery related state of a replicated queue. -->
+    <control name="queue-fairshare-state" code="0x38">
+      <field name="queue" type="str8"/>
+      <field name="position" type="uint8"/>
+      <field name="count" type="uint8"/>
+    </control>
   </class>
 
 </amqp>

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1069322&r1=1069321&r2=1069322&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Thu Feb 10 10:12:41 2011
@@ -29,3 +29,5 @@ from message import *
 from query import *
 from queue import *
 from tx import *
+from lvq import *
+from priority import *

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py Thu Feb 10 10:12:41 2011
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+import math
+
+class LVQTests (Base):
+    """
+    Test last value queue behaviour
+    """ 
+
+    def setup_connection(self):
+        return Connection.establish(self.broker, **self.connection_options())
+
+    def setup_session(self):
+        return self.conn.session()
+        
+    def test_simple(self):
+        snd = self.ssn.sender("lvq; {create: sender, delete: sender, node: {x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}",
+                              durable=self.durable())
+        snd.send(create_message("a", "a-1"))
+        snd.send(create_message("b", "b-1"))
+        snd.send(create_message("a", "a-2"))
+        snd.send(create_message("a", "a-3"))
+        snd.send(create_message("c", "c-1"))
+        snd.send(create_message("c", "c-2"))
+
+        rcv = self.ssn.receiver("lvq; {mode: browse}")
+        assert fetch_all(rcv) == ["b-1", "a-3", "c-2"]
+
+        snd.send(create_message("b", "b-2"))
+        assert fetch_all(rcv) == ["b-2"]
+
+        snd.send(create_message("c", "c-3"))
+        snd.send(create_message("d", "d-1"))
+        assert fetch_all(rcv) == ["c-3", "d-1"]
+
+        snd.send(create_message("b", "b-3"))
+        assert fetch_all(rcv) == ["b-3"]
+
+        rcv.close()
+        rcv = self.ssn.receiver("lvq; {mode: browse}")
+        assert (fetch_all(rcv) == ["a-3", "c-3", "d-1", "b-3"])
+
+
+def create_message(key, content):
+    msg = Message(content=content)
+    msg.properties["lvq-key"] = key
+    return msg
+
+def fetch_all(rcv):
+    content = []
+    while True:
+        try:
+            content.append(rcv.fetch(0).content)
+        except Empty:
+            break
+    return content



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message