qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1056378 - in /qpid/trunk/qpid/cpp/src: qpid/cluster/ qpid/management/ tests/
Date Fri, 07 Jan 2011 16:32:35 GMT
Author: aconway
Date: Fri Jan  7 16:32:34 2011
New Revision: 1056378

URL: http://svn.apache.org/viewvc?rev=1056378&view=rev
Log:
QPID-2982: Improved cluster/management logging and automated test for log consistency.

The cluster_tests.py test_management test is augmented to compare
broker logs after the test completes. Any inconsistency in the logs
causes the test to fail. This check is currently disabled as it is
failing due to known issues.

Added:
    qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1056378&r1=1056377&r2=1056378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jan  7 16:32:34 2011
@@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& 
                       "Error delivering frames",
                       poller),
     failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
-    updateDataExchange(new UpdateDataExchange(this)),
+    updateDataExchange(new UpdateDataExchange(*this)),
     quorum(boost::bind(&Cluster::leave, this)),
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
@@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const b
 
 // Called in connection thread to insert an updated shadow connection.
 void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
-    QPID_LOG(info, *this << " new shadow connection " << c->getId());
+    QPID_LOG(debug, *this << " new shadow connection " << c->getId());
     // Safe to use connections here because we're pre-catchup, stalled
     // and discarding, so deliveredFrame is not processing any
     // connection events.
@@ -749,7 +749,7 @@ struct AppendQueue {
 std::string Cluster::debugSnapshot() {
     assertClusterSafe();
     std::ostringstream msg;
-    msg << "queue snapshot at " << map.getFrameSeq() << ":";
+    msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:";
     AppendQueue append(msg);
     broker.getQueues().eachQueue(append);
     return msg.str();
@@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId
         checkUpdateIn(l);
     }
     else {
-        QPID_LOG(debug,*this << " unstall, ignore update " << updater
+        QPID_LOG(info, *this << " unstall, ignore update " << updater
                  << " to " << updatee);
         deliverEventQueue.start(); // Not involved in update.
     }
@@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {
         // NB: don't updateMgmtMembership() here as we are not in the deliver
         // thread. It will be updated on delivery of the "ready" we just mcast.
         broker.setClusterUpdatee(false);
+        discarding = false;     // OK to set, we're stalled for update.
+        QPID_LOG(notice, *this << " update complete, starting catch-up.");
+        QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
         if (mAgent) {
             // Update management agent now, after all update activity is complete.
             updateDataExchange->updateManagementAgent(mAgent);
             mAgent->suppress(false); // Enable management output.
             mAgent->clusterUpdate();
         }
-        discarding = false;     // OK to set, we're stalled for update.
-        QPID_LOG(notice, *this << " update complete, starting catch-up.");
-        QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
         enableClusterSafe();    // Enable cluster-safe assertions
         deliverEventQueue.start();
     }
@@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& u
         mgmtObject->set_clusterID(clusterId.str());
         mgmtObject->set_memberID(stream.str());
     }
-    QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
+    QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
 }
 
 void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1056378&r1=1056377&r2=1056378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Jan  7 16:32:34 2011
@@ -78,6 +78,10 @@ using namespace framing;
 namespace arg=client::arg;
 using client::SessionBase_0_10Access;
 
+std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
+    return o << "cluster(" << c.updaterId << " UPDATER)";
+}
+
 struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler 
 {
     boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -142,7 +146,7 @@ void UpdateClient::run() {
 }
 
 void UpdateClient::update() {
-    QPID_LOG(debug, updaterId << " updating state to " << updateeId
+    QPID_LOG(debug, *this << " updating state to " << updateeId
              << " at " << updateeUrl);
     Broker& b = updaterBroker;
 
@@ -177,14 +181,14 @@ void UpdateClient::update() {
     // NOTE: connection will be closed from the other end, don't close
     // it here as that causes a race.
     
-    // FIXME aconway 2010-03-15: This sleep avoids the race condition
+    // TODO aconway 2010-03-15: This sleep avoids the race condition
     // described in https://bugzilla.redhat.com/show_bug.cgi?id=568831.
     // It allows the connection to fully close before destroying the
     // Connection object. Remove when the bug is fixed.
     //
     sys::usleep(10*1000);
 
-    QPID_LOG(debug,  updaterId << " update completed to " << updateeId
+    QPID_LOG(debug,  *this << " update completed to " << updateeId
              << " at " << updateeUrl << ": " << membership);
 }
 
@@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetup
     management::ManagementAgent* agent = updaterBroker.getManagementAgent();
     if (!agent) return;
 
-    QPID_LOG(debug, updaterId << " updating management setup-state.");
+    QPID_LOG(debug, *this << " updating management setup-state.");
     std::string vendor, product, instance;
     agent->getName(vendor, product, instance);
     ClusterConnectionProxy(session).managementSetupState(
@@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent
     if (!agent) return;
     string data;
 
-    QPID_LOG(debug, updaterId << " updating management schemas. ")
+    QPID_LOG(debug, *this << " updating management schemas. ")
     agent->exportSchemas(data);
     session.messageTransfer(
         arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
         arg::destination=UpdateDataExchange::EXCHANGE_NAME);
 
-    QPID_LOG(debug, updaterId << " updating management agents. ")
+    QPID_LOG(debug, *this << " updating management agents. ")
     agent->exportAgents(data);
     session.messageTransfer(
         arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
         arg::destination=UpdateDataExchange::EXCHANGE_NAME);
 
-    QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+    QPID_LOG(debug, *this << " updating management deleted objects. ")
     typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
     DeletedObjectList deleted;
     agent->exportDeletedObjects(deleted);
@@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent
 }
 
 void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
-    QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
+    QPID_LOG(debug, *this << " updating exchange " << ex->getName());
     ClusterConnectionProxy(session).exchange(encode(*ex));
 }
 
@@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::A
 }
 
 void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
-    QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
+    QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
     updateQueue(shadowSession, q);
 }
 
 void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
     if (!q->hasExclusiveOwner()) {
-        QPID_LOG(debug, updaterId << " updating queue " << q->getName());
+        QPID_LOG(debug, *this << " updating queue " << q->getName());
         updateQueue(session, q);
     }//else queue will be updated as part of session state of owning session
 }
