qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r1056112 - in /qpid/trunk/qpid: cpp/src/qpid/agent/ManagementAgentImpl.cpp cpp/src/qpid/management/ManagementAgent.cpp extras/qmf/src/py/qmf/console.py
Date Thu, 06 Jan 2011 23:04:35 GMT
Author: tross
Date: Thu Jan  6 23:04:34 2011
New Revision: 1056112

URL: http://svn.apache.org/viewvc?rev=1056112&view=rev
Log:
Changes to QMFv2 formats to make the agents and consoles consistent:
  1) Events in _data_indication messages are carried as lists of events (consistent with object
data)
  2) Built-in attributes for agents (in _heartbeaet_indication) all start with '_' to
     differentiate them from user-defined attributes.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1056112&r1=1056111&r2=1056112&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Jan  6 23:04:34 2011
@@ -339,8 +339,10 @@ void ManagementAgentImpl::raiseEvent(con
     headers["qmf.content"] = "_event";
     headers["qmf.agent"] = name_address;
 
-    MapCodec::encode(map_, content);
-    connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str());
+    Variant::List list;
+    list.push_back(map_);
+    ListCodec::encode(list, content);
+    connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list");
 }
 
 uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -1165,10 +1167,10 @@ void ManagementAgentImpl::periodicProces
 void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map)
 {
     map["_values"] = attrMap;
-    map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now()));
-    map["_values"].asMap()["heartbeat_interval"] = interval;
-    map["_values"].asMap()["epoch"] = bootSequence;
-    map["_values"].asMap()["schema_timestamp"] = uint64_t(schemaTimestamp);
+    map["_values"].asMap()["_timestamp"] = uint64_t(Duration(EPOCH, now()));
+    map["_values"].asMap()["_heartbeat_interval"] = interval;
+    map["_values"].asMap()["_epoch"] = bootSequence;
+    map["_values"].asMap()["_schema_updated"] = uint64_t(schemaTimestamp);
 }
 
 void ManagementAgentImpl::ConnectionThread::run()

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1056112&r1=1056111&r2=1056112&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Jan  6 23:04:34 2011
@@ -404,8 +404,10 @@ void ManagementAgent::raiseEvent(const M
 
 
         string content;
-        MapCodec::encode(map_, content);
-        sendBufferLH(content, "", headers, "amqp/map", v2Topic, key.str());
+        Variant::List list_;
+        list_.push_back(map_);
+        ListCodec::encode(list_, content);
+        sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
         QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() <<
"." << event.getEventName());
     }
 
@@ -1000,9 +1002,9 @@ void ManagementAgent::periodicProcessing
         headers["qmf.agent"] = name_address;
 
         map["_values"] = attrMap;
-        map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
-        map["_values"].asMap()["heartbeat_interval"] = interval;
-        map["_values"].asMap()["epoch"] = bootSequence;
+        map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
+        map["_values"].asMap()["_heartbeat_interval"] = interval;
+        map["_values"].asMap()["_epoch"] = bootSequence;
 
         string content;
         MapCodec::encode(map, content);
@@ -2017,9 +2019,9 @@ void ManagementAgent::handleLocateReques
     headers["qmf.agent"] = name_address;
 
     map["_values"] = attrMap;
-    map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
-    map["_values"].asMap()["heartbeat_interval"] = interval;
-    map["_values"].asMap()["epoch"] = bootSequence;
+    map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
+    map["_values"].asMap()["_heartbeat_interval"] = interval;
+    map["_values"].asMap()["_epoch"] = bootSequence;
 
     string content;
     MapCodec::encode(map, content);

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=1056112&r1=1056111&r2=1056112&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Thu Jan  6 23:04:34 2011
@@ -1206,11 +1206,11 @@ class Session:
     try:
       agentName = ah["qmf.agent"]
       values = content["_values"]
-      timestamp = values["timestamp"]
-      interval = values["heartbeat_interval"]
+      timestamp = values["_timestamp"]
+      interval = values["_heartbeat_interval"]
       epoch = 0
-      if 'epoch' in values:
-        epoch = values['epoch']
+      if '_epoch' in values:
+        epoch = values['_epoch']
     except Exception,e:
       return
 
@@ -1239,7 +1239,7 @@ class Session:
       agent.touch()
     if self.rcvHeartbeats and self.console and agent:
       self._heartbeatCallback(agent, timestamp)
-    agent.update_schema_timestamp(values.get("schema_timestamp", 0))
+    agent.update_schema_timestamp(values.get("_schema_updated", 0))
 
 
   def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
@@ -2573,7 +2573,7 @@ class Broker(Thread):
       for agent in to_notify:
         self.session._delAgentCallback(agent)
 
-  def _v2SendAgentLocate(self, predicate={}):
+  def _v2SendAgentLocate(self, predicate=[]):
     """
     Broadcast an agent-locate request to cause all agents in the domain to tell us who they
are.
     """
@@ -2581,13 +2581,13 @@ class Broker(Thread):
     dp = self.amqpSession.delivery_properties()
     dp.routing_key = "console.request.agent_locate"
     mp = self.amqpSession.message_properties()
-    mp.content_type = "amqp/map"
+    mp.content_type = "amqp/list"
     mp.user_id = self.authUser
     mp.app_id = "qmf2"
     mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue)
     mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
     sendCodec = Codec()
-    sendCodec.write_map(predicate)
+    sendCodec.write_list(predicate)
     msg = Message(dp, mp, sendCodec.encoded)
     self._send(msg, "qmf.default.topic")
 
@@ -2855,7 +2855,7 @@ class Broker(Thread):
             content = None
         else:
           content = None
-  
+
         if content != None:
           ##
           ## Directly handle agent heartbeats and agent locate responses as these are broker-scope
(they are
@@ -3368,6 +3368,9 @@ class Agent:
     Handle a QMFv2 data indication from the agent.  Note: called from context
     of the Broker thread.
     """
+    if content.__class__ != list:
+      return
+
     if mp.correlation_id:
       try:
         self.lock.acquire()
@@ -3384,8 +3387,6 @@ class Agent:
     if "qmf.content" in ah:
       kind = ah["qmf.content"]
     if kind == "_data":
-      if content.__class__ != list:
-        return
       for omap in content:
         context.addV2QueryResult(omap)
       context.processV2Data()
@@ -3393,14 +3394,15 @@ class Agent:
         context.signal()
 
     elif kind == "_event":
-      event = Event(self, v2Map=content)
-      if event.classKey is None or event.schema:
-        # schema optional or present
-        context.doEvent(event)
-      else:
-        # schema not optional and not present
-        if context.addPendingEvent(event):
-          self._v2SendSchemaRequest(event.classKey)
+      for omap in content:
+        event = Event(self, v2Map=omap)
+        if event.classKey is None or event.schema:
+          # schema optional or present
+          context.doEvent(event)
+        else:
+          # schema not optional and not present
+          if context.addPendingEvent(event):
+            self._v2SendSchemaRequest(event.classKey)
 
     elif kind == "_schema_id":
       for sid in content:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message