qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r711458 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/agent/ManagementAgentImpl.cpp cpp/src/qpid/management/ManagementBroker.cpp python/qpid/qmfconsole.py
Date Tue, 04 Nov 2008 23:01:57 GMT
Author: tross
Date: Tue Nov  4 15:01:57 2008
New Revision: 711458

URL: http://svn.apache.org/viewvc?rev=711458&view=rev
Log:

Added bank numbers to the routing key of a QMF heartbeat message.
This is used by the console to identify which agent sent the indication.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=711458&r1=711457&r2=711458&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue Nov  4 15:01:57 2008
@@ -22,7 +22,6 @@
 #include "qpid/management/ManagementObject.h"
 #include "ManagementAgentImpl.h"
 #include <list>
-#include <unistd.h>
 #include <string.h>
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -597,11 +596,12 @@
         Buffer msgBuffer(msgChars, BUFSIZE);
         encodeHeader(msgBuffer, 'h');
         msgBuffer.putLongLong(uint64_t(Duration(now())));
+        stringstream key;
+        key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
 
         contentSize = BUFSIZE - msgBuffer.available();
         msgBuffer.reset();
-        routingKey = "console.heartbeat";
-        connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
+        connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
     }
 
     moveNewObjectsLH();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=711458&r1=711457&r2=711458&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Tue Nov  4 15:01:57 2008
@@ -351,7 +351,7 @@
 
         contentSize = BUFSIZE - msgBuffer.available ();
         msgBuffer.reset ();
-        routingKey = "console.heartbeat";
+        routingKey = "console.heartbeat.1.0";
         sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
     }
 

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=711458&r1=711457&r2=711458&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Tue Nov  4 15:01:57 2008
@@ -489,10 +489,21 @@
       if self.console:
         self.console.methodResponse(broker, seq, result)
 
-  def _handleHeartbeatInd(self, broker, codec, seq):
+  def _handleHeartbeatInd(self, broker, codec, seq, msg):
+    brokerBank = 1
+    agentBank = 0
+    dp = msg.get("delivery_properties")
+    if dp:
+      key = dp["routing_key"]
+      keyElements = key.split(".")
+      if len(keyElements) == 4:
+        brokerBank = int(keyElements[2])
+        agentBank = int(keyElements[3])
+
+    agent = broker.getAgent(brokerBank, agentBank)
     timestamp = codec.read_uint64()
     if self.console != None:
-      self.console.heartbeat(None, timestamp)
+      self.console.heartbeat(agent, timestamp)
 
   def _handleEventInd(self, broker, codec, seq):
     if self.console != None:
@@ -1086,7 +1097,7 @@
     self.authUser = authUser
     self.authPass = authPass
     self.agents   = {}
-    self.agents[0] = Agent(self, "1.0", "BrokerAgent")
+    self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent")
     self.topicBound = False
     self.cv = Condition()
     self.syncInFlight = False
@@ -1112,6 +1123,12 @@
   def getBrokerBank(self):
     return 1
 
+  def getAgent(self, brokerBank, agentBank):
+    bankKey = "%d.%d" % (brokerBank, agentBank)
+    if bankKey in self.agents:
+      return self.agents[bankKey]
+    return None
+
   def getSessionId(self):
     """ Get the identifier of the AMQP session to the broker """
     return self.amqpSessionId
@@ -1287,7 +1304,7 @@
       elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
       elif opcode == 'q': self.session._handleClassInd        (self, codec, seq)
       elif opcode == 'm': self.session._handleMethodResp      (self, codec, seq)
-      elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq)
+      elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq, msg)
       elif opcode == 'e': self.session._handleEventInd        (self, codec, seq)
       elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq)
       elif opcode == 'c': self.session._handleContentInd      (self, codec, seq, prop=True)



Mime
View raw message