@@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(cons
     SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
     uint16_t channel =  ci->getParent().getSession().getChannel();
     ClusterConnectionProxy(shadowConnection).outputTask(channel,  ci->getName());
-    QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+    QPID_LOG(debug, *this << " updating output task " << ci->getName()
              << " channel=" << channel);
 }
 
 void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
-    QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+    QPID_LOG(debug, *this << " updating connection " << *updateConnection);
     assert(updateConnection->getBrokerConnection());
     broker::Connection& bc = *updateConnection->getBrokerConnection();
     
@@ -398,14 +402,14 @@ void UpdateClient::updateConnection(cons
         updateConnection->getOutput().getSendMax()
     );
     shadowConnection.close();
-    QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
+    QPID_LOG(debug, *this << " updated connection " << *updateConnection);
 }
 
 void UpdateClient::updateSession(broker::SessionHandler& sh) {
     broker::SessionState* ss = sh.getSession();
     if (!ss) return;            // no session.
 
-    QPID_LOG(debug, updaterId << " updating session " << ss->getId());
+    QPID_LOG(debug, *this << " updating session " << ss->getId());
 
     // Create a client session to update session state. 
     boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
@@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker:
 
     // Re-create session state on remote connection.
 
-    QPID_LOG(debug, updaterId << " updating exclusive queues.");
+    QPID_LOG(debug, *this << " updating exclusive queues.");
     ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
 
-    QPID_LOG(debug, updaterId << " updating consumers.");
+    QPID_LOG(debug, *this << " updating consumers.");
     ss->getSemanticState().eachConsumer(
         boost::bind(&UpdateClient::updateConsumer, this, _1));
 
-    QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
+    QPID_LOG(debug, *this << " updating unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
     std::for_each(drs.begin(), drs.end(),
                   boost::bind(&UpdateClient::updateUnacked, this, _1));
@@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker:
     if (inProgress) {
         inProgress->getFrames().map(simpl->out);
     }
-    QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
+    QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId());
 }
 
 void UpdateClient::updateConsumer(
     const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
 {
-    QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+    QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
              << shadowSession.getId());
 
     using namespace message;
@@ -485,7 +489,7 @@ void UpdateClient::updateConsumer(
     );
     consumerNumbering.add(ci);
 
-    QPID_LOG(debug, updaterId << " updated consumer " << ci->getName()
+    QPID_LOG(debug, *this << " updated consumer " << ci->getName()
              << " on " << shadowSession.getId());
 }
     
@@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpC
 };
     
 void UpdateClient::updateTxState(broker::SemanticState& s) {
-    QPID_LOG(debug, updaterId << " updating TX transaction state.");
+    QPID_LOG(debug, *this << " updating TX transaction state.");
     ClusterConnectionProxy proxy(shadowSession);
     proxy.accumulatedAck(s.getAccumulatedAck());
     broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=1056378&r1=1056377&r2=1056378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Fri Jan  7 16:32:34 2011
@@ -30,7 +30,7 @@
 #include "qpid/broker/SemanticState.h"
 #include "qpid/sys/Runnable.h"
 #include <boost/shared_ptr.hpp>
-
+#include <iosfwd>
 
 namespace qpid {
 
@@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnabl
     boost::function<void()> done;
     boost::function<void(const std::exception& e)> failed;
     client::ConnectionSettings connectionSettings;
+
+  friend std::ostream& operator<<(std::ostream&, const UpdateClient&);
 };
 
+
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_UPDATECLIENT_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp?rev=1056378&r1=1056377&r2=1056378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp Fri Jan  7 16:32:34 2011
@@ -19,6 +19,7 @@
  *
  */
 #include "UpdateDataExchange.h"
+#include "Cluster.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/Message.h"
@@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MA
 const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
 const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
 
-UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) :
-    Exchange(EXCHANGE_NAME, parent)
+std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
+    return o << "cluster(" << c.clusterId << " UPDATER)";
+}
+
+UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
+    Exchange(EXCHANGE_NAME, &cluster),
+    clusterId(cluster.getId())
 {}
 
 void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagemen
 
     framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
     agent->importAgents(buf1);
