qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1333027 [3/13] - in /qpid/branches/qpid-3767/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp/bindings/qpid/ruby/lib/qpid/ cpp/docs/a...
Date Wed, 02 May 2012 13:10:03 GMT
Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Fairshare.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Fairshare.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Fairshare.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Fairshare.cpp Wed May  2 13:09:18 2012
@@ -21,6 +21,7 @@
 #include "qpid/broker/Fairshare.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp Wed May  2 13:09:18 2012
@@ -28,16 +28,26 @@ namespace broker {
 LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
 
 void LegacyLVQ::setNoBrowse(bool b)
-{ 
+{
     noBrowse = b;
 }
+bool LegacyLVQ::deleted(const QueuedMessage& message)
+{
+    Ordering::iterator i = messages.find(message.position);
+    if (i != messages.end() && i->second.payload == message.payload) {
+        erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
 
 bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     Ordering::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.payload == message.payload) {
+    if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) {
+        i->second.status = QueuedMessage::ACQUIRED;
         message = i->second;
-        erase(i);
         return true;
     } else {
         return false;
@@ -66,12 +76,17 @@ bool LegacyLVQ::push(const QueuedMessage
 }
 
 const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
-{ 
+{
     //add the new message into the original position of the replaced message
     Ordering::iterator i = messages.find(original.position);
-    i->second = update;
-    i->second.position = original.position;
-    return i->second;
+    if (i != messages.end()) {
+        i->second = update;
+        i->second.position = original.position;
+        return i->second;
+    } else {
+        QPID_LOG(error, "Failed to replace message at " << original.position);
+        return update;
+    }
 }
 
 void LegacyLVQ::removeIf(Predicate p)

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LegacyLVQ.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LegacyLVQ.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LegacyLVQ.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LegacyLVQ.h Wed May  2 13:09:18 2012
@@ -40,6 +40,7 @@ class LegacyLVQ : public MessageMap
 {
   public:
     LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
+    bool deleted(const QueuedMessage&);
     bool acquire(const framing::SequenceNumber&, QueuedMessage&);
     bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
     bool push(const QueuedMessage& added, QueuedMessage& removed);

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.cpp Wed May  2 13:09:18 2012
@@ -31,6 +31,8 @@
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/UrlArray.h"
 
 namespace qpid {
 namespace broker {
@@ -48,6 +50,13 @@ using std::stringstream;
 using std::string;
 namespace _qmf = ::qmf::org::apache::qpid::broker;
 
+
+namespace {
+    const std::string FAILOVER_EXCHANGE("amq.failover");
+    const std::string FAILOVER_HEADER_KEY("amq.failover");
+}
+
+
 struct LinkTimerTask : public sys::TimerTask {
     LinkTimerTask(Link& l, sys::Timer& t)
         : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
@@ -65,6 +74,57 @@ struct LinkTimerTask : public sys::Timer
     sys::Timer& timer;
 };
 
+
+
+/** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange.
+ */
+class LinkExchange : public broker::Exchange
+{
+public:
+    LinkExchange(const std::string& name) : Exchange(name), link(0) {}
+    ~LinkExchange() {};
+    std::string getType() const { return Link::exchangeTypeName; }
+
+    // Exchange methods - set up to prevent binding/unbinding etc from clients!
+    bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+    bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+    bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;}
+
+    // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
+    // and saving them should the Link need to reconnect.
+    void route(broker::Deliverable& msg)
+    {
+        if (!link) return;
+        const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+        framing::Array addresses;
+        if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
+            // convert the Array of addresses to a single Url container for use with setUrl():
+            std::vector<Url> urlVec;
+            Url urls;
+            urlVec = urlArrayToVector(addresses);
+            for(size_t i = 0; i < urlVec.size(); ++i)
+                urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
+            QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
+            link->setUrl(urls);
+        }
+    }
+
+    void setLink(Link *_link)
+    {
+        assert(!link);
+        link = _link;
+    }
+
+private:
+    Link *link;
+};
+
+
+boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name )
+{
+    return Exchange::shared_ptr(new LinkExchange(_name));
+}
+
 Link::Link(const string&  _name,
            LinkRegistry*  _links,
            const string&        _host,
@@ -77,8 +137,9 @@ Link::Link(const string&  _name,
            const string&        _password,
            Broker*        _broker,
            Manageable*    parent)
-    : name(_name), links(_links), host(_host), port(_port),
-      transport(_transport),
+    : name(_name), links(_links),
+      configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
+      host(_host), port(_port), transport(_transport),
       durable(_durable),
       authMechanism(_authMechanism), username(_username), password(_password),
       persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -90,7 +151,8 @@ Link::Link(const string&  _name,
       connection(0),
       agent(0),
       listener(l),
-      timerTask(new LinkTimerTask(*this, broker->getTimer()))
+      timerTask(new LinkTimerTask(*this, broker->getTimer())),
+      failoverChannel(0)
 {
     if (parent != 0 && broker != 0)
     {
@@ -111,15 +173,26 @@ Link::Link(const string&  _name,
         startConnectionLH();
     }
     broker->getTimer().add(timerTask);
+
+    stringstream exchangeName;
+    exchangeName << "qpid.link." << transport << ":" << host << ":" << port;
+    std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(),
+                                                                              exchangeTypeName);
+    failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+    assert(failoverExchange);
+    failoverExchange->setLink(this);
 }
 
 Link::~Link ()
 {
-    if (state == STATE_OPERATIONAL && connection != 0)
-        connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
+    if (state == STATE_OPERATIONAL && connection != 0) {
+        closeConnection("closed by management");
+    }
 
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
+
+    broker->getExchanges().destroy(failoverExchange->getName());
 }
 
 void Link::setStateLH (int newState)
@@ -186,11 +259,21 @@ void Link::established(Connection* c)
 
 
 void Link::setUrl(const Url& u) {
+    QPID_LOG(info, "Setting remote broker failover addresses for link '" << getName() << "' to these urls: " << u);
     Mutex::ScopedLock mutex(lock);
     url = u;
     reconnectNext = 0;
 }
 
+
+namespace {
+    /** invoked when session used to subscribe to remote's amq.failover exchange detaches */
+    void sessionDetached(Link *link) {
+        QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName());
+    }
+}
+
+
 void Link::opened() {
     Mutex::ScopedLock mutex(lock);
     if (!connection) return;
@@ -209,36 +292,75 @@ void Link::opened() {
         reconnectNext = 0;
         QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
     }
+
+    //
+    // attempt to subscribe to failover exchange for updates from remote
+    //
+
+    const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+    failoverChannel = nextChannel();
+
+    SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+    sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
+    failoverSession = queueName;
+    sessionHandler.attachAs(failoverSession);
+
+    framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+    remoteBroker.getQueue().declare(queueName,
+                                    "",         // alt-exchange
+                                    false,      // passive
+                                    false,      // durable
+                                    true,       // exclusive
+                                    true,       // auto-delete
+                                    FieldTable());
+    remoteBroker.getExchange().bind(queueName,
+                                    FAILOVER_EXCHANGE,
+                                    "",     // no key
+                                    FieldTable());
+    remoteBroker.getMessage().subscribe(queueName,
+                                        failoverExchange->getName(),
+                                        1,           // implied-accept mode
+                                        0,           // pre-acquire mode
+                                        false,       // exclusive
+                                        "",          // resume-id
+                                        0,           // resume-ttl
+                                        FieldTable());
+    remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+    remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
 }
 
 void Link::closed(int, std::string text)
 {
-    Mutex::ScopedLock mutex(lock);
-    QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
+    bool isClosing = false;
+    {
+        Mutex::ScopedLock mutex(lock);
+        QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
 
-    connection = 0;
-    if (state == STATE_OPERATIONAL) {
-        stringstream addr;
-        addr << host << ":" << port;
-        if (!hideManagement() && agent)
-            agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
-    }
+        connection = 0;
+        if (state == STATE_OPERATIONAL) {
+            stringstream addr;
+            addr << host << ":" << port;
+            if (!hideManagement() && agent)
+                agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+        }
 
-    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        (*i)->closed();
-        created.push_back(*i);
-    }
-    active.clear();
+        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+            (*i)->closed();
+            created.push_back(*i);
+        }
+        active.clear();
 
-    if (state != STATE_FAILED && state != STATE_PASSIVE)
-    {
-        setStateLH(STATE_WAITING);
-        if (!hideManagement())
-            mgmtObject->set_lastError (text);
+        if (state != STATE_FAILED && state != STATE_PASSIVE)
+        {
+            setStateLH(STATE_WAITING);
+            if (!hideManagement())
+                mgmtObject->set_lastError (text);
+        }
             mgmtObject->set_connectionRef(qpid::management::ObjectId());
     }
-
-    if (closing)
+    // Call destroy outside of the lock, don't want to be deleted with lock held.
+    if (isClosing)
         destroy();
 }
 
@@ -249,10 +371,8 @@ void Link::destroy ()
     {
         Mutex::ScopedLock mutex(lock);
 
-        QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
-        if (connection)
-            connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-        connection = 0;
+        QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management");
+        closeConnection("closed by management");
         setStateLH(STATE_CLOSED);
 
         // Move the bridges to be deleted into a local vector so there is no
@@ -321,7 +441,7 @@ void Link::ioThreadProcessing()
     // check for bridge session errors and recover
     if (!active.empty()) {
         Bridges::iterator removed = std::remove_if(
-            active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+            active.begin(), active.end(), boost::bind(&Bridge::isDetached, _1));
         for (Bridges::iterator i = removed; i != active.end(); ++i) {
             Bridge::shared_ptr  bridge = *i;
             bridge->closed();
@@ -411,7 +531,8 @@ bool Link::hideManagement() const {
 uint Link::nextChannel()
 {
     Mutex::ScopedLock mutex(lock);
-
+    if (channelCounter >= framing::CHANNEL_MAX)
+        channelCounter = 1;
     return channelCounter++;
 }
 
@@ -479,9 +600,9 @@ void Link::encode(Buffer& buffer) const
 {
     buffer.putShortString(ENCODED_IDENTIFIER);
     buffer.putShortString(name);
-    buffer.putShortString(host);
-    buffer.putShort(port);
-    buffer.putShortString(transport);
+    buffer.putShortString(configuredHost);
+    buffer.putShort(configuredPort);
+    buffer.putShortString(configuredTransport);
     buffer.putOctet(durable ? 1 : 0);
     buffer.putShortString(authMechanism);
     buffer.putShortString(username);
@@ -492,10 +613,9 @@ uint32_t Link::encodedSize() const
 {
     return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
         + name.size() + 1
-        + host.size() + 1 // short-string (host)
-        + 5                // short-string ("link")
+        + configuredHost.size() + 1 // short-string (host)
         + 2                // port
-        + transport.size() + 1 // short-string(transport)
+        + configuredTransport.size() + 1 // short-string(transport)
         + 1                // durable
         + authMechanism.size() + 1
         + username.size() + 1
@@ -576,4 +696,63 @@ void Link::setPassive(bool passive)
     }
 }
 
+
+/** utility to clean up connection resources correctly */
+void Link::closeConnection( const std::string& reason)
+{
+    if (connection != 0) {
+        // cancel our subscription to the failover exchange
+        SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+        if (sessionHandler.getSession()) {
+            framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+            remoteBroker.getMessage().cancel(failoverExchange->getName());
+            remoteBroker.getSession().detach(failoverSession);
+        }
+        connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
+        connection = 0;
+    }
+}
+
+/** returns the current remote's address, and connection state */
+bool Link::getRemoteAddress(qpid::Address& addr) const
+{
+    addr.protocol = transport;
+    addr.host = host;
+    addr.port = port;
+
+    return state == STATE_OPERATIONAL;
+}
+
+
+// FieldTable keys for internal state data
+namespace {
+    const std::string FAILOVER_ADDRESSES("failover-addresses");
+    const std::string FAILOVER_INDEX("failover-index");
+}
+
+void Link::getState(framing::FieldTable& state) const
+{
+    state.clear();
+    Mutex::ScopedLock mutex(lock);
+    if (!url.empty()) {
+        state.setString(FAILOVER_ADDRESSES, url.str());
+        state.setInt(FAILOVER_INDEX, reconnectNext);
+    }
+}
+
+void Link::setState(const framing::FieldTable& state)
+{
+    Mutex::ScopedLock mutex(lock);
+    if (state.isSet(FAILOVER_ADDRESSES)) {
+        Url failovers(state.getAsString(FAILOVER_ADDRESSES));
+        setUrl(failovers);
+    }
+    if (state.isSet(FAILOVER_INDEX)) {
+        reconnectNext = state.getAsInt(FAILOVER_INDEX);
+    }
+}
+
+
+const std::string Link::exchangeTypeName("qpid.LinkExchange");
+
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Link.h Wed May  2 13:09:18 2012
@@ -46,15 +46,23 @@ namespace broker {
 class LinkRegistry;
 class Broker;
 class Connection;
+class LinkExchange;
 
 class Link : public PersistableConfig, public management::Manageable {
   private:
-    sys::Mutex          lock;
+    mutable sys::Mutex  lock;
     const std::string   name;
     LinkRegistry*       links;
-    std::string        host;
-    uint16_t      port;
-    std::string        transport;
+
+    // these remain constant across failover - used to identify this link
+    const std::string   configuredTransport;
+    const std::string   configuredHost;
+    const uint16_t      configuredPort;
+    // these reflect the current address of remote - will change during failover
+    std::string         host;
+    uint16_t            port;
+    std::string         transport;
+
     bool          durable;
     std::string        authMechanism;
     std::string        username;
@@ -77,8 +85,10 @@ class Link : public PersistableConfig, p
     Connection* connection;
     management::ManagementAgent* agent;
     boost::function<void(Link*)> listener;
-
     boost::intrusive_ptr<sys::TimerTask> timerTask;
+    boost::shared_ptr<broker::LinkExchange> failoverExchange;  // subscribed to remote's amq.failover exchange
+    uint failoverChannel;
+    std::string failoverSession;
 
     static const int STATE_WAITING     = 1;
     static const int STATE_CONNECTING  = 2;
@@ -102,6 +112,8 @@ class Link : public PersistableConfig, p
     void opened();      // Called when connection is open (after create)
     void closed(int, std::string);   // Called when connection goes away
     void notifyConnectionForced(const std::string text);
+    void closeConnection(const std::string& reason);
+
     friend class LinkRegistry; // to call established, opened, closed
 
   public:
@@ -122,9 +134,16 @@ class Link : public PersistableConfig, p
          management::Manageable* parent = 0);
     virtual ~Link();
 
-    std::string getHost() { return host; }
-    uint16_t    getPort() { return port; }
-    std::string getTransport() { return transport; }
+    /** these return the *configured* transport/host/port, which does not change over the
+        lifetime of the Link */
+    std::string getHost() const { return configuredHost; }
+    uint16_t    getPort() const { return configuredPort; }
+    std::string getTransport() const { return configuredTransport; }
+
+    /** returns the current address of the remote, which may be different from the
+        configured transport/host/port due to failover. Returns true if connection is
+        active */
+    bool getRemoteAddress(qpid::Address& addr) const;
 
     bool isDurable() { return durable; }
     void maintenanceVisit ();
@@ -161,6 +180,13 @@ class Link : public PersistableConfig, p
     management::ManagementObject*    GetManagementObject(void) const;
     management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
 
+    // manage the exchange owned by this link
+    static const std::string exchangeTypeName;
+    static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);
+
+    // replicate internal state of this Link for clustering
+    void getState(framing::FieldTable& state) const;
+    void setState(const framing::FieldTable& state);
 };
 }
 }

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Wed May  2 13:09:18 2012
@@ -389,22 +389,29 @@ std::string LinkRegistry::getUsername(co
     return link->getUsername();
 }
 
+/** note: returns the current remote host (may be different from the host originally
+    configured for the Link due to failover) */
 std::string LinkRegistry::getHost(const std::string& key)
 {
-    Link::shared_ptr link = findLink(key);
-    if (!link)
-        return string();
+     Link::shared_ptr link = findLink(key);
+     if (!link)
+         return string();
 
-    return link->getHost();
+     qpid::Address addr;
+     link->getRemoteAddress(addr);
+     return addr.host;
 }
 
+/** returns the current remote port (ditto above) */
 uint16_t LinkRegistry::getPort(const std::string& key)
 {
     Link::shared_ptr link = findLink(key);
     if (!link)
         return 0;
 
-    return link->getPort();
+     qpid::Address addr;
+     link->getRemoteAddress(addr);
+     return addr.port;
 }
 
 std::string LinkRegistry::getPassword(const std::string& key)

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageDeque.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageDeque.cpp Wed May  2 13:09:18 2012
@@ -21,6 +21,7 @@
 #include "qpid/broker/MessageDeque.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/log/Statement.h"
+#include "assert.h"
 
 namespace qpid {
 namespace broker {
@@ -39,7 +40,7 @@ size_t MessageDeque::index(const framing
 bool MessageDeque::deleted(const QueuedMessage& m)
 {
     size_t i = index(m.position);
-    if (i < messages.size()) {
+    if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
         messages[i].status = QueuedMessage::DELETED;
         clean();
         return true;
@@ -53,7 +54,7 @@ size_t MessageDeque::size()
     return available;
 }
 
-void MessageDeque::release(const QueuedMessage& message)
+QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
 {
     size_t i = index(message.position);
     if (i < messages.size()) {
@@ -62,12 +63,17 @@ void MessageDeque::release(const QueuedM
             if (head > i) head = i;
             m.status = QueuedMessage::AVAILABLE;
             ++available;
+            return &messages[i];
         }
     } else {
+        assert(0);
         QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
     }
+    return 0;
 }
 
+void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); }
+
 bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     if (position < messages.front().position) return false;
@@ -129,8 +135,7 @@ QueuedMessage padding(uint32_t pos) {
 }
 } // namespace
 
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
+QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
     //add padding to prevent gaps in sequence, which break the index
     //calculation (needed for queue replication)
     while (messages.size() && (added.position - messages.back().position) > 1)
@@ -139,7 +144,12 @@ bool MessageDeque::push(const QueuedMess
     messages.back().status = QueuedMessage::AVAILABLE;
     if (head >= messages.size()) head = messages.size() - 1;
     ++available;
-    return false;//adding a message never causes one to be removed for deque
+    return &messages.back();
+}
+
+bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) {
+    pushPtr(added);
+    return false; // adding a message never causes one to be removed for deque
 }
 
 void MessageDeque::updateAcquired(const QueuedMessage& acquired)

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageDeque.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageDeque.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageDeque.h Wed May  2 13:09:18 2012
@@ -48,6 +48,12 @@ class MessageDeque : public Messages
     void foreach(Functor);
     void removeIf(Predicate);
 
+    // For use by other Messages implementations that use MessageDeque as a FIFO index
+    // and keep pointers to its elements in their own indexing structures.
+    void clean();
+    QueuedMessage* releasePtr(const QueuedMessage&);
+    QueuedMessage* pushPtr(const QueuedMessage& added);
+
   private:
     typedef std::deque<QueuedMessage> Deque;
     Deque messages;
@@ -55,7 +61,6 @@ class MessageDeque : public Messages
     size_t head;
 
     size_t index(const framing::SequenceNumber&);
-    void clean();
 };
 }} // namespace qpid::broker
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Wed May  2 13:09:18 2012
@@ -19,11 +19,13 @@
  *
  */
 
+#include "qpid/broker/MessageGroupManager.h"
+
+#include "qpid/broker/Queue.h"
 #include "qpid/framing/FieldTable.h"
-#include "qpid/types/Variant.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/MessageGroupManager.h"
+#include "qpid/types/Variant.h"
 
 using namespace qpid::broker;
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageGroupManager.h Wed May  2 13:09:18 2012
@@ -28,11 +28,14 @@
 #include "qpid/broker/MessageDistributor.h"
 #include "qpid/sys/unordered_map.h"
 
+#include <deque>
+
 namespace qpid {
 namespace broker {
 
 class QueueObserver;
 class MessageDistributor;
+class Messages;
 
 class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
 {

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageMap.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageMap.cpp Wed May  2 13:09:18 2012
@@ -20,6 +20,7 @@
  */
 #include "qpid/broker/MessageMap.h"
 #include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
@@ -27,7 +28,16 @@ namespace {
 const std::string EMPTY;
 }
 
-bool MessageMap::deleted(const QueuedMessage&) { return true; }
+bool MessageMap::deleted(const QueuedMessage& message)
+{
+    Ordering::iterator i = messages.find(message.position);
+    if (i != messages.end()) {
+        erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
 
 std::string MessageMap::getKey(const QueuedMessage& message)
 {
@@ -38,30 +48,32 @@ std::string MessageMap::getKey(const Que
 
 size_t MessageMap::size()
 {
-    return messages.size();
+    size_t count(0);
+    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+        if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+    }
+    return count;
 }
 
 bool MessageMap::empty()
 {
-    return messages.empty();
+    return size() == 0;//TODO: more efficient implementation
 }
 
 void MessageMap::release(const QueuedMessage& message)
 {
-    std::string key = getKey(message);
-    Index::iterator i = index.find(key);
-    if (i == index.end()) {
-        index[key] = message;
-        messages[message.position] = message;
-    } //else message has already been replaced
+    Ordering::iterator i = messages.find(message.position);
+    if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+        i->second.status = QueuedMessage::AVAILABLE;
+    }
 }
 
 bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     Ordering::iterator i = messages.find(position);
-    if (i != messages.end()) {
+    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+        i->second.status = QueuedMessage::ACQUIRED;
         message = i->second;
-        erase(i);
         return true;
     } else {
         return false;
@@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::
 bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     Ordering::iterator i = messages.find(position);
-    if (i != messages.end()) {
+    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
         message = i->second;
         return true;
     } else {
@@ -79,10 +91,10 @@ bool MessageMap::find(const framing::Seq
     }
 }
 
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
 {
     Ordering::iterator i = messages.lower_bound(position+1);
-    if (i != messages.end()) {
+    if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE  || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
         message = i->second;
         return true;
     } else {
@@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::S
 
 bool MessageMap::consume(QueuedMessage& message)
 {
-    Ordering::iterator i = messages.begin();
-    if (i != messages.end()) {
-        message = i->second;
-        erase(i);
-        return true;
-    } else {
-        return false;
+    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+        if (i->second.status == QueuedMessage::AVAILABLE) {
+            i->second.status = QueuedMessage::ACQUIRED;
+            message = i->second;
+            return true;
+        }
     }
+    return false;
 }
 
 const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
@@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessag
     if (result.second) {
         //there was no previous message for this key; nothing needs to
         //be removed, just add the message into its correct position
-        messages[added.position] = added;
+        QueuedMessage& a = messages[added.position];
+        a = added;
+        a.status = QueuedMessage::AVAILABLE;
+        QPID_LOG(debug, "Added message at " << a.position);
         return false;
     } else {
         //there is already a message with that key which needs to be replaced
         removed = result.first->second;
         result.first->second = replace(result.first->second, added);
+        result.first->second.status = QueuedMessage::AVAILABLE;
+        QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
         return true;
     }
 }
@@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessag
 void MessageMap::foreach(Functor f)
 {
     for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
-        f(i->second);
+        if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
     }
 }
 
 void MessageMap::removeIf(Predicate p)
 {
-    for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
-        if (p(i->second)) {
-            erase(i);
+    for (Ordering::iterator i = messages.begin(); i != messages.end();) {
+        if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
+            index.erase(getKey(i->second));
+            //Note: Removing from messages means that the subsequent
+            //call to deleted() for the same message will return
+            //false. At present that is not a problem. If this were
+            //changed to hold onto the message until dequeued
+            //(e.g. with REMOVED state), then the erase() below would
+            //need to take that into account.
+            messages.erase(i++);
+        } else {
+            ++i;
         }
     }
 }

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageMap.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/MessageMap.h Wed May  2 13:09:18 2012
@@ -43,7 +43,7 @@ class MessageMap : public Messages
     size_t size();
     bool empty();
 
-    bool deleted(const QueuedMessage&);
+    virtual bool deleted(const QueuedMessage&);
     void release(const QueuedMessage&);
     virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
     bool find(const framing::SequenceNumber&, QueuedMessage&);

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Wed May  2 13:09:18 2012
@@ -3,13 +3,13 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,96 +22,87 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
 #include <cmath>
 
 namespace qpid {
 namespace broker {
 
-PriorityQueue::PriorityQueue(int l) : 
+PriorityQueue::PriorityQueue(int l) :
     levels(l),
     messages(levels, Deque()),
     frontLevel(0), haveFront(false), cached(false) {}
 
-bool PriorityQueue::deleted(const QueuedMessage&) { return true; }
+bool PriorityQueue::deleted(const QueuedMessage& qm) {
+    bool deleted = fifo.deleted(qm);
+    if (deleted) erase(qm);
+    return deleted;
+}
 
 size_t PriorityQueue::size()
 {
-    size_t total(0);
-    for (int i = 0; i < levels; ++i) {
-        total += messages[i].size();
-    }
-    return total;
+    return fifo.size();
+}
+
+namespace {
+bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
 }
 
 void PriorityQueue::release(const QueuedMessage& message)
 {
-    uint p = getPriorityLevel(message);
-    messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
-    clearCache();
+    QueuedMessage* qm = fifo.releasePtr(message);
+    if (qm) {
+        uint p = getPriorityLevel(message);
+        messages[p].insert(
+            lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm);
+        clearCache();
+    }
 }
 
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
-{
-    QueuedMessage comp;
-    comp.position = position;
-    for (int i = 0; i < levels; ++i) {
-        if (!messages[i].empty()) {
-            unsigned long diff = position.getValue() - messages[i].front().position.getValue();
-            long maxEnd = diff < messages[i].size() ? diff : messages[i].size();        
-            Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
-            if (l != messages[i].end() && l->position == position) {
-                message = *l;
-                if (remove) {
-                    messages[i].erase(l);
-                    clearCache();
-                }
-                return true;
-            }
+
+void PriorityQueue::erase(const QueuedMessage& qm) {
+    size_t i = getPriorityLevel(qm);
+    if (!messages[i].empty()) {
+        long diff = qm.position.getValue() - messages[i].front()->position.getValue();
+        if (diff < 0) return;
+        long maxEnd = std::min(size_t(diff), messages[i].size());
+        QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
+        Deque::iterator l =
+            lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before);
+        if (l != messages[i].end() && (*l)->position == qm.position) {
+            messages[i].erase(l);
+            clearCache();
+            return;
         }
     }
-    return false;
 }
 
 bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
 {
-    return find(position, message, true);
+    bool acquired = fifo.acquire(position, message);
+    if (acquired) erase(message); // No longer available
+    return acquired;
 }
 
 bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
 {
-    return find(position, message, false);
+    return fifo.find(position, message);
 }
 
-bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool PriorityQueue::browse(
+    const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
 {
-    QueuedMessage match;
-    match.position = position+1;
-    Deque::iterator lowest;
-    bool found = false;
-    for (int i = 0; i < levels; ++i) {
-        Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); 
-        if (m != messages[i].end()) {
-            if (m->position == match.position) {
-                message = *m;
-                return true;
-            } else if (!found || m->position < lowest->position) {
-                lowest = m;
-                found = true;
-            }
-        }
-    }
-    if (found) {
-        message = *lowest;
-    }
-    return found;
+    return fifo.browse(position, message, unacquired);
 }
 
 bool PriorityQueue::consume(QueuedMessage& message)
 {
     if (checkFront()) {
-        message = messages[frontLevel].front();
+        QueuedMessage* pm = messages[frontLevel].front();
         messages[frontLevel].pop_front();
         clearCache();
+        pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
+        message = *pm;
         return true;
     } else {
         return false;
@@ -120,23 +111,27 @@ bool PriorityQueue::consume(QueuedMessag
 
 bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
 {
-    messages[getPriorityLevel(added)].push_back(added);
+    QueuedMessage* qmp = fifo.pushPtr(added);
+    messages[getPriorityLevel(added)].push_back(qmp);
     clearCache();
-    return false;//adding a message never causes one to be removed for deque
+    return false; // Adding a message never causes one to be removed for deque
+}
+
+void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
+    fifo.updateAcquired(acquired);
 }
 
 void PriorityQueue::foreach(Functor f)
 {
-    for (int i = 0; i < levels; ++i) {
-        std::for_each(messages[i].begin(), messages[i].end(), f);
-    }
+    fifo.foreach(f);
 }
 
 void PriorityQueue::removeIf(Predicate p)
 {
     for (int priority = 0; priority < levels; ++priority) {
         for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
-            if (p(*i)) {
+            if (p(**i)) {
+                (*i)->status = QueuedMessage::DELETED; // Updates fifo index
                 i = messages[priority].erase(i);
                 clearCache();
             } else {
@@ -144,6 +139,7 @@ void PriorityQueue::removeIf(Predicate p
             }
         }
     }
+    fifo.clean();
 }
 
 uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/PriorityQueue.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/PriorityQueue.h Wed May  2 13:09:18 2012
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,7 @@
  * under the License.
  *
  */
-#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDeque.h"
 #include "qpid/sys/IntegerTypes.h"
 #include <deque>
 #include <vector>
@@ -32,7 +32,10 @@ namespace broker {
 /**
  * Basic priority queue with a configurable number of recognised
  * priority levels. This is implemented as a separate deque per
- * priority level. Browsing is FIFO not priority order.
+ * priority level.
+ *
+ * Browsing is FIFO not priority order. There is a MessageDeque
+ * for fast browsing.
  */
 class PriorityQueue : public Messages
 {
@@ -48,23 +51,31 @@ class PriorityQueue : public Messages
     bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
     bool consume(QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
-
+    void updateAcquired(const QueuedMessage& acquired);
     void foreach(Functor);
     void removeIf(Predicate);
+
     static uint getPriority(const QueuedMessage&);
+
   protected:
-    typedef std::deque<QueuedMessage> Deque;
+    typedef std::deque<QueuedMessage*> Deque;
     typedef std::vector<Deque> PriorityLevels;
     virtual bool findFrontLevel(uint& p, PriorityLevels&);
 
     const int levels;
+
   private:
+    /** Available messages separated by priority and sorted in priority order.
+     *  Holds pointers to the QueuedMessages in fifo
+     */
     PriorityLevels messages;
+    /** FIFO index of all messages (including acquired messages) for fast browsing and indexing */
+    MessageDeque fifo;
     uint frontLevel;
     bool haveFront;
     bool cached;
-    
-    bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+
+    void erase(const QueuedMessage&);
     uint getPriorityLevel(const QueuedMessage&) const;
     void clearCache();
     bool checkFront();

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Queue.cpp Wed May  2 13:09:18 2012
@@ -19,8 +19,9 @@
  *
  */
 
-#include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
+
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/Fairshare.h"
@@ -41,6 +42,7 @@
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/sys/ClusterSafe.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
@@ -152,6 +154,7 @@ Queue::Queue(const string& _name, bool _
     store(_store),
     owner(_owner),
     consumerCount(0),
+    browserCount(0),
     exclusive(0),
     noLocal(false),
     persistLastNode(false),
@@ -521,17 +524,27 @@ void Queue::consume(Consumer::shared_ptr
     assertClusterSafe();
     {
         Mutex::ScopedLock locker(messageLock);
-        if(exclusive) {
-            throw ResourceLockedException(
-                                          QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
-        } else if(requestExclusive) {
-            if(consumerCount) {
+        // NOTE: consumerCount is actually a count of all
+        // subscriptions, both acquiring and non-acquiring (browsers).
+        // Check for exclusivity of acquiring consumers.
+        size_t acquiringConsumers = consumerCount - browserCount;
+        if (c->preAcquires()) {
+            if(exclusive) {
                 throw ResourceLockedException(
-                                              QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
-            } else {
-                exclusive = c->getSession();
+                    QPID_MSG("Queue " << getName()
+                             << " has an exclusive consumer. No more consumers allowed."));
+            } else if(requestExclusive) {
+                if(acquiringConsumers) {
+                    throw ResourceLockedException(
+                        QPID_MSG("Queue " << getName()
+                                 << " already has consumers. Exclusive access denied."));
+                } else {
+                    exclusive = c->getSession();
+                }
             }
         }
+        else
+            browserCount++;
         consumerCount++;
         //reset auto deletion timer if necessary
         if (autoDeleteTimeout && autoDeleteTask) {
@@ -548,6 +561,7 @@ void Queue::cancel(Consumer::shared_ptr 
     {
         Mutex::ScopedLock locker(messageLock);
         consumerCount--;
+        if (!c->preAcquires()) browserCount--;
         if(exclusive) exclusive = 0;
         observeConsumerRemove(*c, locker);
     }
@@ -1611,9 +1625,14 @@ Manageable::status_t Queue::ManagementMe
         {
             _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args;
             boost::shared_ptr<Exchange> dest;
-            if (rerouteArgs.i_useAltExchange)
+            if (rerouteArgs.i_useAltExchange) {
+                if (!alternateExchange) {
+                    status = Manageable::STATUS_PARAMETER_INVALID;
+                    etext = "No alternate-exchange defined";
+                    break;
+                }
                 dest = alternateExchange;
-            else {
+            } else {
                 try {
                     dest = broker->getExchanges().get(rerouteArgs.i_exchange);
                 } catch(const std::exception&) {

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Queue.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/Queue.h Wed May  2 13:09:18 2012
@@ -97,7 +97,8 @@ class Queue : public boost::enable_share
     const bool autodelete;
     MessageStore* store;
     const OwnershipToken* owner;
-    uint32_t consumerCount;
+    uint32_t consumerCount;     // Actually a count of all subscriptions, acquiring or not.
+    uint32_t browserCount;      // Count of non-acquiring subscriptions.
     OwnershipToken* exclusive;
     bool noLocal;
     bool persistLastNode;

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/QueuedMessage.h Wed May  2 13:09:18 2012
@@ -22,6 +22,8 @@
 #define _QueuedMessage_
 
 #include "qpid/broker/Message.h"
+#include "BrokerImportExport.h"
+#include <iosfwd>
 
 namespace qpid {
 namespace broker {
@@ -47,6 +49,7 @@ inline bool operator<(const QueuedMessag
     return a.position < b.position;
 }
 
+QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueuedMessage&);
 }}
 
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Wed May  2 13:09:18 2012
@@ -26,6 +26,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/sys/SecuritySettings.h"
 #include <boost/format.hpp>
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed May  2 13:09:18 2012
@@ -21,8 +21,9 @@
 #include "qpid/Exception.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/enum.h"
-#include "qpid/log/Statement.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/framing/SequenceSet.h"
+#include "qpid/log/Statement.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/broker/SessionState.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -73,18 +74,12 @@ void SessionAdapter::ExchangeHandlerImpl
     if(passive){
         AclModule* acl = getBroker().getAcl();
         if (acl) {
-            //TODO: why does a passive declare require create
-            //permission? The purpose of the passive flag is to state
-            //that the exchange should *not* created. For
-            //authorisation a passive declare is similar to
-            //exchange-query.
             std::map<acl::Property, std::string> params;
             params.insert(make_pair(acl::PROP_TYPE, type));
             params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
             params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
-            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
-                throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
+            if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchange,&params) )
+                throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange access request from " << getConnection().getUserId()));
         }
         Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
         checkType(actual, type);
@@ -274,22 +269,16 @@ void SessionAdapter::QueueHandlerImpl::d
     if (passive && !name.empty()) {
         AclModule* acl = getBroker().getAcl();
         if (acl) {
-            //TODO: why does a passive declare require create
-            //permission? The purpose of the passive flag is to state
-            //that the queue should *not* created. For
-            //authorisation a passive declare is similar to
-            //queue-query (or indeed a qmf query).
             std::map<acl::Property, std::string> params;
             params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
             params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
             params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
             params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
             params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
             params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
             params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
-            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
-                throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+            if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,&params) )
+                throw UnauthorizedAccessException(QPID_MSG("ACL denied queue access request from " << getConnection().getUserId()));
         }
         queue = getQueue(name);
         //TODO: check alternate-exchange is as expected
@@ -409,6 +398,7 @@ SessionAdapter::MessageHandlerImpl::subs
     if(!destination.empty() && state.exists(destination))
         throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
 
+    // We allow browsing (acquireMode == 1) of exclusive queues; this is required by HA.
     if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0)
         throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
                                                << queue->getName()));
@@ -548,13 +538,6 @@ void SessionAdapter::TxHandlerImpl::roll
     state.rollback();
 }
 
-std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid)
-{
-    std::string encoded;
-    encode(xid, encoded);
-    return encoded;
-}
-
 void SessionAdapter::DtxHandlerImpl::select()
 {
     state.selectDtx();
@@ -566,7 +549,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
 {
     try {
         if (fail) {
-            state.endDtx(convert(xid), true);
+            state.endDtx(DtxManager::convert(xid), true);
             if (suspend) {
                 throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
             } else {
@@ -574,9 +557,9 @@ XaResult SessionAdapter::DtxHandlerImpl:
             }
         } else {
             if (suspend) {
-                state.suspendDtx(convert(xid));
+                state.suspendDtx(DtxManager::convert(xid));
             } else {
-                state.endDtx(convert(xid), false);
+                state.endDtx(DtxManager::convert(xid), false);
             }
             return XaResult(XA_STATUS_XA_OK);
         }
@@ -594,9 +577,9 @@ XaResult SessionAdapter::DtxHandlerImpl:
     }
     try {
         if (resume) {
-            state.resumeDtx(convert(xid));
+            state.resumeDtx(DtxManager::convert(xid));
         } else {
-            state.startDtx(convert(xid), getBroker().getDtxManager(), join);
+            state.startDtx(DtxManager::convert(xid), getBroker().getDtxManager(), join);
         }
         return XaResult(XA_STATUS_XA_OK);
     } catch (const DtxTimeoutException& /*e*/) {
@@ -607,7 +590,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
 XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
 {
     try {
-        bool ok = getBroker().getDtxManager().prepare(convert(xid));
+        bool ok = getBroker().getDtxManager().prepare(DtxManager::convert(xid));
         return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
     } catch (const DtxTimeoutException& /*e*/) {
         return XaResult(XA_STATUS_XA_RBTIMEOUT);
@@ -618,7 +601,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
                             bool onePhase)
 {
     try {
-        bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
+        bool ok = getBroker().getDtxManager().commit(DtxManager::convert(xid), onePhase);
         return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
     } catch (const DtxTimeoutException& /*e*/) {
         return XaResult(XA_STATUS_XA_RBTIMEOUT);
@@ -629,7 +612,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
 XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
 {
     try {
-        getBroker().getDtxManager().rollback(convert(xid));
+        getBroker().getDtxManager().rollback(DtxManager::convert(xid));
         return XaResult(XA_STATUS_XA_OK);
     } catch (const DtxTimeoutException& /*e*/) {
         return XaResult(XA_STATUS_XA_RBTIMEOUT);
@@ -659,7 +642,7 @@ void SessionAdapter::DtxHandlerImpl::for
 
 DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
 {
-    uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
+    uint32_t timeout = getBroker().getDtxManager().getTimeout(DtxManager::convert(xid));
     return DtxGetTimeoutResult(timeout);
 }
 
@@ -667,7 +650,7 @@ DtxGetTimeoutResult SessionAdapter::DtxH
 void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
                                                 uint32_t timeout)
 {
-    getBroker().getDtxManager().setTimeout(convert(xid), timeout);
+    getBroker().getDtxManager().setTimeout(DtxManager::convert(xid), timeout);
 }
 
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionAdapter.h Wed May  2 13:09:18 2012
@@ -226,10 +226,8 @@ class Queue;
         void rollback();
     };
 
-    class DtxHandlerImpl : public DtxHandler, public HandlerHelper, private framing::StructHelper
+    class DtxHandlerImpl : public DtxHandler, public HandlerHelper
     {
-        std::string convert(const framing::Xid& xid);
-
       public:
         DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed May  2 13:09:18 2012
@@ -64,6 +64,7 @@ void SessionHandler::handleDetach() {
     if (session.get())
         connection.getBroker().getSessionManager().detach(session);
     assert(!session.get());
+    if (detachedCallback) detachedCallback();
     connection.closeChannel(channel.get());
 }
 
@@ -117,4 +118,8 @@ void SessionHandler::attached(const std:
     }
 }
 
+void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
+    detachedCallback = cb;
+}
+
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/SessionHandler.h Wed May  2 13:09:18 2012
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,6 +25,7 @@
 #include "qpid/amqp_0_10/SessionHandler.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
+#include <boost/function.hpp>
 
 namespace qpid {
 class SessionState;
@@ -61,7 +62,7 @@ class SessionHandler : public amqp_0_10:
      * This proxy is for sending such commands. In a clustered broker it will take steps
      * to synchronize command order across the cluster. In a stand-alone broker
      * it is just a synonym for getProxy()
-     */  
+     */
     framing::AMQP_ClientProxy& getClusterOrderProxy() {
         return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
     }
@@ -70,6 +71,8 @@ class SessionHandler : public amqp_0_10:
     void attached(const std::string& name);//used by 'pushing' inter-broker bridges
     void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
 
+    void setDetachedCallback(boost::function<void()> cb);
+
   protected:
     virtual void setState(const std::string& sessionName, bool force);
     virtual qpid::SessionState* getState();
@@ -91,6 +94,7 @@ class SessionHandler : public amqp_0_10:
     framing::AMQP_ClientProxy proxy;
     std::auto_ptr<SessionState> session;
     std::auto_ptr<SetChannelProxy> clusterOrderProxy;
+    boost::function<void ()> detachedCallback;
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp Wed May  2 13:09:18 2012
@@ -25,6 +25,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/FieldValue.h"
 
 #include <windows.h>
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Wed May  2 13:09:18 2012
@@ -28,10 +28,13 @@
 #include "qpid/framing/all_method_bodies.h"
 #include "qpid/framing/ClientInvoker.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/log/Helpers.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/SystemInfo.h"
 
+#include <algorithm>
+
 using namespace qpid::client;
 using namespace qpid::framing;
 using namespace qpid::framing::connection;
@@ -238,15 +241,16 @@ void ConnectionHandler::start(const Fiel
                                             );
 
     std::vector<std::string> mechlist;
+    mechlist.reserve(mechanisms.size());
     if (mechanism.empty()) {
         //mechlist is simply what the server offers
-        mechanisms.collect(mechlist);
+        std::transform(mechanisms.begin(), mechanisms.end(), std::back_inserter(mechlist), Array::get<std::string, Array::ValuePtr>);
     } else {
         //mechlist is the intersection of those indicated by user and
         //those supported by server, in the order listed by user
         std::vector<std::string> allowed = split(mechanism, " ");
-        std::vector<std::string> supported;
-        mechanisms.collect(supported);
+        std::vector<std::string> supported(mechanisms.size());
+        std::transform(mechanisms.begin(), mechanisms.end(), std::back_inserter(supported), Array::get<std::string, Array::ValuePtr>);
         intersection(allowed, supported, mechlist);
         if (mechlist.empty()) {
             throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << " (supported: " << join(supported) << ")"));

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed May  2 13:09:18 2012
@@ -115,8 +115,10 @@ public:
         ioThreads(0),
         connections(0)
     {
+        CommonOptions common("", "", QPIDC_CONF_FILE);
         IOThreadOptions options(c);
-        options.parse(0, 0, QPIDC_CONF_FILE, true);
+        common.parse(0, 0, common.clientConfig, true);
+        options.parse(0, 0, common.clientConfig, true);
         maxIOThreads = (options.maxIOThreads != -1) ?
             options.maxIOThreads : 1;
     }

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/LoadPlugins.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/LoadPlugins.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/LoadPlugins.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/LoadPlugins.cpp Wed May  2 13:09:18 2012
@@ -39,10 +39,12 @@ namespace {
 
 struct LoadtimeInitialise {
     LoadtimeInitialise() {
+        CommonOptions common("", "", QPIDC_CONF_FILE);
         qpid::ModuleOptions moduleOptions(QPIDC_MODULE_DIR);
         string              defaultPath (moduleOptions.loadDir);
-        moduleOptions.parse (0, 0, QPIDC_CONF_FILE, true);
-    
+        common.parse(0, 0, common.clientConfig, true);
+        moduleOptions.parse (0, 0, common.clientConfig, true);
+
         for (vector<string>::iterator iter = moduleOptions.load.begin();
              iter != moduleOptions.load.end();
              iter++)

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/SslConnector.cpp Wed May  2 13:09:18 2012
@@ -148,8 +148,10 @@ namespace {
     struct StaticInit {
         StaticInit() {
             try {
+                CommonOptions common("", "", QPIDC_CONF_FILE);
                 SslOptions options;
-                options.parse (0, 0, QPIDC_CONF_FILE, true);
+                common.parse(0, 0, common.clientConfig, true);
+                options.parse (0, 0, common.clientConfig, true);
                 if (options.certDbPath.empty()) {
                     QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it.");
                 } else {

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Wed May  2 13:09:18 2012
@@ -32,6 +32,7 @@
 #include "qpid/framing/ExchangeBoundResult.h"
 #include "qpid/framing/ExchangeQueryResult.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/framing/QueueQueryResult.h"
 #include "qpid/framing/ReplyTo.h"
 #include "qpid/framing/reply_exceptions.h"

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed May  2 13:09:18 2012
@@ -131,6 +131,7 @@
 #include "qpid/cluster/UpdateExchange.h"
 #include "qpid/cluster/ClusterTimer.h"
 #include "qpid/cluster/CredentialsExchange.h"
+#include "qpid/cluster/UpdateClient.h"
 
 #include "qpid/assert.h"
 #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -202,7 +203,7 @@ namespace arg=client::arg;
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1207877;
+const uint32_t Cluster::CLUSTER_VERSION = 1332342;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -269,7 +270,6 @@ Cluster::Cluster(const ClusterSettings& 
                       "Error delivering frames",
                       poller),
     failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
-    updateDataExchange(new UpdateDataExchange(*this)),
     credentialsExchange(new CredentialsExchange(*this)),
     quorum(boost::bind(&Cluster::leave, this)),
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
@@ -295,15 +295,6 @@ Cluster::Cluster(const ClusterSettings& 
     // Failover exchange provides membership updates to clients.
     broker.getExchanges().registerExchange(failoverExchange);
 
-    // Update exchange is used during updates to replicate messages
-    // without modifying delivery-properties.exchange.
-    broker.getExchanges().registerExchange(
-        boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
-
-    // Update-data exchange is used for passing data that may be too large
-    // for single control frame.
-    broker.getExchanges().registerExchange(updateDataExchange);
-
     // CredentialsExchange is used to authenticate new cluster members
     broker.getExchanges().registerExchange(credentialsExchange);
 
@@ -680,6 +671,17 @@ void Cluster::initMapCompleted(Lock& l) 
             authenticate();
             broker.setRecovery(false); // Ditch my current store.
             broker.setClusterUpdatee(true);
+
+            // Update exchange is used during updates to replicate messages
+            // without modifying delivery-properties.exchange.
+            broker.getExchanges().registerExchange(
+                boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
+            // Update-data exchange is used during update for passing data that
+            // may be too large for single control frame.
+            updateDataExchange.reset(new UpdateDataExchange(*this));
+            broker.getExchanges().registerExchange(updateDataExchange);
+
             if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
             state = JOINER;
             mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
@@ -999,6 +1001,10 @@ void Cluster::checkUpdateIn(Lock& l) {
                         boost::ref(broker.getExchanges())));
         enableClusterSafe();    // Enable cluster-safe assertions
         deliverEventQueue.start();
+        // FIXME aconway 2012-04-04: unregister/delete Update[Data]Exchange
+        updateDataExchange.reset();
+        broker.getExchanges().destroy(UpdateDataExchange::EXCHANGE_NAME);
+        broker.getExchanges().destroy(UpdateClient::UPDATE);
     }
     else if (updateRetracted) { // Update was retracted, request another update
         updateRetracted = false;

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Wed May  2 13:09:18 2012
@@ -21,6 +21,7 @@
 #include "qpid/cluster/ClusterMap.h"
 #include "qpid/Url.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
 #include <algorithm>

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Connection.cpp Wed May  2 13:09:18 2012
@@ -47,6 +47,7 @@
 #include "qpid/framing/ClusterConnectionAnnounceBody.h"
 #include "qpid/framing/ConnectionCloseBody.h"
 #include "qpid/framing/ConnectionCloseOkBody.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/ClusterSafe.h"
 #include "qpid/types/Variant.h"
@@ -798,6 +799,54 @@ void Connection::config(const std::strin
     else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
 }
 
+namespace {
+    // find a Link that matches the given Address
+    class LinkFinder {
+        qpid::Address id;
+        boost::shared_ptr<broker::Link> link;
+    public:
+        LinkFinder(const qpid::Address& _id) : id(_id) {}
+        boost::shared_ptr<broker::Link> getLink() { return link; }
+        void operator() (boost::shared_ptr<broker::Link> l)
+        {
+            if (!link) {
+                qpid::Address addr(l->getTransport(), l->getHost(), l->getPort());
+                if (id == addr) {
+                    link = l;
+                }
+            }
+        }
+    };
+}
+
+void Connection::internalState(const std::string& type,
+                               const std::string& name,
+                               const framing::FieldTable& state)
+{
+    if (type == "link") {
+        // name is the string representation of the Link's _configured_ destination address
+        Url dest;
+        try {
+            dest = name;
+        } catch(...) {
+            throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name));
+        }
+        assert(dest.size());
+        LinkFinder finder(dest[0]);
+        cluster.getBroker().getLinks().eachLink(boost::ref(finder));
+        if (finder.getLink()) {
+            try {
+                finder.getLink()->setState(state);
+            } catch(...) {
+                throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state));
+            }
+            QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state);
+        } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name));
+    }
+    else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type));
+}
+
+
 void Connection::doCatchupIoCallbacks() {
     // We need to process IO callbacks during the catch-up phase in
     // order to service asynchronous completions for messages

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Connection.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/Connection.h Wed May  2 13:09:18 2012
@@ -200,6 +200,8 @@ class Connection :
                               const std::string& instance);
 
     void config(const std::string& encoded);
+    void internalState(const std::string& type, const std::string& name,
+                       const framing::FieldTable& state);
 
     void setSecureConnection ( broker::SecureConnection * sc );
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed May  2 13:09:18 2012
@@ -57,6 +57,7 @@
 #include "qpid/framing/ClusterConnectionShadowReadyBody.h"
 #include "qpid/framing/ClusterConnectionSessionStateBody.h"
 #include "qpid/framing/ClusterConnectionConsumerStateBody.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/TypeCode.h"
@@ -687,7 +688,15 @@ void UpdateClient::updateLinks() {
 void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
     QPID_LOG(debug, *this << " updating link "
              << link->getHost() << ":" << link->getPort());
-    ClusterConnectionProxy(session).config(encode(*link));
+    ClusterConnectionProxy(session).config(encode(*link));  // push the configuration
+    // now push the current state
+    framing::FieldTable state;
+    link->getState(state);
+    std::ostringstream os;
+    os << qpid::Address(link->getTransport(), link->getHost(), link->getPort());
+    ClusterConnectionProxy(session).internalState(std::string("link"),
+                                                  os.str(),
+                                                  state);
 }
 
 void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) {

Propchange: qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:r1306564-1332660

Propchange: qpid/branches/qpid-3767/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:r1306564-1332660

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FieldTable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FieldTable.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FieldTable.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FieldTable.cpp Wed May  2 13:09:18 2012
@@ -69,7 +69,28 @@ FieldTable::FieldTable(const FieldTable&
         newBytes = true;
         return;
     }
-    if (!ft.values.empty()) values = ft.values;
+    // In practice Encoding the source field table and only copying
+    // the encoded bytes is faster than copying the whole value map.
+    // (Because we nearly always copy a field table internally before
+    // encoding it to send, but don't change it after the copy)
+    if (!ft.values.empty()) {
+        // Side effect of getting encoded size will cache it in ft.cachedSize
+        ft.cachedBytes = boost::shared_array<uint8_t>(new uint8_t[ft.encodedSize()]);
+
+        Buffer buffer((char*)&ft.cachedBytes[0], ft.cachedSize);
+
+        // Cut and paste ahead...
+        buffer.putLong(ft.encodedSize() - 4);
+        buffer.putLong(ft.values.size());
+        for (ValueMap::const_iterator i = ft.values.begin(); i!=ft.values.end(); ++i) {
+            buffer.putShortString(i->first);
+            i->second->encode(buffer);
+        }
+
+        cachedBytes = ft.cachedBytes;
+        cachedSize = ft.cachedSize;
+        newBytes = true;
+    }
 }
 
 FieldTable& FieldTable::operator=(const FieldTable& ft)
@@ -254,24 +275,18 @@ bool FieldTable::getDouble(const std::st
 //}
 
 void FieldTable::encode(Buffer& buffer) const {
-    ScopedLock<Mutex> l(lock);
     // If we've still got the input field table
     // we can just copy it directly to the output
     if (cachedBytes) {
+        ScopedLock<Mutex> l(lock);
         buffer.putRawData(&cachedBytes[0], cachedSize);
     } else {
-        uint32_t p = buffer.getPosition();
         buffer.putLong(encodedSize() - 4);
         buffer.putLong(values.size());
         for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) {
             buffer.putShortString(i->first);
             i->second->encode(buffer);
         }
-        // Now create raw bytes in case we are used again
-        cachedSize = buffer.getPosition() - p;
-        cachedBytes = boost::shared_array<uint8_t>(new uint8_t[cachedSize]);
-        buffer.setPosition(p);
-        buffer.getRawData(&cachedBytes[0], cachedSize);
     }
 }
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FrameSet.cpp Wed May  2 13:09:18 2012
@@ -102,3 +102,7 @@ std::string FrameSet::getContent() const
     getContent(out);
     return out;
 }
+
+bool FrameSet::hasContent() const {
+    return parts.size() >= 3;
+}

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FrameSet.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/framing/FrameSet.h Wed May  2 13:09:18 2012
@@ -54,6 +54,7 @@ public:
 
     QPID_COMMON_EXTERN void getContent(std::string&) const;
     QPID_COMMON_EXTERN std::string getContent() const;
+    QPID_COMMON_EXTERN bool hasContent() const;
 
     bool isContentBearing() const;
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Backup.cpp Wed May  2 13:09:18 2012
@@ -19,10 +19,11 @@
  *
  */
 #include "Backup.h"
-#include "Settings.h"
 #include "BrokerReplicator.h"
-#include "ReplicatingSubscription.h"
 #include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
+#include "Settings.h"
 #include "qpid/Url.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/Bridge.h"
@@ -43,16 +44,18 @@ using namespace broker;
 using types::Variant;
 using std::string;
 
-Backup::Backup(broker::Broker& b, const Settings& s) :
-    broker(b), settings(s), excluder(new ConnectionExcluder())
+Backup::Backup(HaBroker& hb, const Settings& s) :
+    haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder())
 {
+    // Exclude client connections before starting the link to avoid self-connection.
+    broker.getConnectionObservers().add(excluder);
     // Empty brokerUrl means delay initialization until setUrl() is called.
     if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
 }
 
 void Backup::initialize(const Url& url) {
-    assert(!url.empty());
-    QPID_LOG(notice, "HA: Backup started: " << url);
+    if (url.empty()) throw Url::Invalid("HA broker URL is empty");
+    QPID_LOG(notice, "HA: Backup initialized: " << url);
     string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
     framing::Uuid uuid(true);
     // Declare the link
@@ -61,21 +64,26 @@ void Backup::initialize(const Url& url) 
         url[0].host, url[0].port, protocol,
         false,              // durable
         settings.mechanism, settings.username, settings.password);
-    assert(result.second);  // FIXME aconway 2011-11-23: error handling
     link = result.first;
     link->setUrl(url);
-
-    replicator.reset(new BrokerReplicator(link));
+    replicator.reset(new BrokerReplicator(haBroker, link));
     broker.getExchanges().registerExchange(replicator);
-    broker.getConnectionObservers().add(excluder);
 }
 
+Backup::~Backup() {
+    if (link) link->close();
+    if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+    replicator.reset();
+    broker.getConnectionObservers().remove(excluder); // This allows client connections.
+}
+
+
 void Backup::setBrokerUrl(const Url& url) {
     // Ignore empty URLs seen during start-up for some tests.
     if (url.empty()) return;
     sys::Mutex::ScopedLock l(lock);
     if (link) {                 // URL changed after we initialized.
-        QPID_LOG(info, "HA: Backup failover URL set to " << url);
+        QPID_LOG(info, "HA: Backup broker URL set to " << url);
         link->setUrl(url);
     }
     else {
@@ -83,10 +91,4 @@ void Backup::setBrokerUrl(const Url& url
     }
 }
 
-Backup::~Backup() {
-    if (link) link->close();
-    if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
-    broker.getConnectionObservers().remove(excluder); // This allows client connections.
-}
-
 }} // namespace qpid::ha



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message