qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1333027 [4/13] - in /qpid/branches/qpid-3767/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp/bindings/qpid/ruby/lib/qpid/ cpp/docs/a...
Date Wed, 02 May 2012 13:10:03 GMT
Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Backup.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Backup.h Wed May  2 13:09:18 2012
@@ -38,6 +38,7 @@ namespace ha {
 class Settings;
 class ConnectionExcluder;
 class BrokerReplicator;
+class HaBroker;
 
 /**
  * State associated with a backup broker. Manages connections to primary.
@@ -47,7 +48,7 @@ class BrokerReplicator;
 class Backup
 {
   public:
-    Backup(broker::Broker&, const Settings&);
+    Backup(HaBroker&, const Settings&);
     ~Backup();
     void setBrokerUrl(const Url&);
 
@@ -55,6 +56,7 @@ class Backup
     void initialize(const Url&);
 
     sys::Mutex lock;
+    HaBroker& haBroker;
     broker::Broker& broker;
     Settings settings;
     boost::shared_ptr<broker::Link> link;

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed May  2 13:09:18 2012
@@ -19,6 +19,7 @@
  *
  */
 #include "BrokerReplicator.h"
+#include "HaBroker.h"
 #include "QueueReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
@@ -37,6 +38,7 @@
 #include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include "qmf/org/apache/qpid/broker/EventSubscribe.h"
 #include <algorithm>
+#include <sstream>
 
 namespace qpid {
 namespace ha {
@@ -87,6 +89,7 @@ const string QUEUE("queue");
 const string RHOST("rhost");
 const string TYPE("type");
 const string USER("user");
+const string HA_BROKER("habroker");
 
 const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
 const string QMF2("qmf2");
@@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_nam
 const string _SCHEMA_ID("_schema_id");
 const string OBJECT("OBJECT");
 const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
 const string QMF_DEFAULT_DIRECT("qmf.default.direct");
 const string _QUERY_REQUEST("_query_request");
 const string BROKER("broker");
@@ -113,36 +117,13 @@ template <class T> bool match(Variant::M
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
 
-enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL };
-const string S_NONE="none";
-const string S_CONFIGURATION="configuration";
-const string S_ALL="all";
-
-ReplicateLevel replicateLevel(const string& level) {
-    if (level == S_NONE) return RL_NONE;
-    if (level == S_CONFIGURATION) return RL_CONFIGURATION;
-    if (level == S_ALL) return RL_ALL;
-    throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
-}
-
-ReplicateLevel replicateLevel(const framing::FieldTable& f) {
-    if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
-    else return RL_NONE;
-}
-
-ReplicateLevel replicateLevel(const Variant::Map& m) {
-    Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
-    if (i != m.end()) return replicateLevel(i->second.asString());
-    else return RL_NONE;
-}
-
-void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) {
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     Variant::Map request;
     request[_WHAT] = OBJECT;
     Variant::Map schema;
     schema[_CLASS_NAME] = className;
-    schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+    schema[_PACKAGE_NAME] = packageName;
     request[_SCHEMA_ID] = schema;
 
     AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
@@ -181,13 +162,34 @@ Variant::Map asMapVoid(const Variant& va
 
 } // namespace
 
+
+ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) {
+    ReplicateLevel rl;
+    if (qpid::ha::replicateLevel(str, rl)) return rl;
+    else return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) {
+    if (f.isSet(QPID_REPLICATE))
+        return replicateLevel(f.getAsString(QPID_REPLICATE));
+    else
+        return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) {
+    Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+    if (i != m.end())
+        return replicateLevel(i->second.asString());
+    else
+        return haBroker.getSettings().replicateDefault;
+}
+
 BrokerReplicator::~BrokerReplicator() {}
 
-BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
-    : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
+BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
+    : Exchange(QPID_CONFIGURATION_REPLICATOR),
+      haBroker(hb), broker(hb.getBroker()), link(l)
 {
-    QPID_LOG(info, "HA: Backup replicating from " <<
-             link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
     framing::Uuid uuid(true);
     const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
     broker.getLinks().declare(
@@ -214,21 +216,24 @@ void BrokerReplicator::initializeBridge(
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
 
     //declare and bind an event queue
-    peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+    FieldTable declareArgs;
+    declareArgs.setString(QPID_REPLICATE, str(RL_NONE));
+    peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs);
     peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
     //subscribe to the queue
     peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
     peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
     peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
 
-    //issue a query request for queues and another for exchanges using event queue as the reply-to address
-    sendQuery(QUEUE, queueName, sessionHandler);
-    sendQuery(EXCHANGE, queueName, sessionHandler);
-    sendQuery(BINDING, queueName, sessionHandler);
-    QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
+    // Issue a query request for queues, exchanges, bindings and the habroker
+    // using event queue as the reply-to address
+    sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler);
+    sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
+    sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
+    sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
+    QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName);
 }
 
-// FIXME aconway 2011-12-02: error handling in route.
 void BrokerReplicator::route(Deliverable& msg) {
     const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
     Variant::List list;
@@ -242,6 +247,7 @@ void BrokerReplicator::route(Deliverable
         if (headers->getAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
+                QPID_LOG(trace, "HA: Backup received event: " << map);
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
                 if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
@@ -253,24 +259,26 @@ void BrokerReplicator::route(Deliverable
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
-                string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
-                Variant::Map& values = i->asMap()[VALUES].asMap();
+                Variant::Map& map = i->asMap();
+                QPID_LOG(trace, "HA: Backup received event: " << map);
+                string type = map[SCHEMA_ID].asMap()[CLASS_NAME];
+                Variant::Map& values = map[VALUES].asMap();
                 framing::FieldTable args;
                 amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
-                else QPID_LOG(error, "HA: Backup received unknown response type=" << type
-                              << " values=" << values);
+                else if (type == HA_BROKER) doResponseHaBroker(values);
             }
-        } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
+        }
     } catch (const std::exception& e) {
-        QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
+        QPID_LOG(critical, "HA: Backup configuration failed: " << e.what()
+                 << ": while handling: " << list);
+        throw;
     }
 }
 
 void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup queue declare event " << values);
     string name = values[QNAME].asString();
     Variant::Map argsMap = asMapVoid(values[ARGS]);
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
@@ -287,27 +295,27 @@ void BrokerReplicator::doEventQueueDecla
                 values[USER].asString(),
                 values[RHOST].asString());
         if (result.second) {
-            // FIXME aconway 2011-11-22: should delete old queue and
-            // re-create from event.
-            // Events are always up to date, whereas responses may be
-            // out of date.
-            QPID_LOG(debug, "HA: Backup created queue: " << name);
+            QPID_LOG(debug, "HA: Backup queue declare event: " << name);
             startQueueReplicator(result.first);
         } else {
             // FIXME aconway 2011-12-02: what's the right way to handle this?
-            QPID_LOG(warning, "HA: Backup queue already exists: " << name);
+            // Should we delete the old & re-create form the event? Responses
+            // may be old but events are always up-to-date.
+            QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name);
         }
     }
 }
 
 void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup queue delete event " << values);
     // The remote queue has already been deleted so replicator
     // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-    if (queue && replicateLevel(queue->getSettings())) {
-        QPID_LOG(debug, "HA: Backup deleting queue: " << name);
+    if (!queue) {
+        QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name);
+    } else if (!replicateLevel(queue->getSettings())) {
+        QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name);
+    } else {
         string rname = QueueReplicator::replicatorName(name);
         boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
         boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
@@ -316,11 +324,11 @@ void BrokerReplicator::doEventQueueDelet
         // actually be destroyed, deleting the exhange
         broker.getExchanges().destroy(rname);
         broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+        QPID_LOG(debug, "HA: Backup queue delete event: " << name);
     }
 }
 
 void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup exchange declare event " << values);
     Variant::Map argsMap(asMapVoid(values[ARGS]));
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
         string name = values[EXNAME].asString();
@@ -335,32 +343,32 @@ void BrokerReplicator::doEventExchangeDe
                 values[USER].asString(),
                 values[RHOST].asString()).second)
         {
-                    QPID_LOG(debug, "HA: Backup created exchange: " << name);
+            QPID_LOG(debug, "HA: Backup exchange declare event: " << name);
         } else {
-            // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+            // FIXME aconway 2011-11-22: should delete pre-existing exchange
             // and re-create from event. See comment in doEventQueueDeclare.
-            QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
+            QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name);
         }
     }
 }
 
 void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup exchange delete event " << values);
     string name = values[EXNAME].asString();