-    QPID_LOG(debug, " Updated management agents.");
+    QPID_LOG(debug, *this << " updated management agents.");
 
     framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
     agent->importSchemas(buf2);
-    QPID_LOG(debug, " Updated management schemas");
+    QPID_LOG(debug, *this << " updated management schemas.");
 
     using amqp_0_10::ListCodec;
     using types::Variant;
@@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagemen
                               new management::ManagementAgent::DeletedObject(*i)));
     }
     agent->importDeletedObjects(objects);
-    QPID_LOG(debug, " Updated management deleted objects.");
+    QPID_LOG(debug, *this << " updated management deleted objects.");
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h?rev=1056378&r1=1056377&r2=1056378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h Fri Jan  7 16:32:34 2011
@@ -23,6 +23,8 @@
  */
 
 #include "qpid/broker/Exchange.h"
+#include "types.h"
+#include <iosfwd>
 
 namespace qpid {
 
@@ -31,6 +33,7 @@ class ManagementAgent;
 }
 
 namespace cluster {
+class Cluster;
 
 /**
  * An exchange used to send data that is too large for a control
@@ -45,7 +48,7 @@ class UpdateDataExchange : public broker
     static const std::string MANAGEMENT_SCHEMAS_KEY;
     static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
 
-    UpdateDataExchange(management::Manageable* parent);
+    UpdateDataExchange(Cluster& parent);
 
     void route(broker::Deliverable& msg, const std::string& routingKey,
                const framing::FieldTable* args);
@@ -71,10 +74,11 @@ class UpdateDataExchange : public broker
     void updateManagementAgent(management::ManagementAgent* agent);
 
   private:
-
+    MemberId clusterId;
     std::string managementAgents;
     std::string managementSchemas;
     std::string managementDeletedObjects;
+  friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
 };
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1056378&r1=1056377&r2=1056378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Jan  7 16:32:34 2011
@@ -18,7 +18,12 @@
  * under the License.
  *
  */
- 
+
+
+// NOTE on use of log levels: The criterion for using trace vs. debug
+// is to use trace for log messages that are generated for each
+// unbatched stats/props notification and debug for everything else.
+
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/management/ManagementObject.h"
 #include "qpid/broker/DeliverableMessage.h"
@@ -89,7 +94,7 @@ static Variant::Map mapEncodeSchemaId(co
 
 ManagementAgent::RemoteAgent::~RemoteAgent ()
 {
-    QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
+    QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
     if (mgmtObject != 0) {
         mgmtObject->resourceDestroy();
         agent.deleteObjectNowLH(mgmtObject->getObjectId());
@@ -169,7 +174,7 @@ void ManagementAgent::configure(const st
                 uuid.generate();
                 QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid);
             } else
-                QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
+                QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid);
 
             // if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
             bootSequence++;
@@ -308,7 +313,7 @@ ObjectId ManagementAgent::addObject(Mana
         }
         newManagementObjects[objId] = object;
     }
-
+    QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());
     return objId;
 }
 
@@ -330,7 +335,6 @@ ObjectId ManagementAgent::addObject(Mana
     }
 
     object->setObjectId(objId);
-
     {
         sys::Mutex::ScopedLock lock(addLock);
         ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
@@ -340,7 +344,7 @@ ObjectId ManagementAgent::addObject(Mana
         }
         newManagementObjects[objId] = object;
     }
-
+    QPID_LOG(debug, "Management object added: " << objId.getV2Key());
     return objId;
 }
 
@@ -370,7 +374,7 @@ void ManagementAgent::raiseEvent(const M
         outBuffer.reset();
         sendBufferLH(outBuffer, outLen, mExchange,
                    "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
-        QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
+        QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
     }
 
     if (qmf2Support) {
@@ -408,9 +412,8 @@ void ManagementAgent::raiseEvent(const M
         list_.push_back(map_);
         ListCodec::encode(list_, content);
         sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
-        QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
+        QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
     }
-
 }
 
 ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -467,7 +470,7 @@ void ManagementAgent::clientAdded (const
         outLen = outBuffer.getPosition();
         outBuffer.reset();
         sendBufferLH(outBuffer, outLen, dExchange, rkeys.front());
-        QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << rkeys.front());
+        QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front());
         rkeys.pop_front();
     }
 }
