Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 95887 invoked from network); 4 Nov 2008 23:02:49 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Nov 2008 23:02:49 -0000 Received: (qmail 93014 invoked by uid 500); 4 Nov 2008 23:02:55 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 92976 invoked by uid 500); 4 Nov 2008 23:02:55 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 92967 invoked by uid 99); 4 Nov 2008 23:02:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Nov 2008 15:02:55 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Nov 2008 23:01:46 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4D38F2388879; Tue, 4 Nov 2008 15:01:58 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r711458 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/agent/ManagementAgentImpl.cpp cpp/src/qpid/management/ManagementBroker.cpp python/qpid/qmfconsole.py Date: Tue, 04 Nov 2008 23:01:57 -0000 To: qpid-commits@incubator.apache.org From: tross@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081104230158.4D38F2388879@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tross Date: Tue Nov 4 15:01:57 2008 New Revision: 711458 URL: http://svn.apache.org/viewvc?rev=711458&view=rev Log: Added bank numbers to the routing key of a QMF heartbeat message. This is used by the console to identify which agent sent the indication. Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=711458&r1=711457&r2=711458&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue Nov 4 15:01:57 2008 @@ -22,7 +22,6 @@ #include "qpid/management/ManagementObject.h" #include "ManagementAgentImpl.h" #include -#include #include #include #include @@ -597,11 +596,12 @@ Buffer msgBuffer(msgChars, BUFSIZE); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); + stringstream key; + key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; contentSize = BUFSIZE - msgBuffer.available(); msgBuffer.reset(); - routingKey = "console.heartbeat"; - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); } moveNewObjectsLH(); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=711458&r1=711457&r2=711458&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Tue Nov 4 15:01:57 2008 @@ -351,7 +351,7 @@ contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "console.heartbeat"; + routingKey = "console.heartbeat.1.0"; sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=711458&r1=711457&r2=711458&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Tue Nov 4 15:01:57 2008 @@ -489,10 +489,21 @@ if self.console: self.console.methodResponse(broker, seq, result) - def _handleHeartbeatInd(self, broker, codec, seq): + def _handleHeartbeatInd(self, broker, codec, seq, msg): + brokerBank = 1 + agentBank = 0 + dp = msg.get("delivery_properties") + if dp: + key = dp["routing_key"] + keyElements = key.split(".") + if len(keyElements) == 4: + brokerBank = int(keyElements[2]) + agentBank = int(keyElements[3]) + + agent = broker.getAgent(brokerBank, agentBank) timestamp = codec.read_uint64() if self.console != None: - self.console.heartbeat(None, timestamp) + self.console.heartbeat(agent, timestamp) def _handleEventInd(self, broker, codec, seq): if self.console != None: @@ -1086,7 +1097,7 @@ self.authUser = authUser self.authPass = authPass self.agents = {} - self.agents[0] = Agent(self, "1.0", "BrokerAgent") + self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False @@ -1112,6 +1123,12 @@ def getBrokerBank(self): return 1 + def getAgent(self, brokerBank, agentBank): + bankKey = "%d.%d" % (brokerBank, agentBank) + if bankKey in self.agents: + return self.agents[bankKey] + return None + def getSessionId(self): """ Get the identifier of the AMQP session to the broker """ return self.amqpSessionId @@ -1287,7 +1304,7 @@ elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) - elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq) + elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) elif opcode == 'e': self.session._handleEventInd (self, codec, seq) elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)