Message-ID: <4F67844B.9030802@blueyonder.co.uk>
Date: Mon, 19 Mar 2012 19:08:59 +0000
From: Fraser Adams
To: users@qpid.apache.org
Subject: Re: QMF Tutorial
References: <4F636F19.6010004@blueyonder.co.uk> <4F63873F.6030004@blueyonder.co.uk>
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="------------030505020903030505070601"

--------------030505020903030505070601
Content-Type: text/plain; charset=ISO-8859-1; format=flowed
Content-Transfer-Encoding: 7bit

Hi Davide.
Re
"Today I came across this:
http://ci.apache.org/projects/qpid/apis/0.10/cpp/html/a00461.html
and I wonder whether this API is currently actively developed or is
deprecated. Examples on the Qpid distribution for Windows use this API
and they seem to be doing exactly what I am looking for."

That's the one I was referring to when I said "there's also a C++ API
for QMF2 but unfortunately that bears no relation whatsoever to the API
specified off the wiki???". The wiki API is the one specified at
https://cwiki.apache.org/qpid/qmfv2-project-page.html

I'm afraid I've got no idea whether the C++ API is in active development
or not, which is partly why I was saying earlier that QMF is rather
disjoint. I'm pretty sure that the C++ API does actually use the QMF2
*protocol* under the hood, but clearly it bears little relation to the
QMF2 API I've linked above.

My Java QMF2 API implementation is a pretty much complete implementation
of the QMF2 API, including things like query subscriptions. The C++ API
only supports some features (the subscribe call is just a stub) and I
*think*, though I'm not massively familiar with that API, that it only
supports an asynchronous model, so the getObjects() type calls that you
may be familiar with from the QMF1 API aren't present.

With respect to your core problem space, are your "satellite components"
messaging applications or are they brokers themselves? I'm guessing the
former.

You said earlier "but I guess I can achieve the same result with the
QMF, getting notification on connect and disconnect events." That should
be possible with QMF, but in basic terms it will only tell you the IP of
the host, so you'd need to know a priori what *should* be present to
give an indication of whether it exists or not.

In case it helps you in any way, I've attached another python QMF1
application I put together; it's a connection audit mechanism. The idea
is that I wanted to validate consumers against a whitelist without
having to resort to full authentication. The QMF application checks
connections and, for each connection, checks the queue associated with a
subscription on that connection against a whitelist. If the queue isn't
in the whitelist it gets the binding info etc. and logs the full details.

You might be able to do something similar, for example checking
connections to see if a certain satellite exists. If each satellite is
listening on its own subscription then you could interrogate the
binding, which might be a cleaner way of checking whether it exists than
checking that its IP is that of a valid satellite.

Another approach might be to take a really QMF-like approach and have
each satellite act as a QMF Agent. You could then use the QMF agent
discovery to identify which Agents were registered and thus which ones
you were able to send messages to.

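As a rough illustration of the subscription-based check suggested above,
here is a minimal sketch. It uses plain dictionaries as hypothetical
stand-ins for the QMF connection, session, queue and subscription
objects; the function name find_satellites, the queue names and the
addresses are all made up for illustration. A real console would fill
these maps from objectProps() updates, as the attached script does. The
sketch walks subscription -> session -> connection and reports which of
the expected satellite queues currently have a consumer.

```python
# Hypothetical stand-ins for QMF management objects, keyed by object id.
# In a real QMF console these would be populated from property updates.
connections = {"c1": {"address": "10.1.1.7:41000"},
               "c2": {"address": "10.1.1.8:41002"}}
sessions = {"s1": {"connectionRef": "c1"},
            "s2": {"connectionRef": "c2"}}
queues = {"q1": {"name": "satellite.alpha"},
          "q2": {"name": "satellite.beta"}}
subscriptions = {"sub1": {"sessionRef": "s1", "queueRef": "q1"},
                 "sub2": {"sessionRef": "s2", "queueRef": "q2"}}


def find_satellites(expected_queues, subscriptions, sessions, connections, queues):
    """Map each expected queue name to the address consuming from it, or None."""
    present = dict((name, None) for name in expected_queues)
    for sub in subscriptions.values():
        queue = queues.get(sub["queueRef"])
        session = sessions.get(sub["sessionRef"])
        if queue is None or session is None:
            continue  # reference not (yet) known, skip this subscription
        connection = connections.get(session["connectionRef"])
        if connection is not None and queue["name"] in present:
            present[queue["name"]] = connection["address"]
    return present


status = find_satellites(["satellite.alpha", "satellite.gamma"],
                         subscriptions, sessions, connections, queues)
for name in sorted(status):
    print("%s -> %s" % (name, status[name] or "not connected"))
```

The "hub" still has to know a priori which queue names to expect; a
satellite with no entry in the result simply has no subscription on its
queue at the moment.
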
Hope this is some help,
best regards,
Frase

On 19/03/12 16:00, Davide Anastasia wrote:
> Hi Fraser,
> Thanks a lot for your help!
>
> Let me try to clarify what I mean by "family". Suppose you have a
> component that acts like a router and a series of satellite components
> (which do pretty much the same job). For some particular reason, I want
> to send a certain message to a certain satellite of this router, but I
> want to know whether it exists or not.
>
> Today I came across this:
> http://ci.apache.org/projects/qpid/apis/0.10/cpp/html/a00461.html
> and I wonder whether this API is currently actively developed or is
> deprecated. Examples on the Qpid distribution for Windows use this API
> and they seem to be doing exactly what I am looking for.
>
> Thanks everybody,
> Davide
>
> -----Original Message-----
> From: Fraser Adams [mailto:fraser.adams@blueyonder.co.uk]
> Sent: 16 March 2012 18:33
> To: users@qpid.apache.org
> Subject: Re: QMF Tutorial
>
> Hi Davide,
> I'm afraid that I don't quite understand what you're trying to achieve -
> you might have to spell it out :-(
>
> One thing I forgot to add: if you look in the source tree in
> qpid/specs/management-schema.xml that will tell you all of the
> properties available for all of the broker management objects.
>
> The connection logger program that I pointed you at just might be able
> to give you some pointers. What that does is to list information about
> all connections, but it's quite a bit more tricksy than that. I was
> interested in identifying whether connections were producers or
> consumers. That's actually quite hard....
>
> What I did was to get the connection, session and subscription objects
> and for each connection work out how many sessions and for each session
> work out how many subscriptions - it's hard because sessions are
> associated with connections not the other way around and subscriptions
> are associated with sessions. It's logical because of the multiplicity:
> if the associations were done the *convenient* way there would be a
> 1..* multiplicity but the way it has been implemented in QMF it's a 1..1
> multiplicity.
>
> Anyway as my connections know the associated subscriptions I can find
> the queue reference and from that find bindings - the "-q" option in my
> program displays the queues, bindings and exchanges associated with a
> connection, kind of like a fancy qpid-config -b queues.
>
> At a guess by "log connections of components belonging to a certain
> "family" " you might be meaning log connections associated with a
> specific subset of bindings - if so you should be able to cannibalise my
> connection logger quite easily to do this.
>
> Hope this makes sense and points you in the right direction.
> Best regards,
> Frase
>
> On 16/03/12 17:58, Davide Anastasia wrote:
>> Hi Fraser,
>> Thanks for your quick reply!
>> My primary language is C++ and I use the messaging API and QMF2.
>> However I am not scared to prototype something in Python.
>>
>> What I am trying to do is to log connections of components belonging to a
>> certain "family", let's say all the components that can serve a
>> certain type of request, so that a central "hub" knows how many
>> potential consumers are available at any time. I thought it was the
>> perfect scenario for a customized heartbeat, but I guess I can achieve
>> the same result with the QMF, getting notification on connect and
>> disconnect events.
>> Am I going in the right direction? Does this make sense to you?
>>
>> Best,
>> Davide
>>
>> -----Original Message-----
>> From: Fraser Adams [mailto:fraser.adams@blueyonder.co.uk]
>> Sent: 16 March 2012 16:49
>> To: users@qpid.apache.org
>> Subject: Re: QMF Tutorial
>>
>> Hi Davide
>> What language are you interested in?
>>
>> I'm afraid QMF is still a bit disjoint; you might be aware that there
>> are two variants:
>>
>> QMF1 which uses a compact binary protocol and the QMF1 API - that's
>> what the bundled qpid tools such as qpid-config, qpid-stat etc. use.
>> That has good python support, I *think* the C++ API works but the Java
>> implementation is very broken.
>>
>> QMF2 uses a protocol based on Map messages, that is *relatively* easy
>> to drive just by using Map messages because it's primarily in the form
>> of key/value pairs. The QMF2 API has a specification hanging off the
>> qpid wiki QMF pages. There's, I believe, a prototype python
>> implementation and I've written a Java implementation that covers the
>> full API; there's also a C++ API for QMF2 but unfortunately that bears
>> no relation whatsoever to the API specified off the wiki???
>>
>> As I say it's rather disjoint :-(
>>
>> The main QMF page is here:
>> https://cwiki.apache.org/qpid/qpid-management-framework.html
>>
>> A tutorial for the QMF1 python stuff is here:
>> https://cwiki.apache.org/qpid/qmf-python-console-tutorial.html
>> I recently wrote a connection logger application using this old API;
>> it's linked to this Jira and might give you some ideas
>> https://issues.apache.org/jira/browse/QPID-3869
>>
>> The QMF2 documentation lives off here:
>> https://cwiki.apache.org/qpid/qmfv2-project-page.html
>>
>> The Java QMF2 stuff I did is linked off this Jira
>> https://issues.apache.org/jira/browse/QPID-3675
>>
>>
>> HTH
>> Frase
>>
>>
>>
>> On 16/03/12 16:15, Davide Anastasia wrote:
>>> Hi All,
>>>
>>> Can anybody suggest a good tutorial on QMF?
>>>
>>> Best,
>>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org For
>> additional commands, e-mail: users-help@qpid.apache.org
>>

--------------030505020903030505070601
Content-Type: text/plain; name="connection-audit"
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment; filename="connection-audit"

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# This application audits connections to one or more Qpid message brokers.
# Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated

import os
import os.path
import optparse
from optparse import IndentedHelpFormatter
import sys
import socket
from time import time, strftime, gmtime, localtime, sleep
from qmf.console import Console, Session
from xml.etree.ElementTree import ElementTree

class AuditConsole(Console):

    # AuditConsole Constructor
    def __init__(self, whitelistFile):
        self.whitelistFile = whitelistFile
        self.whitelistLastModified = None

        # Declare dictionary to map queue name to queue objects
        self.queueNameMap = {}

        # Declare dictionary to map queue name to subscription objects
        self.queueNameSubscriptionMap = {}

        # Declare dictionaries to map object-ids to objects
        self.queueMap = {}
        self.exchangeMap = {}
        self.bindingMap = {}
        self.connectionMap = {}
        self.sessionMap = {}

        # Declare the sets to be used as the whitelists
        self.exchangeWhitelist = set()
        self.queueWhitelist = set()

    # Overridden method from Console
    # Handle property updates. This method registers or deletes the relevant objects in the relevant Map
    # These objects are used by the validateSubscription() in order to find out information about the
    # bindings and connections associated with a given queue in order to work out if it needs to be tested
    # against the whitelist. For example we don't validate queues bound to the management exchanges as
    # this would generate spurious results as most of these queues are temporary queues.
    def objectProps(self, broker, record):
        classKey = record.getClassKey()
        oid = record.getObjectId()
        deleted = record.getTimestamps()[2] > 0

        # Note that queue objects are keyed by both queue name and Object ID
        if classKey.getClassName() == "queue":
            queueName = record.name
            if oid not in self.queueMap and not deleted:
                self.queueMap[oid] = record

            if queueName not in self.queueNameMap and not deleted:
                self.queueNameMap[queueName] = record

            # If the queue object has been deleted remove it from the map
            if deleted:
                if queueName in self.queueNameMap:
                    self.queueNameMap.pop(queueName)
                if oid in self.queueMap:
                    self.queueMap.pop(oid)

        if classKey.getClassName() == "exchange":
            if oid not in self.exchangeMap and not deleted:
                self.exchangeMap[oid] = record

            # If the exchange object has been deleted remove it from the map
            if deleted and oid in self.exchangeMap:
                self.exchangeMap.pop(oid)

        if classKey.getClassName() == "binding":
            if oid not in self.bindingMap and not deleted:
                self.bindingMap[oid] = record

            # If the binding object has been deleted remove it from the map
            if deleted and oid in self.bindingMap:
                self.bindingMap.pop(oid)

        if classKey.getClassName() == "connection":
            if oid not in self.connectionMap and not deleted:
                # Re-read the whitelist when a new connection occurs
                self.readWhitelist()
                self.connectionMap[oid] = record

            # If the connection object has been deleted remove it from the map
            if deleted and oid in self.connectionMap:
                self.connectionMap.pop(oid)

        if classKey.getClassName() == "session":
            if oid not in self.sessionMap and not deleted:
                self.sessionMap[oid] = record

            # If the session object has been deleted remove it from the map
            if deleted and oid in self.sessionMap:
                self.sessionMap.pop(oid)

        # Note that subscription objects are keyed by queue name NOT by Object ID
        if classKey.getClassName() == "subscription":
            # Find the queue associated with the subscription by looking its reference up in queueMap
            queue = self.queueMap.get(record.queueRef)
            if queue != None:
                queueName = queue.name
                if queueName not in self.queueNameSubscriptionMap and not deleted:
                    self.queueNameSubscriptionMap[queueName] = record
                    self.validateSubscription(queueName)

                # If the subscription object has been deleted remove it from the map
                if deleted and queueName in self.queueNameSubscriptionMap:
                    self.queueNameSubscriptionMap.pop(queueName)

    # This method first checks if the whitelist file exists, if not it clears the sets used as whitelists
    # so that no whitelisting is applied. If the whitelist file does exist it is parsed by an ElementTree
    # we look for all exchange and queue elements and populate the respective whitelist sets with their
    # contents. Note that we check the whitelist file update time to avoid reading it if it hasn't been changed
    def readWhitelist(self):
        if os.path.exists(self.whitelistFile):
            mtime = os.path.getmtime(self.whitelistFile)
            if mtime != self.whitelistLastModified:
                self.whitelistLastModified = mtime
                self.exchangeWhitelist.clear()
                self.queueWhitelist.clear()

                tree = ElementTree()
                try:
                    tree.parse(self.whitelistFile)
                    exchanges = tree.findall("exchangeWhitelist/exchange")
                    for i in exchanges:
                        exchange = i.text
                        if exchange == None:
                            self.exchangeWhitelist.add("")
                        else:
                            self.exchangeWhitelist.add(exchange)

                    queues = tree.findall("queueWhitelist/queue")
                    for i in queues:
                        queue = i.text
                        self.queueWhitelist.add(queue)
                except Exception, e: # Failed to parse correctly
                    print strftime("%c", localtime(time())), "WARN connection-audit:readWhitelist failed: %s - %s" % (e.__class__.__name__, e)
        else: # If whitelist file doesn't exist log a warning and clear the whitelists
            print strftime("%c", localtime(time())), "WARN connection-audit:readWhitelist %s doesn't exist" % self.whitelistFile
            sys.stdout.flush()
            self.exchangeWhitelist.clear()
            self.queueWhitelist.clear()

    # Given a queue name this method looks up the associated subscription, session and connection
    # objects and returns the connection
    def findConnection(self, queueName):
        subscription = self.queueNameSubscriptionMap.get(queueName)
        if subscription != None:
            session = self.sessionMap.get(subscription.sessionRef)
            if session != None:
                connection = self.connectionMap.get(session.connectionRef)
                if connection != None:
                    return connection

    # This method finds bindings that match a given queue name, if one is found the associated exchange is
    # recovered and then a call to validateQueue() is made
    def validateSubscription(self, queueName):
        for binding in self.bindingMap.values():
            queue = self.queueMap.get(binding.queueRef)
            if queue != None and queue.name == queueName:
                exchange = self.exchangeMap.get(binding.exchangeRef)
                if exchange != None:
                    self.validateQueue(queueName, exchange, binding)

    # This method validates the specified queue by comparing the queue name and associated exchange name
    # against the whitelist sets. If the exchange name or the queue name is in the whitelist then we do
    # nothing, but if not we generate an alert message.
    def validateQueue(self, queueName, exchange, binding):
        exchangeName = exchange.name

        if exchangeName in self.exchangeWhitelist:
            return

        if queueName in self.queueWhitelist:
            return

        connection = self.findConnection(queueName)
        address = connection.address
        ctime = connection.getTimestamps()[1]/1000000000

        if exchangeName == "":
            exchangeName = "''"

        if binding.arguments:
            print strftime("%c", localtime(time())), "ALERT connection-audit:validateQueue validation failed for queue: %s with binding[%s] => %s %s from address: %s with connection timestamp" % (queueName, binding.bindingKey, exchangeName, binding.arguments, address), strftime("%c", localtime(ctime))
        else:
            print strftime("%c", localtime(time())), "ALERT connection-audit:validateQueue validation failed for queue: %s with binding[%s] => %s from address: %s with connection timestamp" % (queueName, binding.bindingKey, exchangeName, address), strftime("%c", localtime(ctime))
        sys.stdout.flush()

    # Overridden method from Console
#    def event(self, broker, event):
#        if event.classKey.getClassName() == "subscribe":

    # Overridden method from Console
    def brokerConnected(self, broker):
        # Log brokerConnected event
        print strftime("%c", localtime(time())), "NOTIC connection-audit:brokerConnected broker=%s" % broker.getUrl()
        sys.stdout.flush()

    # Overridden method from Console
    def brokerConnectionFailed(self, broker):
        # Log brokerConnectionFailed event
        print strftime("%c", localtime(time())), "NOTIC connection-audit:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc))
        sys.stdout.flush()

    # Overridden method from Console
    def brokerDisconnected(self, broker):
        # Clear state as it'll get resent when the broker reconnects
        self.queueNameMap.clear()
        self.queueNameSubscriptionMap.clear()
        self.queueMap.clear()
        self.exchangeMap.clear()
        self.bindingMap.clear()
        self.connectionMap.clear()
        self.sessionMap.clear()

        # Log brokerDisconnected event
        print strftime("%c", localtime(time())), "NOTIC connection-audit:brokerDisconnected broker=%s" % broker.getUrl()
        sys.stdout.flush()

class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage strings
    """

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        if description:
            return description + "\n"
        else:
            return ""

_usage = "%prog [options] [broker-addr]..."

_description = \
"""
Audits connections to one or more Qpid message brokers.
Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated

If no broker-addr is supplied, %prog connects to 'localhost:5672'.

[broker-addr] syntax:

          [username/password@] hostname
          ip-address [:]

Examples:

$ %prog localhost:5672
$ %prog 10.1.1.7:10000
$ %prog guest/guest@broker-host:10000
"""

def main(argv=None):
    p = optparse.OptionParser(usage=_usage, description=_description, formatter=JHelpFormatter())
    p.add_option("--sasl-mechanism", action="store", type="string", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
    p.add_option("--whitelist", action="store", type="string", default="./whitelist.xml", metavar="", help="The fully qualified name of the whitelist XML file, default is ./whitelist.xml")

    options, arguments = p.parse_args(args=argv)
    if len(arguments) == 0:
        arguments.append("localhost")

    console = AuditConsole(options.whitelist)
    session = Session(console, rcvObjects=True, rcvEvents=True, userBindings=True, manageConnections=True)

    # Register to receive updates for broker:queue objects.
    session.bindClass("org.apache.qpid.broker", "queue")

    # Register to receive updates for broker:exchange objects.
    session.bindClass("org.apache.qpid.broker", "exchange")

    # Register to receive updates for broker:binding objects.
    session.bindClass("org.apache.qpid.broker", "binding")

    # Register to receive updates for broker:connection objects.
    session.bindClass("org.apache.qpid.broker", "connection")

    # Register to receive updates for broker:session objects.
    session.bindClass("org.apache.qpid.broker", "session")

    # Register to receive updates for broker:subscription objects.
    session.bindClass("org.apache.qpid.broker", "subscription")

    # Register to receive updates for broker:subscribe event.
#    session.bindEvent("org.apache.qpid.broker", "subscribe")

    brokers = []
    try:
        try:
            for host in arguments:
                brokers.append(session.addBroker(host, None, options.sasl_mechanism))

            while (True):
                sleep(10)
        except KeyboardInterrupt:
            print
            return 0
        except Exception, e:
            print "Failed: %s - %s" % (e.__class__.__name__, e)
            return 1
    finally:
        while len(brokers):
            b = brokers.pop()
            session.delBroker(b)

if __name__ == '__main__':
    sys.exit(main())

--------------030505020903030505070601
Content-Type: text/xml; name="whitelist.xml"
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment; filename="whitelist.xml"

qmf.default.topic qmf.default.direct qpid.management amq.direct testqueue

--------------030505020903030505070601--