@@ -476,8 +479,10 @@ void ManagementAgent::clusterUpdate() {
     // Called on all cluster members when a new member joins a cluster.
     // Set clientWasAdded so that on the next periodicProcessing we will do 
     // a full update on all cluster members.
+    sys::Mutex::ScopedLock l(userLock);
+    moveNewObjectsLH();         // to be consistent with updater/updatee.
     clientWasAdded = true;
-    QPID_LOG(debug, "cluster update " << debugSnapshot());
+    QPID_LOG(debug, "Cluster member joined, " << debugSnapshot());
 }
 
 void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -509,7 +514,7 @@ void ManagementAgent::sendBufferLH(Buffe
                                    string   routingKey)
 {
     if (suppressed) {
-        QPID_LOG(trace, "Suppressing management message to " << routingKey);
+        QPID_LOG(debug, "Suppressing management message to " << routingKey);
         return;
     }
     if (exchange.get() == 0) return;
@@ -564,7 +569,7 @@ void ManagementAgent::sendBufferLH(const
     Variant::Map::const_iterator i;
 
     if (suppressed) {
-        QPID_LOG(trace, "Suppressing management message to " << routingKey);
+        QPID_LOG(debug, "Suppressing management message to " << routingKey);
         return;
     }
     if (exchange.get() == 0) return;
@@ -637,7 +642,7 @@ void ManagementAgent::periodicProcessing
 {
 #define BUFSIZE   65536
 #define HEADROOM  4096
-    QPID_LOG(trace, "Management agent periodic processing");
+    QPID_LOG(debug, "Management agent periodic processing");
     sys::Mutex::ScopedLock lock (userLock);
     char                msgChars[BUFSIZE];
     uint32_t            contentSize;
@@ -776,17 +781,26 @@ void ManagementAgent::periodicProcessing
                 send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
 
                 if (send_props && qmf1Support) {
+                    size_t pos = msgBuffer.getPosition();
                     encodeHeader(msgBuffer, 'c');
                     sBuf.clear();
                     object->writeProperties(sBuf);
                     msgBuffer.putRawData(sBuf);
+                    QPID_LOG(trace, "Changed V1 properties "
+                             << object->getObjectId().getV2Key()
+                             << " len=" << msgBuffer.getPosition()-pos);
                 }
 
                 if (send_stats && qmf1Support) {
+                    size_t pos = msgBuffer.getPosition();
                     encodeHeader(msgBuffer, 'i');
                     sBuf.clear();
                     object->writeStatistics(sBuf);
                     msgBuffer.putRawData(sBuf);
+                    QPID_LOG(trace, "Changed V1 statistics "
+                             << object->getObjectId().getV2Key()
+                             << " len=" << msgBuffer.getPosition()-pos);
+
                 }
 
                 if ((send_stats || send_props) && qmf2Support) {
@@ -805,6 +819,10 @@ void ManagementAgent::periodicProcessing
                     map_["_values"] = values;
                     list_.push_back(map_);
                     v2Objs++;
+                    QPID_LOG(trace, "Changed V2"
+                             << (send_stats? " statistics":"")
+                             << (send_props? " properties":"")
+                             << " map=" << map_);
                 }
 
                 if (send_props) pcount++;
@@ -826,7 +844,10 @@ void ManagementAgent::periodicProcessing
                     key << "console.obj.1.0." << packageName << "." << className;
                     msgBuffer.reset();
                     sendBufferLH(msgBuffer, contentSize, mExchange, key.str());   // UNLOCKS USERLOCK
-                    QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << contentSize);
+                    QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
+                             << " props=" << pcount
+                             << " stats=" << scount
+                             << " len=" << contentSize);
                 }
             }
 
@@ -849,7 +870,10 @@ void ManagementAgent::periodicProcessing
                     headers["qmf.agent"] = name_address;
 
                     sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());  // UNLOCKS USERLOCK
-                    QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << content.length());
+                    QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
+                             << " props=" << pcount
+                             << " stats=" << scount
+                             << " len=" << content.length());
                 }
             }
         }
