Return-Path: X-Original-To: apmail-qpid-users-archive@www.apache.org Delivered-To: apmail-qpid-users-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CF3CD9C1B for ; Sun, 9 Oct 2011 15:33:32 +0000 (UTC) Received: (qmail 54596 invoked by uid 500); 9 Oct 2011 15:33:32 -0000 Delivered-To: apmail-qpid-users-archive@qpid.apache.org Received: (qmail 54561 invoked by uid 500); 9 Oct 2011 15:33:32 -0000 Mailing-List: contact users-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@qpid.apache.org Delivered-To: mailing list users@qpid.apache.org Received: (qmail 54552 invoked by uid 99); 9 Oct 2011 15:33:32 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 09 Oct 2011 15:33:32 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=RCVD_IN_DNSWL_NONE,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of fraser.adams@blueyonder.co.uk designates 81.103.221.48 as permitted sender) Received: from [81.103.221.48] (HELO mtaout02-winn.ispmail.ntl.com) (81.103.221.48) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 09 Oct 2011 15:33:23 +0000 Received: from know-smtpout-4.server.virginmedia.net ([62.254.123.2]) by mtaout02-winn.ispmail.ntl.com (InterMail vM.7.08.04.00 201-2186-134-20080326) with ESMTP id <20111009153303.GVOK21463.mtaout02-winn.ispmail.ntl.com@know-smtpout-4.server.virginmedia.net> for ; Sun, 9 Oct 2011 16:33:03 +0100 Received: from [82.33.36.91] (helo=[192.168.1.4]) by know-smtpout-4.server.virginmedia.net with esmtpa (Exim 4.63) (envelope-from ) id 1RCvN8-0006iD-VU for users@qpid.apache.org; Sun, 09 Oct 2011 16:33:03 +0100 Message-ID: <4E91BEAF.3040406@blueyonder.co.uk> Date: Sun, 09 Oct 2011 16:33:03 +0100 From: Fraser Adams User-Agent: Thunderbird 2.0.0.24 (X11/20101027) MIME-Version: 1.0 To: users@qpid.apache.org Subject: Re: Is it possible to set authentication to only authenticate consumers? Content-Type: multipart/mixed; boundary="------------020607020802060703000004" X-Cloudmark-Analysis: v=1.1 cv=R50lirqlHffDPPkwUlkuVa99MrvKdVWo//yz83qex8g= c=1 sm=0 a=0ZzmyYADvgoA:10 a=Q0O5IUmHtJgA:10 a=3NElcqgl2aoA:10 a=QePl2w0oZMtKzKdE6icA:9 a=zKEn74v8H9iHENDsuVoA:7 a=wPNLvfGTeEIA:10 a=mV9VRH-2AAAA:8 a=igVHesOy9PtdDhg8iQMA:9 a=IB9xudptXkUX46nE58UA:7 a=f4Owi-HEwLWYVcft:21 a=pFIvEpIJvwQPCHHK:21 a=r5U2ML-w3iEmAzD_RmwA:9 a=HpAAvcLHHh0Zw7uRqdWCyQ==:117 X-Virus-Checked: Checked by ClamAV on apache.org --------------020607020802060703000004 Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit Gordon Sim wrote: > On 10/07/2011 05:50 PM, Fraser Adams wrote: >> What I'd quite like to be able to do is to log, but not deny if a queue >> is created that's not one of a named set. I'm suspecting that I can't do >> that with acl and I might have to write a QMF client to do that. > > I think you are right, QMF may be the simplest way to go. You can > quite easily get events sent out for queue creation and could then > check the name against an expected set and log any deviation. So I gave this a go. This is my very first python program :-) A bit of "copy and paste" reuse from qpid-printevents and some help from Mr Google. It seems to work pretty well - I added some exchangeName whitelisting to avoid triggering on the QMF exchanges. As it's based on qpid-printevents it's QMF1 based which I'm less familiar with than QMF2, but as qpid-printevents had all the broker reconnection logic in place I thought I'd stick with that for now. It wasn't quite as easy as just triggering on a couple of events as I wanted to log the IP address of the connection and to whitelist against exchangeNames, so I needed to trigger on quite a few object properties. What do you reckon - useful? As a slight aside... Gordon are there any plans to update qpid-config on an official release with your patch that displays the binding.arguments if they exist and so make it useful for headers exchange bindings. I've got my own patched version, but I'd rather use a vanilla version from a release. Similarly qpid-route had a test in place (at the start of the addLink() method) to prevent one linking a broker to itself, things work fine if I comment out the test. I can't see a good reason for the test - or log a warning rather than throw an exception. Frase --------------020607020802060703000004 Content-Type: text/plain; name="connection-audit" Content-Transfer-Encoding: 7bit Content-Disposition: inline; filename="connection-audit" #!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # This application audits connections to one or more Qpid message brokers. # Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated import os import os.path import optparse from optparse import IndentedHelpFormatter import sys import socket from time import time, strftime, gmtime, localtime, sleep from qmf.console import Console, Session from xml.etree.ElementTree import ElementTree class AuditConsole(Console): # AuditConsole Constructor def __init__(self, whitelistFile): self.whitelistFile = whitelistFile self.whitelistLastModified = None # Declare dictionary to map queue name to queue objects self.queueNameMap = {} # Declare dictionary to map queue name to subscription objects self.queueNameSubscriptionMap = {} # Declare dictionaries to map object-ids to objects self.queueMap = {} self.exchangeMap = {} self.bindingMap = {} self.connectionMap = {} self.sessionMap = {} # Declare the sets to be used as the whitelists self.exchangeWhitelist = set() self.queueWhitelist = set() # Overridden method from Console # Handle property updates. This method registers or deletes the relevant objects in the relevant Map # These objects are used by the validateSubscription() in order to find out information about the # bindings and connections associated with a given queue in order to work out if it needs to be tested # against the whitelist. For example we don't validate queues bound to the management exchanges as # this would generate spurious results as most of these queues are temporary queues. def objectProps(self, broker, record): classKey = record.getClassKey() oid = record.getObjectId() deleted = record.getTimestamps()[2] > 0 # Note that queue objects are keyed by both queue name and Object ID if classKey.getClassName() == "queue": queueName = record.name if oid not in self.queueMap and not deleted: self.queueMap[oid] = record if queueName not in self.queueNameMap and not deleted: self.queueNameMap[queueName] = record # If the queue object has been deleted remove it from the map if deleted: if queueName in self.queueNameMap: self.queueNameMap.pop(queueName) if oid in self.queueMap: self.queueMap.pop(oid) if classKey.getClassName() == "exchange": if oid not in self.exchangeMap and not deleted: self.exchangeMap[oid] = record # If the exchange object has been deleted remove it from the map if deleted and oid in self.exchangeMap: self.exchangeMap.pop(oid) if classKey.getClassName() == "binding": if oid not in self.bindingMap and not deleted: self.bindingMap[oid] = record # If the binding object has been deleted remove it from the map if deleted and oid in self.bindingMap: self.bindingMap.pop(oid) if classKey.getClassName() == "connection": if oid not in self.connectionMap and not deleted: # Re-read the whitelist when a new connection occurs self.readWhitelist() self.connectionMap[oid] = record # If the connection object has been deleted remove it from the map if deleted and oid in self.connectionMap: self.connectionMap.pop(oid) if classKey.getClassName() == "session": if oid not in self.sessionMap and not deleted: self.sessionMap[oid] = record # If the session object has been deleted remove it from the map if deleted and oid in self.sessionMap: self.sessionMap.pop(oid) # Note that subscription objects are keyed by queue name NOT by Object ID if classKey.getClassName() == "subscription": # Find the queue associated with the subscription by looking its reference up in queueMap queue = self.queueMap.get(record.queueRef) if queue != None: queueName = queue.name if queueName not in self.queueNameSubscriptionMap and not deleted: self.queueNameSubscriptionMap[queueName] = record self.validateSubscription(queueName) # If the subscription object has been deleted remove it from the map if deleted and queueName in self.queueNameSubscriptionMap: self.queueNameSubscriptionMap.pop(queueName) # This method first checks if the whitelist file exists, if not it clears the sets used as whitelists # so that no whitelisting is applied. If the whitelist file does exist it is parsed by an ElementTree # we look for all exchange and queue elements and populate the respective whitelist sets with their # contents. Note that we check the whitelist file update time to avoid reading it if it hasn't been changed def readWhitelist(self): if os.path.exists(self.whitelistFile): mtime = os.path.getmtime(self.whitelistFile) if mtime != self.whitelistLastModified: self.whitelistLastModified = mtime self.exchangeWhitelist.clear() self.queueWhitelist.clear() tree = ElementTree() try: tree.parse(self.whitelistFile) exchanges = tree.findall("exchangeWhitelist/exchange") for i in exchanges: exchange = i.text if exchange == None: self.exchangeWhitelist.add("") else: self.exchangeWhitelist.add(exchange) queues = tree.findall("queueWhitelist/queue") for i in queues: queue = i.text self.queueWhitelist.add(queue) except Exception, e: # Failed to parse correctly print strftime("%c", localtime(time())), "WARN connection-audit:readWhitelist failed: %s - %s" % (e.__class__.__name__, e) else: # If whitelist file doesn't exist log a warning and clear the whitelists print strftime("%c", localtime(time())), "WARN connection-audit:readWhitelist %s doesn't exist" % self.whitelistFile sys.stdout.flush() self.exchangeWhitelist.clear() self.queueWhitelist.clear() # Given a queue name this method looks up the associated subscription, session and connection # objects and returns the connection def findConnection(self, queueName): subscription = self.queueNameSubscriptionMap.get(queueName) if subscription != None: session = self.sessionMap.get(subscription.sessionRef) if session != None: connection = self.connectionMap.get(session.connectionRef) if connection != None: return connection # This method finds bindings that match a given queue name, if one is found the associated exchange is # recovered and then a call to validateQueue() is made def validateSubscription(self, queueName): for binding in self.bindingMap.values(): queue = self.queueMap.get(binding.queueRef) if queue != None and queue.name == queueName: exchange = self.exchangeMap.get(binding.exchangeRef) if exchange != None: self.validateQueue(queueName, exchange, binding) # This method validates the specified queue by comparing the queue name and associated exchange name # against the whitelist sets. If the exchange name or the queue name is in the whitelist then we do # nothing, but if not we generate an alert message. def validateQueue(self, queueName, exchange, binding): exchangeName = exchange.name if exchangeName in self.exchangeWhitelist: return if queueName in self.queueWhitelist: return connection = self.findConnection(queueName) address = connection.address ctime = connection.getTimestamps()[1]/1000000000 if exchangeName == "": exchangeName = "''" if binding.arguments: print strftime("%c", localtime(time())), "ALERT connection-audit:validateQueue validation failed for queue: %s with binding[%s] => %s %s from address: %s with connection timestamp" % (queueName, binding.bindingKey, exchangeName, binding.arguments, address), strftime("%c", localtime(ctime)) else: print strftime("%c", localtime(time())), "ALERT connection-audit:validateQueue validation failed for queue: %s with binding[%s] => %s from address: %s with connection timestamp" % (queueName, binding.bindingKey, exchangeName, address), strftime("%c", localtime(ctime)) sys.stdout.flush() # Overridden method from Console # def event(self, broker, event): # if event.classKey.getClassName() == "subscribe": # Overridden method from Console def brokerConnected(self, broker): # Log brokerConnected event print strftime("%c", localtime(time())), "NOTIC connection-audit:brokerConnected broker=%s" % broker.getUrl() sys.stdout.flush() # Overridden method from Console def brokerConnectionFailed(self, broker): # Log brokerConnectionFailed event print strftime("%c", localtime(time())), "NOTIC connection-audit:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc)) sys.stdout.flush() # Overridden method from Console def brokerDisconnected(self, broker): # Clear state as it'll get resent when the broker reconnects self.queueNameMap.clear() self.queueNameSubscriptionMap.clear() self.queueMap.clear() self.exchangeMap.clear() self.bindingMap.clear() self.connectionMap.clear() self.sessionMap.clear() # Log brokerDisconnected event print strftime("%c", localtime(time())), "NOTIC connection-audit:brokerDisconnected broker=%s" % broker.getUrl() sys.stdout.flush() class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings """ def format_usage(self, usage): return usage def format_description(self, description): if description: return description + "\n" else: return "" _usage = "%prog [options] [broker-addr]..." _description = \ """ Audits connections to one or more Qpid message brokers. Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated If no broker-addr is supplied, %prog connects to 'localhost:5672'. [broker-addr] syntax: [username/password@] hostname ip-address [:] Examples: $ %prog localhost:5672 $ %prog 10.1.1.7:10000 $ %prog guest/guest@broker-host:10000 """ def main(argv=None): p = optparse.OptionParser(usage=_usage, description=_description, formatter=JHelpFormatter()) p.add_option("--sasl-mechanism", action="store", type="string", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") p.add_option("--whitelist", action="store", type="string", default="./whitelist.xml", metavar="", help="The fully qualified name of the whitelist XML file, default is ./whitelist.xml") options, arguments = p.parse_args(args=argv) if len(arguments) == 0: arguments.append("localhost") console = AuditConsole(options.whitelist) session = Session(console, rcvObjects=True, rcvEvents=True, userBindings=True, manageConnections=True) # Register to receive updates for broker:queue objects. session.bindClass("org.apache.qpid.broker", "queue") # Register to receive updates for broker:exchange objects. session.bindClass("org.apache.qpid.broker", "exchange") # Register to receive updates for broker:binding objects. session.bindClass("org.apache.qpid.broker", "binding") # Register to receive updates for broker:connection objects. session.bindClass("org.apache.qpid.broker", "connection") # Register to receive updates for broker:session objects. session.bindClass("org.apache.qpid.broker", "session") # Register to receive updates for broker:subscription objects. session.bindClass("org.apache.qpid.broker", "subscription") # Register to receive updates for broker:subscribe event. # session.bindEvent("org.apache.qpid.broker", "subscribe") brokers = [] try: try: for host in arguments: brokers.append(session.addBroker(host, None, options.sasl_mechanism)) while (True): sleep(10) except KeyboardInterrupt: print return 0 except Exception, e: print "Failed: %s - %s" % (e.__class__.__name__, e) return 1 finally: while len(brokers): b = brokers.pop() session.delBroker(b) if __name__ == '__main__': sys.exit(main()) --------------020607020802060703000004 Content-Type: text/xml; name="whitelist.xml" Content-Transfer-Encoding: 7bit Content-Disposition: inline; filename="whitelist.xml" qmf.default.topic qmf.default.direct qpid.management amq.direct testqueue --------------020607020802060703000004 Content-Type: text/plain; charset=us-ascii --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:users-subscribe@qpid.apache.org --------------020607020802060703000004--