-    try {
-        boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
-        if (exchange && replicateLevel(exchange->getArgs())) {
-            QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
-            broker.deleteExchange(
-                name,
-                values[USER].asString(),
-                values[RHOST].asString());
-        }
-    } catch (const framing::NotFoundException&) {}
+    boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+    if (!exchange) {
+        QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name);
+    } else if (!replicateLevel(exchange->getArgs())) {
+        QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name);
+    } else {
+        QPID_LOG(debug, "HA: Backup exchange delete event:" << name);
+        broker.deleteExchange(
+            name,
+            values[USER].asString(),
+            values[RHOST].asString());
+    }
 }
 
 void BrokerReplicator::doEventBind(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup bind event " << values);
     boost::shared_ptr<Exchange> exchange =
         broker.getExchanges().find(values[EXNAME].asString());
     boost::shared_ptr<Queue> queue =
@@ -373,15 +381,14 @@ void BrokerReplicator::doEventBind(Varia
         framing::FieldTable args;
         amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
-        QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
+        exchange->bind(queue, key, &args);
+        QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
-        exchange->bind(queue, key, &args);
     }
 }
 
 void BrokerReplicator::doEventUnbind(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup unbind event " << values);
     boost::shared_ptr<Exchange> exchange =
         broker.getExchanges().find(values[EXNAME].asString());
     boost::shared_ptr<Queue> queue =
@@ -394,16 +401,14 @@ void BrokerReplicator::doEventUnbind(Var
         framing::FieldTable args;
         amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
-        QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+        exchange->unbind(queue, key, &args);
+        QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
-        exchange->unbind(queue, key, &args);
     }
 }
 
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup queue response " << values);
-    // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
@@ -420,17 +425,16 @@ void BrokerReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     if (result.second) {
-        QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
+        QPID_LOG(debug, "HA: Backup queue response: " << name);
         startQueueReplicator(result.first);
     } else {
         // FIXME aconway 2011-11-22: Normal to find queue already
         // exists if we're failing over.
-        QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
+        QPID_LOG(warning, "HA: Backup queue response, already exists: " << name);
     }
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup exchange response " << values);
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
@@ -444,9 +448,10 @@ void BrokerReplicator::doResponseExchang
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/).second)
     {
-        QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
+        QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString());
     } else {
-        QPID_LOG(warning, "HA: Backup catch-up exchange already exists:  " << values[QNAME]);
+        QPID_LOG(warning, "HA: Backup exchange query, already exists: " <<
+                 values[QNAME].asString());
     }
 }
 
@@ -472,12 +477,10 @@ const std::string QUEUE_REF("queueRef");
 } // namespace
 
 void BrokerReplicator::doResponseBind(Variant::Map& values) {
-    QPID_LOG(debug, "HA: Backup bind response " << values);
     std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
     std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
     boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
     boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
-    // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
 
     // Automatically replicate binding if queue and exchange exist and are replicated
     if (exchange && replicateLevel(exchange->getArgs()) &&
@@ -487,16 +490,39 @@ void BrokerReplicator::doResponseBind(Va
         amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
         string key = values[KEY].asString();
         exchange->bind(queue, key, &args);
-        QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
+        QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
     }
 }
 
+namespace {
+const string REPLICATE_DEFAULT="replicateDefault";
+}
+
+// Received the ha-broker configuration object for the primary broker.
+void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
+    try {
+        ReplicateLevel mine = haBroker.getSettings().replicateDefault;
+        ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString());
+        if (mine != primary) {
+            std::ostringstream os;
+            os << "Replicate default on backup (" << mine
+               << ") does not match primary (" <<  primary << ")";
+            haBroker.shutdown(os.str());
+        }
+    } catch (const std::exception& e) {
+        std::ostringstream os;
+        os << "Received invalid replicate default from primary: " << e.what();
+        haBroker.shutdown(os.str());
+    }
+}
+
 void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
     if (replicateLevel(queue->getSettings()) == RL_ALL) {
         boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
-        broker.getExchanges().registerExchange(qr);
+        if (!broker.getExchanges().registerExchange(qr))
+            throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
         qr->activate();
     }
 }

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/BrokerReplicator.h Wed May  2 13:09:18 2012
@@ -22,6 +22,7 @@
  *
  */
 
+#include "ReplicateLevel.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/types/Variant.h"
 #include <boost/shared_ptr.hpp>
@@ -35,7 +36,12 @@ class Bridge;
 class SessionHandler;
 }
 