@@ -877,15 +901,19 @@ void ManagementAgent::periodicProcessing
 
             for (DeletedObjectList::iterator lIter = mIter->second.begin();
                  lIter != mIter->second.end(); lIter++) {
-
+                std::string oid = (*lIter)->objectId;
                 if (!(*lIter)->encodedV1Config.empty()) {
                     encodeHeader(msgBuffer, 'c');
                     msgBuffer.putRawData((*lIter)->encodedV1Config);
+                    QPID_LOG(trace, "Deleting V1 properties " << oid
+                             << " len=" << (*lIter)->encodedV1Config.size());
                     v1Objs++;
                 }
                 if (!(*lIter)->encodedV1Inst.empty()) {
                     encodeHeader(msgBuffer, 'i');
                     msgBuffer.putRawData((*lIter)->encodedV1Inst);
+                    QPID_LOG(trace, "Deleting V1 statistics " << oid
+                             << " len=" <<  (*lIter)->encodedV1Inst.size());
                     v1Objs++;
                 }
                 if (v1Objs && msgBuffer.available() < HEADROOM) {
@@ -895,10 +923,12 @@ void ManagementAgent::periodicProcessing
                     key << "console.obj.1.0." << packageName << "." << className;
                     msgBuffer.reset();
                     sendBufferLH(msgBuffer, contentSize, mExchange, key.str());   // UNLOCKS USERLOCK
-                    QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
+                    QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
+                             << key.str() << " len=" << contentSize);
                 }
 
                 if (!(*lIter)->encodedV2.empty()) {
+                    QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
                     list_.push_back((*lIter)->encodedV2);
                     if (++v2Objs >= maxV2ReplyObjs) {
                         v2Objs = 0;
@@ -922,7 +952,7 @@ void ManagementAgent::periodicProcessing
                             headers["qmf.agent"] = name_address;
 
                             sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());  // UNLOCKS USERLOCK
-                            QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+                            QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
                         }
                     }
                 }
@@ -936,7 +966,7 @@ void ManagementAgent::periodicProcessing
                 key << "console.obj.1.0." << packageName << "." << className;
                 msgBuffer.reset();
                 sendBufferLH(msgBuffer, contentSize, mExchange, key.str());   // UNLOCKS USERLOCK
-                QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
+                QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
             }
 
             if (!list_.empty()) {
@@ -959,7 +989,7 @@ void ManagementAgent::periodicProcessing
                     headers["qmf.agent"] = name_address;
 
                     sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());  // UNLOCKS USERLOCK
-                    QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+                    QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
                 }
             }
         }  // end map
@@ -984,7 +1014,7 @@ void ManagementAgent::periodicProcessing
         msgBuffer.reset ();
         routingKey = "console.heartbeat.1.0";
         sendBufferLH(msgBuffer, contentSize, mExchange, routingKey);
-        QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
+        QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
     }
 
     if (qmf2Support) {
@@ -1013,7 +1043,7 @@ void ManagementAgent::periodicProcessing
         // time to prevent stale heartbeats from getting to the consoles.
         sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
 
-        QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+        QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
     }
     QPID_LOG(debug, "periodic update " << debugSnapshot());
 }
@@ -1073,7 +1103,7 @@ void ManagementAgent::deleteObjectNowLH(
         uint32_t contentSize = msgBuffer.getPosition();
         msgBuffer.reset();
         sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str());
-        QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v1key.str());
+        QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
     }
 
     if (qmf2Support) {
@@ -1086,7 +1116,7 @@ void ManagementAgent::deleteObjectNowLH(
         string content;
         ListCodec::encode(list_, content);
         sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str());
-        QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v2key.str());
+        QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str());
     }
 }
 
@@ -1102,7 +1132,7 @@ void ManagementAgent::sendCommandComplet
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
+    QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
              replyToKey << " seq=" << sequence);
 }
 
@@ -1127,7 +1157,7 @@ void ManagementAgent::sendExceptionLH(co
     MapCodec::encode(map, content);
     sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
 
-    QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
+    QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
 }
 
 bool ManagementAgent::dispatchCommand (Deliverable&      deliverable,
@@ -1221,7 +1251,7 @@ void ManagementAgent::handleMethodReques
     inBuffer.getShortString(methodName);
     inBuffer.getRawData(inArgs, inBuffer.available());
 
-    QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+    QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
              methodName << " replyTo=" << replyToKey);
 
     encodeHeader(outBuffer, 'm', sequence);
@@ -1232,7 +1262,7 @@ void ManagementAgent::handleMethodReques
         outLen = MA_BUFFER_SIZE - outBuffer.available();
         outBuffer.reset();
         sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
+        QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
         return;
     }
 
@@ -1243,7 +1273,7 @@ void ManagementAgent::handleMethodReques
         outLen = MA_BUFFER_SIZE - outBuffer.available();
         outBuffer.reset();
         sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+        QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
         return;
     }
 
@@ -1259,7 +1289,7 @@ void ManagementAgent::handleMethodReques
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
             sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+            QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
             return;
         }
     }
@@ -1291,7 +1321,7 @@ void ManagementAgent::handleMethodReques
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
     sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
+    QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
 }
 
 
@@ -1374,7 +1404,7 @@ void ManagementAgent::handleMethodReques
 
     // invoke the method
 
-    QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
+    QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
              << ":" << iter->second->getClassName() << " method=" <<
              methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs);
 
@@ -1402,7 +1432,7 @@ void ManagementAgent::handleMethodReques
 
     MapCodec::encode(outMap, content);
     sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
-    QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
+    QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
 }
 
 
@@ -1411,7 +1441,7 @@ void ManagementAgent::handleBrokerReques
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey);
+    QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
 
     encodeHeader (outBuffer, 'b', sequence);
     uuid.encode  (outBuffer);
@@ -1419,12 +1449,12 @@ void ManagementAgent::handleBrokerReques
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey);
+    QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
 }
 
 void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence)
 {
-    QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey);
+    QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
@@ -1440,7 +1470,7 @@ void ManagementAgent::handlePackageQuery
     if (outLen) {
         outBuffer.reset ();
         sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(trace, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
+        QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
     }
 
     sendCommandCompleteLH(replyToKey, sequence);
@@ -1452,7 +1482,7 @@ void ManagementAgent::handlePackageIndLH
 
     inBuffer.getShortString(packageName);
 
-    QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+    QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
 
     findOrAddPackageLH(packageName);
 }
@@ -1463,7 +1493,7 @@ void ManagementAgent::handleClassQueryLH
 
     inBuffer.getShortString(packageName);
 
-    QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+    QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
 
     PackageMap::iterator pIter = packages.find(packageName);
     if (pIter != packages.end())
@@ -1489,7 +1519,7 @@ void ManagementAgent::handleClassQueryLH
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
             sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-            QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
+            QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
                      "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
             classes.pop_front();
         }
@@ -1508,7 +1538,7 @@ void ManagementAgent::handleClassIndLH (
     inBuffer.getShortString(key.name);
     inBuffer.getBin128(key.hash);
 
-    QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+    QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
              "), replyTo=" << replyToKey);
 
     PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -1525,7 +1555,7 @@ void ManagementAgent::handleClassIndLH (
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
         sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+        QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
                  "), to=" << replyToKey << " seq=" << sequence);
 
         if (cIter != pIter->second.end())
@@ -1557,7 +1587,7 @@ void ManagementAgent::handleSchemaReques
     inBuffer.getShortString (packageName);
     key.decode(inBuffer);
 
-    QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+    QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
              "), replyTo=" << replyToKey << " seq=" << sequence);
 
     PackageMap::iterator pIter = packages.find(packageName);
@@ -1575,7 +1605,7 @@ void ManagementAgent::handleSchemaReques
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
                 sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+                QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
             }
             else
                 sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
@@ -1598,7 +1628,7 @@ void ManagementAgent::handleSchemaRespon
     key.decode(inBuffer);
     inBuffer.restore();
 
-    QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+    QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
 
     PackageMap::iterator pIter = packages.find(packageName);
     if (pIter != packages.end()) {
@@ -1622,7 +1652,7 @@ void ManagementAgent::handleSchemaRespon
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
                 sendBufferLH(outBuffer, outLen, mExchange, "schema.class");
-                QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
+                QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
                          " to=schema.class");
             }
         }
@@ -1702,7 +1732,7 @@ void ManagementAgent::handleAttachReques
     requestedBrokerBank = inBuffer.getLong();
     requestedAgentBank  = inBuffer.getLong();
 
-    QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
+    QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
              " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);
 
     assignedBank = assignBankLH(requestedAgentBank);
@@ -1722,7 +1752,7 @@ void ManagementAgent::handleAttachReques
     addObject (agent->mgmtObject, 0);
     remoteAgents[connectionRef] = agent;
 
-    QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
+    QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
 
     // Send an Attach Response
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1734,7 +1764,7 @@ void ManagementAgent::handleAttachReques
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
+    QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
              " to=" << replyToKey << " seq=" << sequence);
 }
 
@@ -1747,7 +1777,7 @@ void ManagementAgent::handleGetQueryLH(B
 
     ft.decode(inBuffer);
 
-    QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
+    QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
 
     value = ft.get("_class");
     if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -1776,7 +1806,7 @@ void ManagementAgent::handleGetQueryLH(B
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
                 outBuffer.reset ();
                 sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+                QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
             }
         }
         sendCommandCompleteLH(replyToKey, sequence);
@@ -1821,7 +1851,7 @@ void ManagementAgent::handleGetQueryLH(B
                         outLen = MA_BUFFER_SIZE - outBuffer.available ();
                         outBuffer.reset ();
                         sendBufferLH(outBuffer, outLen, dExchange, replyToKey);   // drops lock
-                        QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+                        QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
                         continue;  // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
                     }
                     encodeHeader(outBuffer, 'g', sequence);