+namespace framing {
+class FieldTable;
+}
+
 namespace ha {
+class HaBroker;
 
 /**
  * Replicate configuration on a backup broker.
@@ -51,7 +57,7 @@ namespace ha {
 class BrokerReplicator : public broker::Exchange
 {
   public:
-    BrokerReplicator(const boost::shared_ptr<broker::Link>&);
+    BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
     ~BrokerReplicator();
     std::string getType() const;
 
@@ -64,6 +70,10 @@ class BrokerReplicator : public broker::
   private:
     void initializeBridge(broker::Bridge&, broker::SessionHandler&);
 
+    ReplicateLevel replicateLevel(const std::string&);
+    ReplicateLevel replicateLevel(const framing::FieldTable& args);
+    ReplicateLevel replicateLevel(const types::Variant::Map& args);
+
     void doEventQueueDeclare(types::Variant::Map& values);
     void doEventQueueDelete(types::Variant::Map& values);
     void doEventExchangeDeclare(types::Variant::Map& values);
@@ -74,9 +84,11 @@ class BrokerReplicator : public broker::
     void doResponseQueue(types::Variant::Map& values);
     void doResponseExchange(types::Variant::Map& values);
     void doResponseBind(types::Variant::Map& values);
+    void doResponseHaBroker(types::Variant::Map& values);
 
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
 
+    HaBroker& haBroker;
     broker::Broker& broker;
     boost::shared_ptr<broker::Link> link;
 };

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaBroker.cpp Wed May  2 13:09:18 2012
@@ -27,6 +27,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/SignalHandler.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/ha/Package.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
@@ -69,16 +70,18 @@ HaBroker::HaBroker(broker::Broker& b, co
         throw Exception("Cannot start HA: management is disabled");
     _qmf::Package  packageInit(ma);
     mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
-    // FIXME aconway 2012-03-01: should start in catch-up state and move to backup
-    // only when caught up.
-    mgmtObject->set_status(BACKUP);
+    mgmtObject->set_status(settings.cluster ? BACKUP : STANDALONE);
+    mgmtObject->set_replicateDefault(str(settings.replicateDefault));
     ma->addObject(mgmtObject);
+
+    // NOTE: lock is not needed in a constructor but we created it just to pass
+    // to the set functions.
     sys::Mutex::ScopedLock l(lock);
     if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
     if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
 
     // If we are in a cluster, we start in backup mode.
-    if (settings.cluster) backup.reset(new Backup(b, s));
+    if (settings.cluster) backup.reset(new Backup(*this, s));
 }
 
 HaBroker::~HaBroker() {}
@@ -91,7 +94,7 @@ Manageable::status_t HaBroker::Managemen
               // NOTE: resetting backup allows client connections, so any
               // primary state should be set up here before backup.reset()
               backup.reset();
-              QPID_LOG(notice, "HA: Primary promoted from backup");
+              QPID_LOG(notice, "HA: Promoted to primary");
               mgmtObject->set_status(PRIMARY);
           }
           break;
@@ -126,8 +129,8 @@ Manageable::status_t HaBroker::Managemen
           link->setUrl(url);
           // Create a queue replicator
           boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
-          broker.getExchanges().registerExchange(qr);
           qr->activate();
+          broker.getExchanges().registerExchange(qr);
           break;
       }
 
@@ -145,7 +148,7 @@ void HaBroker::setClientUrl(const Url& u
 
 void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
     Url url = clientUrl.empty() ? brokerUrl : clientUrl;
-    assert(!url.empty());
+    if (url.empty()) throw Url::Invalid("HA client URL is empty");
     mgmtObject->set_publicBrokers(url.str());
     knownBrokers.clear();
     knownBrokers.push_back(url);
@@ -153,7 +156,7 @@ void HaBroker::updateClientUrl(const sys
 }
 
 void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
-    if (url.empty()) throw Exception("Invalid empty URL for HA broker failover");
+    if (url.empty()) throw Url::Invalid("HA broker URL is empty");
     QPID_LOG(debug, "HA: Setting broker URL to: " << url);
     brokerUrl = url;
     mgmtObject->set_brokers(brokerUrl.str());
@@ -171,4 +174,9 @@ std::vector<Url> HaBroker::getKnownBroke
     return knownBrokers;
 }
 
+void HaBroker::shutdown(const std::string& message) {
+    QPID_LOG(critical, "Shutting down: " << message);
+    broker.shutdown();
+}
+
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaBroker.h Wed May  2 13:09:18 2012
@@ -52,6 +52,12 @@ class HaBroker : public management::Mana
     management::Manageable::status_t ManagementMethod (
         uint32_t methodId, management::Args& args, std::string& text);
 
+    broker::Broker& getBroker() { return broker; }
+    const Settings& getSettings() const { return settings; }
+
+    // Log a critical error message and shut down the broker.
+    void shutdown(const std::string& message);
+
   private:
     void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
     void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/HaPlugin.cpp Wed May  2 13:09:18 2012
@@ -37,6 +37,9 @@ struct Options : public qpid::Options {
              "URL that backup brokers use to connect and fail over.")
             ("ha-public-brokers", optValue(settings.clientUrl,"URL"),
              "URL that clients use to connect and fail over, defaults to ha-brokers.")
+            ("ha-replicate",
+             optValue(settings.replicateDefault, "LEVEL"),
+            "Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'")
             ("ha-expected-backups", optValue(settings.expectedBackups, "N"),
              "Number of backups expected to be active in the HA cluster.")
             ("ha-username", optValue(settings.username, "USER"),

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed May  2 13:09:18 2012
@@ -30,6 +30,7 @@
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
+#include "qpid/Msg.h"
 #include <boost/shared_ptr.hpp>
 
 namespace {
@@ -55,8 +56,8 @@ QueueReplicator::QueueReplicator(boost::
 {
     framing::Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
-    logPrefix = "HA: Backup " + queue->getName() + ": ";
-    QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
+    logPrefix = "HA: Backup of " + queue->getName() + ": ";
+    QPID_LOG(info, logPrefix << "Created");
 }
 
 // This must be separate from the constructor so we can call shared_from_this.
@@ -78,7 +79,7 @@ void QueueReplicator::activate() {
         0,                  // sync?
         // Include shared_ptr to self to ensure we are not deleted
         // before initializeBridge is called.
-        boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
+        boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
     );
     bridge = result.first;
 }
@@ -96,9 +97,7 @@ void QueueReplicator::deactivate() {
 }
 
 // Called in a broker connection thread when the bridge is created.
-// shared_ptr to self ensures we are not deleted before initializeBridge is called.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
-                                       boost::shared_ptr<QueueReplicator> /*self*/) {
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
     sys::Mutex::ScopedLock l(lock);
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -147,23 +146,33 @@ void QueueReplicator::dequeue(SequenceNu
 // Called in connection thread of the queues bridge to primary.
 void QueueReplicator::route(Deliverable& msg)
 {
-    const std::string& key = msg.getMessage().getRoutingKey();
-    sys::Mutex::ScopedLock l(lock);
-    if (key == DEQUEUE_EVENT_KEY) {
-        SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
-        QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
-        //TODO: should be able to optimise the following
-        for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
-            dequeue(*i, l);
-    } else if (key == POSITION_EVENT_KEY) {
-        SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
-        QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
-                 << " to " << position);
-        assert(queue->getPosition() <= position);
-        queue->setPosition(position);
-    } else {
-        msg.deliverTo(queue);
-        QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+    try {
+        const std::string& key = msg.getMessage().getRoutingKey();
+        sys::Mutex::ScopedLock l(lock);
+        if (key == DEQUEUE_EVENT_KEY) {
+            SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+            QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
+            //TODO: should be able to optimise the following
+            for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
+                dequeue(*i, l);
+        } else if (key == POSITION_EVENT_KEY) {
+            SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+            QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
+                     << " to " << position);
+            if (queue->getPosition() > position) {
+                throw Exception(
+                    QPID_MSG(logPrefix << "Invalid position update from "
+                             << queue->getPosition() << " to " << position));
+            }
+            queue->setPosition(position);
+        } else {
+            msg.deliverTo(queue);
+            QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+        }
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
+        throw;
     }
 }
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/QueueReplicator.h Wed May  2 13:09:18 2012
@@ -70,8 +70,7 @@ class QueueReplicator : public broker::E
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
 
   private:
-    void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler,
-                          boost::shared_ptr<QueueReplicator> self);
+    void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
     void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
 
     std::string logPrefix;

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Wed May  2 13:09:18 2012
@@ -87,10 +87,13 @@ ReplicatingSubscription::ReplicatingSubs
     events(new Queue(mask(name))),
     consumer(new DelegatingConsumer(*this))
 {
+    // Separate the remote part from a "local-remote" address.
+    string address = parent->getSession().getConnection().getUrl();
+    size_t i = address.find('-');
+    if (i != string::npos) address = address.substr(i+1);
+    logPrefix = "HA: Primary ";
     stringstream ss;
-    ss << "HA: Primary: " << getQueue()->getName() << " at "
-       << parent->getSession().getConnection().getUrl() << ": ";
-    logPrefix = ss.str();
+    logSuffix = " (" + address + ")";
 
     // FIXME aconway 2011-12-09: Failover optimization removed.
     // There was code here to re-use messages already on the backup
@@ -99,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubs
     // can be re-introduced later. Last revision with the optimization:
     // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
 
-    QPID_LOG(debug, logPrefix << "Created backup subscription " << getName());
+    QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
 
     // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
     // so we will start consuming from the lowest numbered message.
@@ -109,23 +112,36 @@ ReplicatingSubscription::ReplicatingSubs
 
 // Message is delivered in the subscription's connection thread.
 bool ReplicatingSubscription::deliver(QueuedMessage& m) {
-    // Add position events for the subscribed queue, not for the internal event queue.
-    if (m.queue && m.queue == getQueue().get()) {
-        sys::Mutex::ScopedLock l(lock);
-        assert(position == m.position);
-        // m.position is the position of the newly enqueued m on the local queue.
-        // backupPosition is latest position on the backup queue (before enqueueing m.)
-        assert(m.position > backupPosition);
-        if (m.position - backupPosition > 1) {
-            // Position has advanced because of messages dequeued ahead of us.
-            SequenceNumber send(m.position);
-            --send;   // Send the position before m was enqueued.
-            sendPositionEvent(send, l);
+    try {
+        // Add position events for the subscribed queue, not for the internal event queue.
+        if (m.queue && m.queue == getQueue().get()) {
+            sys::Mutex::ScopedLock l(lock);
+            if (position != m.position)
+                throw Exception(
+                    QPID_MSG("Expected position " << position
+                             << " but got " << m.position));
+            // m.position is the position of the newly enqueued m on the local queue.
+            // backupPosition is latest position on the backup queue (before enqueueing m.)
+            if (m.position <= backupPosition)
+                throw Exception(
+                    QPID_MSG("Expected position >  " << backupPosition
+                             << " but got " << m.position));
+
+            if (m.position - backupPosition > 1) {
+                // Position has advanced because of messages dequeued ahead of us.
+                SequenceNumber send(m.position);
+                --send;   // Send the position before m was enqueued.
+                sendPositionEvent(send, l);
+            }
+            backupPosition = m.position;
+            QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
         }
-        backupPosition = m.position;
-        QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
+        return ConsumerImpl::deliver(m);
+    } catch (const std::exception& e) {
+        QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
+                 << logSuffix << ": " << e.what());
+        throw;
     }
-    return ConsumerImpl::deliver(m);
 }
 
 ReplicatingSubscription::~ReplicatingSubscription() {}
@@ -139,7 +155,7 @@ void ReplicatingSubscription::complete(
 {
     // Handle completions for the subscribed queue, not the internal event queue.
     if (qm.queue && qm.queue == getQueue().get()) {
-        QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+        QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
         Delayed::iterator i= delayed.find(qm.position);
         // The same message can be completed twice, by acknowledged and
         // dequeued, remove it from the set so it only gets completed
@@ -157,7 +173,7 @@ void ReplicatingSubscription::complete(
 void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
     sys::Mutex::ScopedLock l(lock);
     // Delay completion
-    QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+    QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
     qm.payload->getIngressCompletion().startCompleter();
     assert(delayed.find(qm.position) == delayed.end());
     delayed[qm.position] = qm;
@@ -168,7 +184,7 @@ void ReplicatingSubscription::enqueued(c
 void ReplicatingSubscription::cancelComplete(
     const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
 {
-    QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+    QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
     v.second.payload->getIngressCompletion().finishCompleter();
 }
 
@@ -179,7 +195,7 @@ void ReplicatingSubscription::cancel()
         boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
     {
         sys::Mutex::ScopedLock l(lock);
-        QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+        QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
         for_each(delayed.begin(), delayed.end(),
                  boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
         delayed.clear();
@@ -201,7 +217,8 @@ bool ReplicatingSubscription::hideDelete
 // Called with lock held. Called in subscription's connection thread.
 void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
 {
-    QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
+    QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
+             << " from " << getQueue()->getName() << logSuffix);
     string buf(dequeues.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     dequeues.encode(buffer);
@@ -216,7 +233,7 @@ void ReplicatingSubscription::dequeued(c
 {
     {
         sys::Mutex::ScopedLock l(lock);
-        QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+        QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
         dequeues.add(qm.position);
         // If we have not yet sent this message to the backup, then
         // complete it now as it will never be accepted.
@@ -229,8 +246,8 @@ void ReplicatingSubscription::dequeued(c
 void ReplicatingSubscription::sendPositionEvent(
     SequenceNumber position, const sys::Mutex::ScopedLock&l )
 {
-    QPID_LOG(trace, logPrefix << "Sending position " << position
-             << ", was " << backupPosition);
+    QPID_LOG(trace, logPrefix << "sending position " << position
+             << ", was " << backupPosition << logSuffix);
     string buf(backupPosition.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     position.encode(buffer);

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Wed May  2 13:09:18 2012
@@ -94,7 +94,7 @@ class ReplicatingSubscription : public b
     bool doDispatch();
   private:
     typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
-    std::string logPrefix;
+    std::string logPrefix, logSuffix;
     boost::shared_ptr<broker::Queue> events;
     boost::shared_ptr<broker::Consumer> consumer;
     Delayed delayed;

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Settings.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/Settings.h Wed May  2 13:09:18 2012
@@ -22,6 +22,7 @@
  *
  */
 
+#include "ReplicateLevel.h"
 #include <string>
 
 namespace qpid {
@@ -33,11 +34,12 @@ namespace ha {
 class Settings
 {
   public:
-    Settings() : cluster(false), expectedBackups(0) {}
+    Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {}
     bool cluster;               // True if we are a cluster member.
     std::string clientUrl;
     std::string brokerUrl;
     size_t expectedBackups;
+    ReplicateLevel replicateDefault;
     std::string username, password, mechanism;
   private:
 };

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/management-schema.xml?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/management-schema.xml (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/ha/management-schema.xml Wed May  2 13:09:18 2012
@@ -32,7 +32,11 @@
 	      desc="Multiple-address URL used by clients to connect to the HA brokers."/>
 
     <property name="expectedBackups" type="uint16"
-	      desc="Number of HA backup brokers expected."/>>
+	      desc="Number of HA backup brokers expected."/>
+
+    <property
+	name="replicateDefault" type="sstr"
+	desc="Replicate value for queues/exchanges without a qpid.replicate argument"/>
 
     <method name="promote" desc="Promote a backup broker to primary."/>
 
@@ -48,7 +52,7 @@
       <arg name="expectedBackups" type="uint16" dir="I"/>
     </method>
 
-    <method name="replicate" desc="Replicate from a remote queue to the local broker.">
+    <method name="replicate" desc="Replicate individual queue from remote broker.">
       <arg name="broker" type="sstr" dir="I"/>
       <arg name="queue" type="sstr" dir="I"/>
     </method>

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed May  2 13:09:18 2012
@@ -30,6 +30,7 @@
 #include "qpid/log/Statement.h"
 #include <qpid/broker/Message.h>
 #include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/sys/Time.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/broker/ConnectionState.h"

Propchange: qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1306564-1332660

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.h Wed May  2 13:09:18 2012
@@ -36,7 +36,6 @@
 #include "qpid/sys/MemStat.h"
 #include "qpid/types/Variant.h"
 #include <qpid/framing/AMQFrame.h>
-#include <qpid/framing/FieldValue.h>
 #include <qpid/framing/ResizableBuffer.h>
 #include <memory>
 #include <string>

Propchange: qpid/branches/qpid-3767/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1306564-1332660

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Wed May  2 13:09:18 2012
@@ -131,7 +131,7 @@ boost::intrusive_ptr<Message> Replicatin
     //cloned body:
     AMQFrame header(*original->getFrames().getHeaders());
     header.setBof(false);
-    header.setEof(!original->getFrames().getContentSize());//if there is any content then the header is not the end of the frameset
+    header.setEof(!original->getFrames().hasContent());//if there are any content frames then the header is not the end of the frameset
     header.setBos(true);
     header.setEos(true);
     handler.handle(header);

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Wed May  2 13:09:18 2012
@@ -60,7 +60,13 @@ void ReplicationExchange::route(Delivera
     if (args) {
         int eventType = args->getAsInt(REPLICATION_EVENT_TYPE);
         if (eventType) {
-            if (isDuplicate(args)) return;
+            if (isDuplicate(args)) {
+                if (mgmtExchange != 0) {
+                    mgmtExchange->inc_msgDrops();
+                    mgmtExchange->inc_byteDrops(msg.contentSize());
+                }
+	        return;
+	    }
             switch (eventType) {
               case ENQUEUE:
                 handleEnqueueEvent(args, msg);

Modified: qpid/branches/qpid-3767/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Wed May  2 13:09:18 2012
@@ -291,12 +291,12 @@ private:
     volatile LONG opsInProgress;
     // Is there a write in progress?
     volatile bool writeInProgress;
+    // Or a read?
+    volatile bool readInProgress;
     // Deletion requested, but there are callbacks in progress.
     volatile bool queuedDelete;
     // Socket close requested, but there are operations in progress.
     volatile bool queuedClose;
-    // Most recent asynch read request
-    volatile AsynchReadResult* pendingRead;
 
 private:
     // Dispatch events that have completed.
@@ -346,6 +346,11 @@ private:
      * Called when there's a completion to process.
      */
     void completion(AsynchIoResult *result);
+
+    /**
+     * Helper function to facilitate the close operation
+     */
+    void cancelRead();
 };
 
 // This is used to encapsulate pure callbacks into a handle
@@ -374,9 +379,9 @@ AsynchIO::AsynchIO(const Socket& s,
     socket(s),
     opsInProgress(0),
     writeInProgress(false),
+    readInProgress(false),
     queuedDelete(false),
     queuedClose(false),
-    pendingRead(0),
     working(false) {
 }
 
@@ -392,21 +397,24 @@ AsynchIO::~AsynchIO() {
 }
 
 void AsynchIO::queueForDeletion() {
-    queuedDelete = true;
-    if (opsInProgress > 0) {
-        QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
-        // AsynchIOHandler calls this then deletes itself; don't do any more
-        // callbacks.
-        readCallback = 0;
-        eofCallback = 0;
-        disCallback = 0;
-        closedCallback = 0;
-        emptyCallback = 0;
-        idleCallback = 0;
-    }
-    else {
-        delete this;
+    {
+        ScopedLock<Mutex> l(completionLock);
+        assert(!queuedDelete);
+        queuedDelete = true;
+        if (working || opsInProgress > 0) {
+            QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+            // AsynchIOHandler calls this then deletes itself; don't do any more
+            // callbacks.
+            readCallback = 0;
+            eofCallback = 0;
+            disCallback = 0;
+            closedCallback = 0;
+            emptyCallback = 0;
+            idleCallback = 0;
+            return;
+        }
     }
+    delete this;
 }
 
 void AsynchIO::start(Poller::shared_ptr poller0) {
@@ -454,9 +462,14 @@ void AsynchIO::notifyPendingWrite() {
 }
 
 void AsynchIO::queueWriteClose() {
-    queuedClose = true;
-    if (!writeInProgress)
-        notifyPendingWrite();
+    {
+        ScopedLock<Mutex> l(completionLock);
+        queuedClose = true;
+        if (working || writeInProgress)
+            // no need to summon an IO thread
+            return;
+    }
+    notifyPendingWrite();
 }
 
 bool AsynchIO::writeQueueEmpty() {
@@ -469,7 +482,7 @@ bool AsynchIO::writeQueueEmpty() {
  * called when the read is complete and data is available.
  */
 void AsynchIO::startReading() {
-    if (queuedDelete)
+    if (queuedDelete || queuedClose)
         return;
 
     // (Try to) get a buffer; look on the front since there may be an
@@ -492,6 +505,7 @@ void AsynchIO::startReading() {
                                  readCount);
         DWORD bytesReceived = 0, flags = 0;
         InterlockedIncrement(&opsInProgress);
+        readInProgress = true;
         int status = WSARecv(toSocketHandle(socket),
                              const_cast<LPWSABUF>(result->getWSABUF()), 1,
                              &bytesReceived,
@@ -507,7 +521,6 @@ void AsynchIO::startReading() {
             }
         }
         // On status 0 or WSA_IO_PENDING, completion will handle the rest.
-        pendingRead = result;
     }
     else {
         notifyBuffersEmpty();
@@ -620,6 +633,7 @@ void AsynchIO::close(void) {
 void AsynchIO::readComplete(AsynchReadResult *result) {
     int status = result->getStatus();
     size_t bytes = result->getTransferred();
+    readInProgress = false;
     if (status == 0 && bytes > 0) {
         if (readCallback)
             readCallback(*this, result->getBuff());
@@ -629,8 +643,8 @@ void AsynchIO::readComplete(AsynchReadRe
         // No data read, so put the buffer back. It may be partially filled,
         // so "unread" it back to the front of the queue.
         unread(result->getBuff());
-        if (queuedClose && status == ERROR_OPERATION_ABORTED) {
-            return; // Expected reap from CancelIoEx
+        if (queuedClose) {
+            return; // Expected from cancelRead()
         }
         notifyEof();
         if (status != 0)
@@ -687,6 +701,8 @@ void AsynchIO::writeComplete(AsynchWrite
 }
 
 void AsynchIO::completion(AsynchIoResult *result) {
+    bool closing = false;
+    bool deleting = false;
     {
         ScopedLock<Mutex> l(completionLock);
         if (working) {
@@ -702,11 +718,8 @@ void AsynchIO::completion(AsynchIoResult
             {
                 ScopedUnlock<Mutex> ul(completionLock);
                 AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
-                if (r != 0) {
+                if (r != 0)
                     readComplete(r);
-                    // Set pendingRead to 0 if it's still pointing to (newly completed) r
-                    InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r);
-                }
                 else {
                     AsynchWriteResult *w =
                         dynamic_cast<AsynchWriteResult*>(result);
@@ -721,6 +734,8 @@ void AsynchIO::completion(AsynchIoResult
                 delete result;
                 result = 0;
                 InterlockedDecrement(&opsInProgress);
+                if (queuedClose && opsInProgress == 1 && readInProgress)
+                    cancelRead();
             }
             // Lock is held again.
             if (completionQueue.empty())
@@ -729,26 +744,40 @@ void AsynchIO::completion(AsynchIoResult
             completionQueue.pop();
         }
         working = false;
+        if (opsInProgress == 0) {
+            closing = queuedClose;
+            deleting = queuedDelete;
+        }
     }
     // Lock released; ok to close if ops are done and close requested.
     // Layer above will call back to queueForDeletion() if it hasn't
     // already been done. If it already has, go ahead and delete.
-    if (opsInProgress == 0) {
-        if (queuedClose)
-            // close() may cause a delete; don't trust 'this' on return
-            close();
-        else if (queuedDelete)
-            delete this;
-    }
+    if (deleting)
+        delete this;
+    else if (closing)
+        // close() may cause a delete; don't trust 'this' on return
+        close();
+}
+
+/*
+ * NOTE - this method must be called in the same context as other completions,
+ * so that the resulting readComplete, and final AsynchIO::close() is serialized
+ * after this method returns.
+ */
+void AsynchIO::cancelRead() {
+    if (queuedDelete)
+        return;                 // socket already deleted
     else {
-        if (queuedClose && pendingRead) {
-            // Force outstanding read to completion.  Layer above will
-            // call back.
-            CancelIoEx((HANDLE)toSocketHandle(socket),
-                       ((AsynchReadResult *)pendingRead)->overlapped());
-            pendingRead = 0;
-        }
-    }
+        ScopedLock<Mutex> l(completionLock);;
+        if (!completionQueue.empty())
+            return;             // process it; come back later if necessary
+    }
+    // Cancel outstanding read and force to completion.  Otherwise, on a faulty
+    // physical link, the pending read can remain uncompleted indefinitely.
+    // Draining the pending read will result in the official close (and
+    // notifyClosed).  CancelIoEX() is the natural choice, but not available in
+    // XP, so we make do with closesocket().
+    socket.close();
 }
 
 } // namespace windows

Propchange: qpid/branches/qpid-3767/qpid/cpp/src/tests/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/tests:r1306564-1332660

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/Array.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/Array.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/Array.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/Array.cpp Wed May  2 13:09:18 2012
@@ -58,7 +58,7 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
     BOOST_CHECK_EQUAL(a, b);
 
     std::vector<std::string> data2;
-    b.collect(data2);
+    std::transform(b.begin(), b.end(), std::back_inserter(data2), Array::get<std::string, Array::ValuePtr>);
     //BOOST_CHECK_EQUAL(data, data2);
     BOOST_CHECK(data == data2);
 }
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testArrayAssignment)
         BOOST_CHECK_EQUAL(a, b);
     }
     std::vector<std::string> data2;
-    b.collect(data2);
+    std::transform(b.begin(), b.end(), std::back_inserter(data2), Array::get<std::string, Array::ValuePtr>);
     //BOOST_CHECK_EQUAL(data, data2);
     BOOST_CHECK(data == data2);
 }

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/CMakeLists.txt?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/CMakeLists.txt Wed May  2 13:09:18 2012
@@ -23,7 +23,11 @@ include (CTest)
 # Make sure that everything get built before the tests
 # Need to create a var with all the necessary top level targets
 
-add_definitions(-DBOOST_TEST_DYN_LINK)
+# If we're linking Boost for DLLs, turn that on for the unit test too.
+if (QPID_LINK_BOOST_DYNAMIC)
+    add_definitions(-DBOOST_TEST_DYN_LINK)
+endif (QPID_LINK_BOOST_DYNAMIC)
+
 include_directories( ${CMAKE_CURRENT_SOURCE_DIR} )
 
 include (FindPythonInterp)

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/FieldTable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/FieldTable.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/FieldTable.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/FieldTable.cpp Wed May  2 13:09:18 2012
@@ -19,11 +19,11 @@
  *
  */
 #include <iostream>
+#include <algorithm>
 #include "qpid/framing/Array.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/List.h"
-#include "qpid/sys/alloca.h"
 
 #include "unit_test.h"
 
@@ -127,7 +127,7 @@ QPID_AUTO_TEST_CASE(testNestedValues)
         BOOST_CHECK(string("B") == b.getAsString("id"));
         a.getArray("C", c);
         std::vector<std::string> items;
-        c.collect(items);
+        std::transform(c.begin(), c.end(), std::back_inserter(items), Array::get<std::string, Array::ValuePtr>);
         BOOST_CHECK((uint) 2 == items.size());
         BOOST_CHECK(string("one") == items[0]);
         BOOST_CHECK(string("two") == items[1]);

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/FramingTest.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/FramingTest.cpp Wed May  2 13:09:18 2012
@@ -25,6 +25,7 @@
 #include "qpid/framing/all_method_bodies.h"
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/FieldValue.h"
 #include "unit_test.h"
 
 #include <boost/bind.hpp>

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/Makefile.am?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/Makefile.am Wed May  2 13:09:18 2012
@@ -149,7 +149,7 @@ endif
 
 # Test programs that are installed and therefore built as part of make, not make check
 
-qpidexectest_SCRIPTS += qpid-cpp-benchmark install_env.sh
+qpidexectest_SCRIPTS += qpid-cpp-benchmark qpid-cluster-benchmark install_env.sh
 EXTRA_DIST += qpid-cpp-benchmark install_env.sh
 
 qpidexectest_PROGRAMS += receiver
@@ -320,7 +320,7 @@ EXTRA_DIST +=								\
   header_test.py							\
   ssl_test								\
   config.null								\
-  ais_check								\
+  cpg_check.sh.in							\
   run_federation_tests							\
   run_federation_sys_tests                  \
   run_long_federation_sys_tests             \
@@ -368,9 +368,15 @@ LONG_TESTS+=start_broker \
  fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
  run_msg_group_tests_soak \
  stop_broker \
- run_long_federation_sys_tests \
- run_failover_soak reliable_replication_test \
- federated_cluster_test_with_node_failure
+ run_long_federation_sys_tests
+
+if HAVE_LIBCPG
+
+LONG_TESTS+=	federated_cluster_test_with_node_failure	\
+		run_failover_soak				\
+		reliable_replication_test
+
+endif HAVE_LIBCPG
 
 EXTRA_DIST+=						\
 	fanout_perftest					\
@@ -381,7 +387,8 @@ EXTRA_DIST+=						\
 	reliable_replication_test			\
 	federated_cluster_test_with_node_failure        \
 	sasl_test_setup.sh                              \
-	run_msg_group_tests_soak
+	run_msg_group_tests_soak			\
+	qpidd-empty.conf
 
 check-long:
 	$(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND=

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed May  2 13:09:18 2012
@@ -1139,6 +1139,13 @@ QPID_AUTO_TEST_CASE(testHeadersExchange)
     }
 }
 
+QPID_AUTO_TEST_CASE(testLargeRoutingKey)
+{
+    MessagingFixture fix;
+    std::string address = "amq.direct/" + std::string(300, 'x');//routing/binding key can be at most 225 chars in 0-10
+    BOOST_CHECK_THROW(fix.session.createReceiver(address), qpid::messaging::MessagingException);
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Wed May  2 13:09:18 2012
@@ -27,6 +27,7 @@
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/FieldValue.h"
 #include "MessageUtils.h"
 #include "BrokerFixture.h"
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/Uuid.cpp?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/Uuid.cpp (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/Uuid.cpp Wed May  2 13:09:18 2012
@@ -48,7 +48,7 @@ QPID_AUTO_TEST_CASE(testUuidCtor) {
     for_each(uuids.begin(), uuids.end(), unique);
 }
 
-boost::array<uint8_t, 16>  sample =  {{'\x1b', '\x4e', '\x28', '\xba', '\x2f', '\xa1', '\x11', '\xd2', '\x88', '\x3f', '\xb9', '\xa7', '\x61', '\xbd', '\xe3', '\xfb'}};
+boost::array<uint8_t, 16>  sample =  {{0x1b, 0x4e, 0x28, 0xba, 0x2f, 0xa1, 0x11, 0xd2, 0x88, 0x3f, 0xb9, 0xa7, 0x61, 0xbd, 0xe3, 0xfb}};
 const string sampleStr("1b4e28ba-2fa1-11d2-883f-b9a761bde3fb");
 const string zeroStr("00000000-0000-0000-0000-000000000000");
 

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/acl.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/acl.py?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/acl.py (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/acl.py Wed May  2 13:09:18 2012
@@ -47,6 +47,19 @@ class ACLTests(TestBase010):
         connection.start()
         return connection.session(str(uuid4()))
 
+    def port_i(self):
+        return int(self.defines["port-i"])
+
+    def port_u(self):
+        return int(self.defines["port-u"])
+
+    def get_session_by_port(self, user, passwd, byPort):
+        socket = connect(self.broker.host, byPort)
+        connection = Connection (sock=socket, username=user, password=passwd,
+                                 mechanism="PLAIN")
+        connection.start()
+        return connection.session(str(uuid4()))
+
     def reload_acl(self):
         result = None
         try:
@@ -55,6 +68,24 @@ class ACLTests(TestBase010):
             result = str(e)
         return result
 
+    def acl_lookup(self, userName, action, aclObj, aclObjName, propMap):
+        result = {}
+        try:
+            result = self.broker_access.acl_lookup(userName, action, aclObj, aclObjName, propMap)
+        except Exception, e:
+            result['text'] = str(e)
+            result['result'] = str(e)
+        return result
+
+    def acl_lookupPublish(self, userName, exchange, key):
+        result = {}
+        try:
+            result = self.broker_access.acl_lookupPublish(userName, exchange, key)
+        except Exception, e:
+            result['text'] = str(e)
+            result['result'] = str(e)
+        return result
+
     def get_acl_file(self):
         return ACLFile(self.config.defines.get("policy-file", "data_dir/policy.acl"))
 
@@ -73,6 +104,37 @@ class ACLTests(TestBase010):
         self.reload_acl()
         TestBase010.tearDown(self)
 
+
+    def Lookup(self, userName, action, aclObj, aclObjName, propMap, expectedResult):
+        result = self.acl_lookup(userName, action, aclObj, aclObjName, propMap)
+        if (result['result'] != expectedResult):
+            suffix = ', [ERROR: Expected= ' + expectedResult
+            if (result['result'] is None):
+                suffix = suffix + ', Exception= ' + result['text'] + ']'
+            else:
+                suffix = suffix + ', Actual= ' + result['result'] + ']'
+            self.fail('Lookup: name=' + userName + ', action=' + action + ', aclObj=' + aclObj + ', aclObjName=' + aclObjName + ', propertyMap=' + str(propMap) + suffix)
+
+
+    def LookupPublish(self, userName, exchName, keyName, expectedResult):
+        result = self.acl_lookupPublish(userName, exchName, keyName)
+        if (result['result'] != expectedResult):
+            if (result['result'] is None):
+                suffix = suffix + ', Exception= ' + result['text'] + ']'
+            else:
+                suffix = suffix + ', Actual= ' + result['result'] + ']'
+            self.fail('LookupPublish: name=' + userName + ', exchange=' + exchName + ', key=' + keyName + suffix)
+
+    def AllBut(self, allList, removeList):
+        tmpList = allList[:]
+        for item in removeList:
+            try:
+                tmpList.remove(item)
+            except Exception, e:
+                self.fail("ERROR in AllBut() \nallList =  %s \nremoveList =  %s \nerror =  %s " \
+                    % (allList, removeList, e))
+        return tmpList
+
    #=====================================
    # ACL general tests
    #=====================================
@@ -460,7 +522,8 @@ class ACLTests(TestBase010):
         Test cases for queue acl in allow mode
         """
         aclf = self.get_acl_file()
-        aclf.write('acl deny bob@QPID create queue name=q1 durable=true passive=true\n')
+        aclf.write('acl deny bob@QPID access queue name=q1\n')
+        aclf.write('acl deny bob@QPID create queue name=q1 durable=true\n')
         aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
         aclf.write('acl deny bob@QPID access queue name=q3\n')
         aclf.write('acl deny bob@QPID purge queue name=q3\n')
@@ -476,8 +539,15 @@ class ACLTests(TestBase010):
         session = self.get_session('bob','bob')
 
         try:
+            session.queue_declare(queue="q1", durable=True)
+            self.fail("ACL should deny queue create request with name=q1 durable=true");
+        except qpid.session.SessionException, e:
+            self.assertEqual(403,e.args[0].error_code)
+            session = self.get_session('bob','bob')
+
+        try:
             session.queue_declare(queue="q1", durable=True, passive=True)
-            self.fail("ACL should deny queue create request with name=q1 durable=true passive=true");
+            self.fail("ACL should deny queue passive declare request with name=q1 durable=true");
         except qpid.session.SessionException, e:
             self.assertEqual(403,e.args[0].error_code)
             session = self.get_session('bob','bob')
@@ -563,7 +633,8 @@ class ACLTests(TestBase010):
         Test cases for queue acl in deny mode
         """
         aclf = self.get_acl_file()
-        aclf.write('acl allow bob@QPID create queue name=q1 durable=true passive=true\n')
+        aclf.write('acl allow bob@QPID access queue name=q1\n')
+        aclf.write('acl allow bob@QPID create queue name=q1 durable=true\n')
         aclf.write('acl allow bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
         aclf.write('acl allow bob@QPID access queue name=q3\n')
         aclf.write('acl allow bob@QPID purge queue name=q3\n')
@@ -583,10 +654,16 @@ class ACLTests(TestBase010):
         session = self.get_session('bob','bob')
 
         try:
+            session.queue_declare(queue="q1", durable=True)
+        except qpid.session.SessionException, e:
+            if (403 == e.args[0].error_code):
+                self.fail("ACL should allow queue create request with name=q1 durable=true");
+
+        try:
             session.queue_declare(queue="q1", durable=True, passive=True)
         except qpid.session.SessionException, e:
             if (403 == e.args[0].error_code):
-                self.fail("ACL should allow queue create request with name=q1 durable=true passive=true");
+                self.fail("ACL should allow queue passive declare request with name=q1 durable=true passive=true");
 
         try:
             session.queue_declare(queue="q1", durable=False, passive=False)
@@ -736,7 +813,8 @@ class ACLTests(TestBase010):
         Test cases for exchange acl in allow mode
         """
         aclf = self.get_acl_file()
-        aclf.write('acl deny bob@QPID create exchange name=testEx durable=true passive=true\n')
+        aclf.write('acl deny bob@QPID access exchange name=testEx\n')
+        aclf.write('acl deny bob@QPID create exchange name=testEx durable=true\n')
         aclf.write('acl deny bob@QPID create exchange name=ex1 type=direct\n')
         aclf.write('acl deny bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
         aclf.write('acl deny bob@QPID bind exchange name=myEx queuename=q1 routingkey=rk1\n')
@@ -755,18 +833,25 @@ class ACLTests(TestBase010):
         session.exchange_declare(exchange='myEx', type='direct')
 
         try:
+            session.exchange_declare(exchange='testEx', durable=True)
+            self.fail("ACL should deny exchange create request with name=testEx durable=true");
+        except qpid.session.SessionException, e:
+            self.assertEqual(403,e.args[0].error_code)
+            session = self.get_session('bob','bob')
+
+        try:
             session.exchange_declare(exchange='testEx', durable=True, passive=True)
-            self.fail("ACL should deny exchange create request with name=testEx durable=true passive=true");
+            self.fail("ACL should deny passive exchange declare request with name=testEx durable=true passive=true");
         except qpid.session.SessionException, e:
             self.assertEqual(403,e.args[0].error_code)
             session = self.get_session('bob','bob')
 
         try:
-            session.exchange_declare(exchange='testEx', type='direct', durable=True, passive=False)
+            session.exchange_declare(exchange='testEx', type='direct', durable=False)
         except qpid.session.SessionException, e:
             print e
             if (403 == e.args[0].error_code):
-                self.fail("ACL should allow exchange create request for testEx with any parameter other than durable=true and passive=true");
+                self.fail("ACL should allow exchange create request for testEx with any parameter other than durable=true");
 
         try:
             session.exchange_declare(exchange='ex1', type='direct')
@@ -867,7 +952,7 @@ class ACLTests(TestBase010):
         Test cases for exchange acl in deny mode
         """
         aclf = self.get_acl_file()
-        aclf.write('acl allow bob@QPID create exchange name=myEx durable=true passive=false\n')
+        aclf.write('acl allow bob@QPID create exchange name=myEx durable=true\n')
         aclf.write('acl allow bob@QPID bind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
         aclf.write('acl allow bob@QPID unbind exchange name=amq.topic queuename=bar routingkey=foo.*\n')
         aclf.write('acl allow bob@QPID access exchange name=myEx queuename=q1 routingkey=rk1.*\n')
@@ -1278,6 +1363,188 @@ class ACLTests(TestBase010):
         admin.set_timestamp_cfg(ts) #should pass
 
 
+
+   #=====================================
+   # QMF Functional tests
+   #=====================================
+
+    def test_qmf_functional_tests(self):
+        """
+        Test using QMF method hooks into ACL logic
+        """
+        aclf = self.get_acl_file()
+        aclf.write('group admins moe@COMPANY.COM \\\n')
+        aclf.write('             larry@COMPANY.COM \\\n')
+        aclf.write('             curly@COMPANY.COM \\\n')
+        aclf.write('             shemp@COMPANY.COM\n')
+        aclf.write('group auditors aaudit@COMPANY.COM baudit@COMPANY.COM caudit@COMPANY.COM \\\n')
+        aclf.write('               daudit@COMPANY.COM eaduit@COMPANY.COM eaudit@COMPANY.COM\n')
+        aclf.write('group tatunghosts tatung01@COMPANY.COM \\\n')
+        aclf.write('      tatung02/x86.build.company.com@COMPANY.COM \\\n')
+        aclf.write('      tatung03/x86.build.company.com@COMPANY.COM \\\n')
+        aclf.write('      tatung04/x86.build.company.com@COMPANY.COM \n')
+        aclf.write('group publishusers publish@COMPANY.COM x-pubs@COMPANY.COM\n')
+        aclf.write('acl allow-log admins all all\n')
+        aclf.write('# begin hack alert: allow anonymous to access the lookup debug functions\n')
+        aclf.write('acl allow-log anonymous create  queue\n')
+        aclf.write('acl allow-log anonymous all     exchange name=qmf.*\n')
+        aclf.write('acl allow-log anonymous all     exchange name=amq.direct\n')
+        aclf.write('acl allow-log anonymous all     exchange name=qpid.management\n')
+        aclf.write('acl allow-log anonymous access  method   name=*\n')
+        aclf.write('# end hack alert\n')
+        aclf.write('acl allow-log auditors all exchange name=company.topic routingkey=private.audit.*\n')
+        aclf.write('acl allow-log tatunghosts  publish exchange name=company.topic  routingkey=tatung.*\n')
+        aclf.write('acl allow-log tatunghosts  publish exchange name=company.direct routingkey=tatung-service-queue\n')
+        aclf.write('acl allow-log publishusers create queue\n')
+        aclf.write('acl allow-log publishusers publish exchange name=qpid.management routingkey=broker\n')
+        aclf.write('acl allow-log publishusers publish exchange name=qmf.default.topic routingkey=*\n')
+        aclf.write('acl allow-log publishusers publish exchange name=qmf.default.direct routingkey=*\n')
+        aclf.write('acl allow-log all bind exchange name=company.topic  routingkey=tatung.*\n')
+        aclf.write('acl allow-log all bind exchange name=company.direct routingkey=tatung-service-queue\n')
+        aclf.write('acl allow-log all consume queue\n')
+        aclf.write('acl allow-log all access exchange\n')
+        aclf.write('acl allow-log all access queue\n')
+        aclf.write('acl allow-log all create queue name=tmp.* durable=false autodelete=true exclusive=true policytype=ring\n')
+        aclf.write('acl allow mrQ create queue queuemaxsizelowerlimit=100 queuemaxsizeupperlimit=200 queuemaxcountlowerlimit=300 queuemaxcountupperlimit=400\n')
+        aclf.write('acl deny-log all all\n')
+        aclf.close()
+
+        result = self.reload_acl()
+        if (result):
+            self.fail(result)
+
+        #
+        # define some group lists
+        #
+        g_admins = ['moe@COMPANY.COM', \
+                    'larry@COMPANY.COM', \
+                    'curly@COMPANY.COM', \
+                    'shemp@COMPANY.COM']
+
+        g_auditors = [ 'aaudit@COMPANY.COM','baudit@COMPANY.COM','caudit@COMPANY.COM', \
+                       'daudit@COMPANY.COM','eaduit@COMPANY.COM','eaudit@COMPANY.COM']
+
+        g_tatunghosts = ['tatung01@COMPANY.COM', \
+                         'tatung02/x86.build.company.com@COMPANY.COM', \
+                         'tatung03/x86.build.company.com@COMPANY.COM', \
+                         'tatung04/x86.build.company.com@COMPANY.COM']
+
+        g_publishusers = ['publish@COMPANY.COM', 'x-pubs@COMPANY.COM']
+
+        g_public = ['jpublic@COMPANY.COM', 'me@yahoo.com']
+
+        g_all = g_admins + g_auditors + g_tatunghosts + g_publishusers + g_public
+
+        action_all = ['consume','publish','create','access','bind','unbind','delete','purge','update']
+
+        #
+        # Run some tests verifying against users who are in and who are out of given groups.
+        #
+
+        for u in g_admins:
+            self.Lookup(u, "create", "queue", "anything", {"durable":"true"}, "allow-log")
+
+        uInTest = g_auditors + g_admins
+        uOutTest = self.AllBut(g_all, uInTest)
+
+        for u in uInTest:
+            self.LookupPublish(u, "company.topic", "private.audit.This", "allow-log")
+
+        for u in uInTest:
+            for a in action_all:
+                self.Lookup(u, a, "exchange", "company.topic", {"routingkey":"private.audit.This"}, "allow-log")
+
+        for u in uOutTest:
+            self.LookupPublish(u, "company.topic", "private.audit.This", "deny-log")
+            self.Lookup(u, "bind", "exchange", "company.topic", {"routingkey":"private.audit.This"}, "deny-log")
+
+        uInTest = g_admins + g_tatunghosts
+        uOutTest = self.AllBut(g_all, uInTest)
+
+        for u in uInTest:
+            self.LookupPublish(u, "company.topic",  "tatung.this2",         "allow-log")
+            self.LookupPublish(u, "company.direct", "tatung-service-queue", "allow-log")
+
+        for u in uOutTest:
+            self.LookupPublish(u, "company.topic",  "tatung.this2",         "deny-log")
+            self.LookupPublish(u, "company.direct", "tatung-service-queue", "deny-log")
+
+        for u in uOutTest:
+            for a in ["bind", "access"]:
+                self.Lookup(u, a, "exchange", "company.topic",  {"routingkey":"tatung.this2"},         "allow-log")
+                self.Lookup(u, a, "exchange", "company.direct", {"routingkey":"tatung-service-queue"}, "allow-log")
+
+        uInTest = g_admins + g_publishusers
+        uOutTest = self.AllBut(g_all, uInTest)
+
+        for u in uInTest:
+            self.LookupPublish(u, "qpid.management",    "broker",   "allow-log")
+            self.LookupPublish(u, "qmf.default.topic",  "this3",    "allow-log")
+            self.LookupPublish(u, "qmf.default.direct", "this4",    "allow-log")
+
+        for u in uOutTest:
+            self.LookupPublish(u, "qpid.management",    "broker",   "deny-log")
+            self.LookupPublish(u, "qmf.default.topic",  "this3",    "deny-log")
+            self.LookupPublish(u, "qmf.default.direct", "this4",    "deny-log")
+
+        for u in uOutTest:
+            for a in ["bind"]:
+                self.Lookup(u, a, "exchange", "qpid.management",    {"routingkey":"broker"}, "deny-log")
+                self.Lookup(u, a, "exchange", "qmf.default.topic",  {"routingkey":"this3"},  "deny-log")
+                self.Lookup(u, a, "exchange", "qmf.default.direct", {"routingkey":"this4"},  "deny-log")
+            for a in ["access"]:
+                self.Lookup(u, a, "exchange", "qpid.management",    {"routingkey":"broker"}, "allow-log")
+                self.Lookup(u, a, "exchange", "qmf.default.topic",  {"routingkey":"this3"},  "allow-log")
+                self.Lookup(u, a, "exchange", "qmf.default.direct", {"routingkey":"this4"},  "allow-log")
+
+        # Test against queue size limits
+
+        self.Lookup('mrQ', 'create', 'queue', 'abc', {"maxqueuesize":"150", "maxqueuecount":"350"}, "allow")
+        self.Lookup('mrQ', 'create', 'queue', 'def', {"maxqueuesize":"99",  "maxqueuecount":"350"}, "deny")
+        self.Lookup('mrQ', 'create', 'queue', 'uvw', {"maxqueuesize":"201", "maxqueuecount":"350"}, "deny")
+        self.Lookup('mrQ', 'create', 'queue', 'xyz', {"maxqueuesize":"150", "maxqueuecount":"299"}, "deny")
+        self.Lookup('mrQ', 'create', 'queue', '',    {"maxqueuesize":"150", "maxqueuecount":"401"}, "deny")
+        self.Lookup('mrQ', 'create', 'queue', '',    {"maxqueuesize":"0",   "maxqueuecount":"401"}, "deny")
+        self.Lookup('mrQ', 'create', 'queue', '',    {"maxqueuesize":"150", "maxqueuecount":"0"  }, "deny")
+
+
+   #=====================================
+   # Connection limits
+   #=====================================
+
+    def test_connection_limits(self):
+        """
+        Test ACL control connection limits
+        """
+        # By username should be able to connect twice per user
+        try:
+            sessiona1 = self.get_session_by_port('anonymous','anonymous', self.port_u())
+            sessiona2 = self.get_session_by_port('anonymous','anonymous', self.port_u())
+        except Exception, e:
+            self.fail("Could not create two connections per user: " + str(e))
+
+        # Third session should fail
+        try:
+            sessiona3 = self.get_session_by_port('anonymous','anonymous', self.port_u())
+            self.fail("Should not be able to create third connection")
+        except Exception, e:
+            result = None
+
+        # By IP address should be able to connect twice per client address
+        try:
+            sessionb1 = self.get_session_by_port('anonymous','anonymous', self.port_i())
+            sessionb2 = self.get_session_by_port('anonymous','anonymous', self.port_i())
+        except Exception, e:
+            self.fail("Could not create two connections per user: " + str(e))
+
+        # Third session should fail
+        try:
+            sessionb3 = self.get_session_by_port('anonymous','anonymous', self.port_i())
+            self.fail("Should not be able to create third connection")
+        except Exception, e:
+            result = None
+
+
 class BrokerAdmin:
     def __init__(self, broker, username=None, password=None):
         self.connection = qpid.messaging.Connection(broker)

Modified: qpid/branches/qpid-3767/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/cpp/src/tests/brokertest.py?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3767/qpid/cpp/src/tests/brokertest.py Wed May  2 13:09:18 2012
@@ -436,6 +436,35 @@ class Cluster:
     def __getitem__(self,index): return self._brokers[index]
     def __iter__(self): return self._brokers.__iter__()
 
+
+def browse(session, queue, timeout=0, transform=lambda m: m.content):
+    """Return a list with the contents of each message on queue."""
+    r = session.receiver("%s;{mode:browse}"%(queue))
+    r.capacity = 100
+    try:
+        contents = []
+        try:
+            while True: contents.append(transform(r.fetch(timeout=timeout)))
+        except messaging.Empty: pass
+    finally: r.close()
+    return contents
+
+def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
+    """Assert that the contents of messages on queue (as retrieved
+    using session and timeout) exactly match the strings in
+    expect_contents"""
+    actual_contents = browse(session, queue, timeout, transform=transform)
+    if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+    assert expect_contents == actual_contents, msg
+
+def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg="browse failed"):
+    """Wait up to timeout for contents of queue to match expect_contents"""
+    test = lambda: browse(session, queue, 0, transform=transform) == expect_contents
+    retry(test, timeout, delay)
+    actual_contents = browse(session, queue, 0, transform=transform)
+    if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+    assert expect_contents == actual_contents, msg
+
 class BrokerTest(TestCase):
     """
     Tracks processes started by test and kills at end of test.
@@ -501,30 +530,9 @@ class BrokerTest(TestCase):
         cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
         return cluster
 
-    def browse(self, session, queue, timeout=0, transform=lambda m: m.content):
-        """Return a list with the contents of each message on queue."""
-        r = session.receiver("%s;{mode:browse}"%(queue))
-        r.capacity = 100
-        try:
-            contents = []
-            try:
-                while True: contents.append(transform(r.fetch(timeout=timeout)))
-            except messaging.Empty: pass
-        finally: r.close()
-        return contents
-
-    def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
-        """Assert that the contents of messages on queue (as retrieved
-        using session and timeout) exactly match the strings in
-        expect_contents"""
-        actual_contents = self.browse(session, queue, timeout, transform=transform)
-        self.assertEqual(expect_contents, actual_contents)
-
-    def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
-        """Wait up to timeout for contents of queue to match expect_contents"""
-        test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
-        retry(test, timeout, delay)
-        self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
+    def browse(self, *args, **kwargs): browse(*args, **kwargs)
+    def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
+    def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
 
 def join(thread, timeout=10):
     thread.join(timeout)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message