@@ -1837,7 +1867,7 @@ void ManagementAgent::handleGetQueryLH(B
     if (outLen) {
         outBuffer.reset ();
         sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+        QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
     }
 
     sendCommandCompleteLH(replyToKey, sequence);
@@ -1853,7 +1883,7 @@ void ManagementAgent::handleGetQueryLH(c
     Variant::Map headers;
 
     MapCodec::decode(body, inMap);
-    QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
+    QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
 
     headers["method"] = "response";
     headers["qmf.opcode"] = "_query_response";
@@ -1935,7 +1965,7 @@ void ManagementAgent::handleGetQueryLH(c
 
             ListCodec::encode(list_, content);
             sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-            QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+            QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo);
             return;
         }
     } else {
@@ -1989,12 +2019,12 @@ void ManagementAgent::handleGetQueryLH(c
             ListCodec::encode(_list.front().asList(), content);
             sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
             _list.pop_front();
-            QPID_LOG(trace, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
+            QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
         }
         headers.erase("partial");
         ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
         sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-        QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
+        QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
         return;
     }
 
@@ -2002,14 +2032,14 @@ void ManagementAgent::handleGetQueryLH(c
     string content;
     ListCodec::encode(Variant::List(), content);
     sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-    QPID_LOG(trace, "SENT QueryResponse (empty) to=" << replyTo);
+    QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo);
 }
 
 
 void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
                                             const string& cid)
 {
-    QPID_LOG(trace, "RCVD AgentLocateRequest");
+    QPID_LOG(debug, "RCVD AgentLocateRequest");
 
     Variant::Map map;
     Variant::Map headers;
@@ -2028,7 +2058,7 @@ void ManagementAgent::handleLocateReques
     sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
     clientWasAdded = true;
 
-    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+    QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo);
 }
 
 
@@ -2171,7 +2201,7 @@ bool ManagementAgent::authorizeAgentMess
                 sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
             }
 
-            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+            QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
         }
 
         return false;
@@ -2269,7 +2299,7 @@ ManagementAgent::PackageMap::iterator Ma
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     sendBufferLH(outBuffer, outLen, mExchange, "schema.package");
-    QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package");
+    QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package");
 
     return result.first;
 }
@@ -2639,12 +2669,13 @@ void ManagementAgent::importAgents(qpid:
 }
 
 namespace {
-bool isNotDeleted(const ManagementObjectMap::value_type& value) {
-    return !value.second->isDeleted();
+bool isDeleted(const ManagementObjectMap::value_type& value) {
+    return value.second->isDeleted();
 }
 
-size_t countNotDeleted(const ManagementObjectMap& map) {
-    return std::count_if(map.begin(), map.end(), isNotDeleted);
+void summarizeMap(std::ostream& o, const char* name, const ManagementObjectMap& map) {
+    size_t deleted = std::count_if(map.begin(), map.end(), isDeleted);
+    o << map.size() << " " << name << " (" << deleted << " deleted), ";
 }
 
 void dumpMap(std::ostream& o, const ManagementObjectMap& map) {
@@ -2657,13 +2688,18 @@ void dumpMap(std::ostream& o, const Mana
 
 string ManagementAgent::debugSnapshot() {
     ostringstream msg;
-    msg << " management snapshot:";
-    for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
-         i != remoteAgents.end(); ++i)
-        msg << " " << i->second->routingKey;
-    msg << " packages: " << packages.size();
-    msg << " objects: " << countNotDeleted(managementObjects);
-    msg << " new objects: " << countNotDeleted(newManagementObjects);
+    msg << " management snapshot: ";
+    if (!remoteAgents.empty()) {
+        msg <<  remoteAgents.size() << " agents(";
+        for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
+             i != remoteAgents.end(); ++i)
+            msg << " " << i->second->routingKey;
+        msg << "), ";
+    }
+    msg  << packages.size() << " packages, ";
+    summarizeMap(msg, "objects", managementObjects);
+    summarizeMap(msg, "new objects ", newManagementObjects);
+    msg << pendingDeletedObjs.size() << " pending deletes" ;
     return msg.str();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=1056378&r1=1056377&r2=1056378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Fri Jan  7 16:32:34 2011
@@ -262,6 +262,7 @@ void ManagementObject::setUpdateTime()
 
 void ManagementObject::resourceDestroy()
 {
+    QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key());
     destroyTime = sys::Duration(sys::EPOCH, sys::now());
     deleted     = true;
 }

Added: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py?rev=1056378&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py Fri Jan  7 16:32:34 2011
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Functions for comparing broker log files, used by cluster_tests.py.
+
+import os, os.path, re, glob
+from itertools import izip
+
+def split_log(log):
+    """Split a broker log at checkpoints where a member joins.
+    Return the list of checkpoints discovered, in order of appearance."""
+    checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:")
+    outfile = None
+    checkpoints = []
+    for l in open(log):
+        match = checkpoint_re.search(l)
+        if match:
+            checkpoint = match.groups()[0]
+            checkpoints.append(checkpoint)
+            if outfile: outfile.close()
+            outfile = open("%s.%s"%(log, checkpoint), 'w')
+
+        if outfile: outfile.write(l)
+    if outfile: outfile.close()
+    return checkpoints
+
+def filter_log(log):
+    """Filter the contents of a log file to remove data that is expected
+    to differ between brokers in a cluster. Filtered log contents between
+    the same checkpoints should match across the cluster."""
+    out = open("%s.filter"%(log), 'w')
+    for l in open(log):
+        # Lines to skip entirely
+        skip = "|".join([
+            'local connection',         # Only on local broker
+            'UPDATER|UPDATEE|OFFER',    # Ignore update process
+            'stall for update|unstall, ignore update|cancelled offer .* unstall',
+            'caught up',
+            'active for links|Passivating links|Activating links',
+            'info Connection.* connected to', # UpdateClient connection
+            'warning Broker closed connection: 200, OK',
+            'task late',
+            'task overran'
+            ])
+        if re.compile(skip).search(l): continue
+
+        # Regex to match a UUID
+        uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w'
+
+        # Regular expression substitutions to remove expected differences
+        for pattern,subst in [
+            (r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d', ''), # Remove timestamp
+            (r'cluster\([0-9.: ]*', 'cluster('), # Remove cluster node id
+            (r' local\)| shadow\)', ')'), # Remove local/shadow indication
+            (r'CATCHUP', 'READY'), # Treat catchup as equivalent to ready.
+            # System UUID
+            (r'(org.apache.qpid.broker:system[:(])%s(\)?)'%(uuid), r'\1UUID\2'),
+
+            # FIXME aconway 2010-12-20: substitutions to mask known problems
+            #(r' len=\d+', ' len=NN'),   # buffer lengths
+            #(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name
+            #(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs
+            ]: l = re.sub(pattern,subst,l)
+        out.write(l)
+    out.close()
+
+def verify_logs(logs):
+    """Compare log files from cluster brokers, verify that they correspond correctly. NOTE: the logs argument is currently unused; every *.log file in the current directory is checked."""
+    for l in glob.glob("*.log"): filter_log(l)
+    checkpoints = set()
+    for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l)))
+    errors=[]
+    for c in checkpoints:
+        fragments = glob.glob("*.filter.%s"%(c))
+        fragments.sort(reverse=True, key=os.path.getsize)
+        while len(fragments) >= 2:
+            a = fragments.pop(0)
+            b = fragments[0]
+            for ab in izip(open(a), open(b)):
+                if ab[0] != ab[1]:
+                    errors.append("\n    %s %s"%(a, b))
+                    break
+    if errors:
+        raise Exception("Files differ in %s"%(os.getcwd())+"".join(errors))
+
+# Can be run as a script.
+if __name__ == "__main__":
+    verify_logs(glob.glob("*.log"))

Propchange: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1056378&r1=1056377&r2=1056378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Jan  7 16:32:34 2011
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess
+import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
 from qpid import datatypes, messaging
 from qpid.brokertest import *
 from qpid.harness import Skipped
@@ -35,7 +35,7 @@ log = getLogger("qpid.cluster_tests")
 # a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
 # and EXPECT_EXIT_FAIL in some of the tests below.
 
-# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error
+# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error
 # should give non-0 exit status.
 
 # Import scripts as modules
@@ -299,7 +299,10 @@ class LongTests(BrokerTest):
         for i in range(i, len(cluster)): cluster[i].kill()
 
     def test_management(self, args=[]):
-        """Stress test: Run management clients and other clients concurrently."""
+        """
+        Stress test: Run management clients and other clients concurrently
+        while killing and restarting brokers.
+        """
 
         class ClientLoop(StoppableThread):
             """Run a client executable in a loop."""
@@ -352,9 +355,9 @@ class LongTests(BrokerTest):
                 finally: self.lock.release()
                 StoppableThread.stop(self)
 
-        # def test_management
-        args += ["--mgmt-pub-interval", 1] # Publish management information every second.
-        # FIXME aconway 2010-12-15: extra debugging
+        # body of test_management()
+
+        args += ["--mgmt-pub-interval", 1]
         args += ["--log-enable=trace+:management"]
         # Use store if present.
         if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
@@ -403,6 +406,10 @@ class LongTests(BrokerTest):
             start_mclients(cluster[alive])
         for c in chain(mclients, *clients):
             c.stop()
+        # Verify that logs are consistent
+        # FIXME aconway 2010-12-21: this is currently expected to fail due to
+        # known bugs, see https://issues.apache.org/jira/browse/QPID-2982
+        self.assertRaises(Exception, cluster_test_logs.verify_logs, glob.glob("*.log"))
 
     def test_management_qmf2(self):
         self.test_management(args=["--mgmt-qmf2=yes"])
@@ -506,7 +513,7 @@ class StoreTests(BrokerTest):
         self.assertEqual(a.wait(), 0)
         self.assertEqual(c.wait(), 0)
         # Mix members from both shutdown events, they should fail
-        # FIXME aconway 2010-03-11: can't predict the exit status of these
+        # TODO aconway 2010-03-11: can't predict the exit status of these
         # as it depends on the order of delivery of initial-status messages.
         # See comment at top of this file.
         a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message