qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject svn commit: r1334037 [24/24] - in /qpid/branches/asyncstore: ./ bin/ cc/ cpp/ cpp/bindings/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qpid/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/...
Date Fri, 04 May 2012 15:40:13 GMT
Modified: qpid/branches/asyncstore/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-config?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-config (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-config Fri May  4 15:39:19 2012
@@ -18,12 +18,18 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+import pdb
 
 import os
 from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
 import sys
 import locale
-from qmf.console import Session
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpid.messaging import Connection
+from qpidtoollibs import BrokerAgent
 
 usage = """
 Usage:  qpid-config [OPTIONS]
@@ -36,15 +42,16 @@ Usage:  qpid-config [OPTIONS]
         qpid-config [OPTIONS] bind   <exchange-name> <queue-name> [binding-key]
                   <for type xml>     [-f -|filename]
                   <for type header>  [all|any] k1=v1 [, k2=v2...]
-        qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"""
+        qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]
+        qpid-config [OPTIONS] reload-acl"""
 
 description = """
 Examples:
 
 $ qpid-config add queue q
 $ qpid-config add exchange direct d -a localhost:5672
-$ qpid-config exchanges -a 10.1.1.7:10000
-$ qpid-config queues -a guest/guest@broker-host:10000
+$ qpid-config exchanges -b 10.1.1.7:10000
+$ qpid-config queues -b guest/guest@broker-host:10000
 
 Add Exchange <type> values:
 
@@ -55,7 +62,7 @@ Add Exchange <type> values:
     xml        XML Exchange - allows content filtering using an XQuery
 
 
-Queue Limit Actions
+Queue Limit Actions:
 
     none (default) - Use broker's default policy
     reject         - Reject enqueued messages
@@ -63,12 +70,14 @@ Queue Limit Actions
     ring           - Replace oldest unacquired message with new
     ring-strict    - Replace oldest message, reject if oldest is acquired
 
-Queue Ordering Policies
+Replication levels:
 
-    fifo (default) - First in, first out
-    lvq            - Last Value Queue ordering, allows queue browsing
-    lvq-no-browse  - Last Value Queue ordering, browsing clients may lose data"""
+    none           - no replication
+    configuration  - replicate queue and exchange existence and bindings, but not messages.
+    all            - replicate configuration and messages
+"""
 
+REPLICATE_LEVELS= ["none", "configuration", "all"]
 
 class Config:
     def __init__(self):
@@ -77,8 +86,9 @@ class Config:
         self._connTimeout       = 10
         self._ignoreDefault     = False
         self._altern_ex         = None
-        self._passive           = False
         self._durable           = False
+        self._replicate       = None
+        self._ha_admin          = False
         self._clusterDurable    = False
         self._if_empty          = True
         self._if_unused         = True
@@ -87,8 +97,8 @@ class Config:
         self._maxQueueSize      = None
         self._maxQueueCount     = None
         self._limitPolicy       = None
-        self._order             = None
         self._msgSequence       = False
+        self._lvq_key           = None
         self._ive               = False
         self._eventGeneration   = None
         self._file              = None
@@ -100,6 +110,7 @@ class Config:
         self._msgGroupHeader    = None
         self._sharedMsgGroup    = False
         self._extra_arguments   = []
+        self._start_replica    = None
         self._returnCode        = 0
 
 config = Config()
@@ -110,8 +121,7 @@ MAX_QUEUE_SIZE  = "qpid.max_size"
 MAX_QUEUE_COUNT  = "qpid.max_count"
 POLICY_TYPE  = "qpid.policy_type"
 CLUSTER_DURABLE = "qpid.persist_last_node"
-LVQ = "qpid.last_value_queue"
-LVQNB = "qpid.last_value_queue_no_browse"
+LVQ_KEY = "qpid.last_value_queue_key"
 MSG_SEQUENCE = "qpid.msg_sequence"
 IVE = "qpid.ive"
 QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
@@ -121,14 +131,18 @@ FLOW_STOP_SIZE    = "qpid.flow_stop_size
 FLOW_RESUME_SIZE  = "qpid.flow_resume_size"
 MSG_GROUP_HDR_KEY = "qpid.group_header_key"
 SHARED_MSG_GROUP  = "qpid.shared_msg_group"
+REPLICATE = "qpid.replicate"
 #There are various arguments to declare that have specific program
 #options in this utility. However there is now a generic mechanism for
 #passing arguments as well. The SPECIAL_ARGS list contains the
 #arguments for which there are specific program options defined
 #i.e. the arguments for which there is special processing on add and
 #list
-SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
-              MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
+SPECIAL_ARGS=[
+    FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,
+    LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,
+    FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+    MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE]
 
 class JHelpFormatter(IndentedHelpFormatter):
     """Format usage and description without stripping newlines from usage strings
@@ -146,7 +160,7 @@ class JHelpFormatter(IndentedHelpFormatt
 
 def Usage():
     print usage
-    exit(-1)
+    sys.exit(-1)
 
 def OptionsAndArguments(argv):
     """ Set global variables for options, return arguments """
@@ -160,8 +174,8 @@ def OptionsAndArguments(argv):
 
     group1 = OptionGroup(parser, "General Options")
     group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
-    group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list")
-    group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
+    group1.add_option("-r", "--recursive", action="store_true", help="Show bindings in queue or exchange list")
+    group1.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
     group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
     parser.add_option_group(group1)
 
@@ -171,8 +185,9 @@ def OptionsAndArguments(argv):
 
     group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
     group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
-    group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.")
     group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
+    group2.add_option("--replicate", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'all'.")
+    group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     parser.add_option_group(group2)
 
     group3 = OptionGroup(parser, "Options for Adding Queues")
@@ -182,7 +197,7 @@ def OptionsAndArguments(argv):
     group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes")
     group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages")
     group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
-    group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
+    group3.add_option("--lvq-key", action="store", metavar="<key>", help="Last Value Queue key")
     group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")
     group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>",
                       help="Turn on sender flow control when the number of queued bytes exceeds this value.")
@@ -198,6 +213,7 @@ def OptionsAndArguments(argv):
                       help="Allow message group consumption across multiple consumers.")
     group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
                       metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
+    group3.add_option("--start-replica", metavar="<broker-url>", help="Start replication from the same-named queue at <broker-url>")
     # no option for declaring an exclusive queue - which can only be used by the session that creates it.
     parser.add_option_group(group3)
 
@@ -224,10 +240,10 @@ def OptionsAndArguments(argv):
     except:
         args = encArgs
 
-    if opts.bindings:
+    if opts.recursive:
         config._recursive = True
-    if opts.broker_addr:
-        config._host = opts.broker_addr
+    if opts.broker:
+        config._host = opts.broker
     if opts.timeout is not None:
         config._connTimeout = opts.timeout
         if config._connTimeout == 0:
@@ -236,10 +252,13 @@ def OptionsAndArguments(argv):
         config._ignoreDefault = True
     if opts.alternate_exchange:
         config._altern_ex = opts.alternate_exchange
-    if opts.passive:
-        config._passive = True
     if opts.durable:
         config._durable = True
+    if opts.replicate:
+        if not opts.replicate in REPLICATE_LEVELS:
+            raise Exception("Invalid replication level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS)))
+        config._replicate = opts.replicate
+    if opts.ha_admin: config._ha_admin = True
     if opts.cluster_durable:
         config._clusterDurable = True
     if opts.file:
@@ -254,10 +273,10 @@ def OptionsAndArguments(argv):
         config._maxQueueCount = opts.max_queue_count
     if opts.limit_policy:
            config._limitPolicy = opts.limit_policy
-    if opts.order:
-        config._order = opts.order
     if opts.sequence:
         config._msgSequence = True
+    if opts.lvq_key:
+        config._lvq_key = opts.lvq_key
     if opts.ive:
         config._ive = True
     if opts.generate_queue_events:
@@ -285,6 +304,8 @@ def OptionsAndArguments(argv):
         config._sharedMsgGroup = True
     if opts.extra_arguments:
         config._extra_arguments = opts.extra_arguments
+    if opts.start_replica:
+        config._start_replica = opts.start_replica
     return args
 
 
@@ -331,27 +352,24 @@ def snarf_header_args(args):
 class BrokerManager:
     def __init__(self):
         self.brokerName = None
-        self.qmf        = None
+        self.conn       = None
         self.broker     = None
-        self.mechanism  = None
 
     def SetBroker(self, brokerUrl, mechanism):
         self.url = brokerUrl
-        self.qmf = Session()
-        self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism)
-        agents = self.qmf.getAgents()
-        for a in agents:
-            if a.getAgentBank() == '0':
-                self.brokerAgent = a
+        client_properties={}
+        if config._ha_admin: client_properties["qpid.ha-admin"] = 1
+        self.conn = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties)
+        self.broker = BrokerAgent(self.conn)
 
     def Disconnect(self):
-        if self.broker:
-            self.qmf.delBroker(self.broker)
+        if self.conn:
+            self.conn.close()
 
     def Overview(self):
-        exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
-        queues    = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
-        print "Total Exchanges: %d" % len (exchanges)
+        exchanges = self.broker.getAllExchanges()
+        queues    = self.broker.getAllQueues()
+        print "Total Exchanges: %d" % len(exchanges)
         etype = {}
         for ex in exchanges:
             if ex.type not in etype:
@@ -362,16 +380,16 @@ class BrokerManager:
             print "%15s: %d" % (typ, etype[typ])
 
         print
-        print "   Total Queues: %d" % len (queues)
+        print "   Total Queues: %d" % len(queues)
         durable = 0
         for queue in queues:
             if queue.durable:
                 durable = durable + 1
         print "        durable: %d" % durable
-        print "    non-durable: %d" % (len (queues) - durable)
+        print "    non-durable: %d" % (len(queues) - durable)
 
     def ExchangeList(self, filter):
-        exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+        exchanges = self.broker.getAllExchanges()
         caption1 = "Type      "
         caption2 = "Exchange Name"
         maxNameLen = len(caption2)
@@ -398,22 +416,23 @@ class BrokerManager:
                 args = ex.arguments
                 if not args: args = {}
                 if ex.durable:    print "--durable",
+                if REPLICATE in args: print "--replicate=%s" % args[REPLICATE],
                 if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
                 if IVE in args and args[IVE] == 1: print "--ive",
                 if ex.altExchange:
-                    print "--alternate-exchange=%s" % ex._altExchange_.name,
+                    print "--alternate-exchange=%s" % ex.altExchange,
                 print
 
     def ExchangeListRecurse(self, filter):
-        exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
-        bindings  = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
-        queues    = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+        exchanges = self.broker.getAllExchanges()
+        bindings  = self.broker.getAllBindings()
+        queues    = self.broker.getAllQueues()
         for ex in exchanges:
             if config._ignoreDefault and not ex.name: continue
             if self.match(ex.name, filter):
                 print "Exchange '%s' (%s)" % (ex.name, ex.type)
                 for bind in bindings:
-                    if bind.exchangeRef == ex.getObjectId():
+                    if bind.exchangeRef == ex.name:
                         qname = "<unknown>"
                         queue = self.findById(queues, bind.queueRef)
                         if queue != None:
@@ -425,7 +444,7 @@ class BrokerManager:
 
 
     def QueueList(self, filter):
-        queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+        queues = self.broker.getAllQueues()
         caption = "Queue Name"
         maxNameLen = len(caption)
         found = False
@@ -450,6 +469,7 @@ class BrokerManager:
                 args = q.arguments
                 if not args: args = {}
                 if q.durable:    print "--durable",
+                if REPLICATE in args: print "--replicate=%s" % args[REPLICATE],
                 if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
                 if q.autoDelete: print "auto-del",
                 if q.exclusive:  print "excl",
@@ -458,11 +478,10 @@ class BrokerManager:
                 if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE],
                 if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT],
                 if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
-                if LVQ in args and args[LVQ] == 1: print "--order lvq",
-                if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse",
+                if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY],
                 if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION],
                 if q.altExchange:
-                    print "--alternate-exchange=%s" % q._altExchange_.name,
+                    print "--alternate-exchange=%s" % q.altExchange,
                 if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
                 if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
                 if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
@@ -472,14 +491,14 @@ class BrokerManager:
                 print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
 
     def QueueListRecurse(self, filter):
-        exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
-        bindings  = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
-        queues    = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+        exchanges = self.broker.getAllExchanges()
+        bindings  = self.broker.getAllBindings()
+        queues    = self.broker.getAllQueues()
         for queue in queues:
             if self.match(queue.name, filter):
                 print "Queue '%s'" % queue.name
                 for bind in bindings:
-                    if bind.queueRef == queue.getObjectId():
+                    if bind.queueRef == queue.name:
                         ename = "<unknown>"
                         ex    = self.findById(exchanges, bind.exchangeRef)
                         if ex != None:
@@ -508,16 +527,21 @@ class BrokerManager:
             declArgs[MSG_SEQUENCE] = 1
         if config._ive:
             declArgs[IVE] = 1
-        if  config._altern_ex != None:
-            self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
-        else:
-            self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, passive=config._passive, durable=config._durable, arguments=declArgs)
+        if config._altern_ex:
+            declArgs['alternate-exchange'] = config._altern_ex
+        if config._durable:
+            declArgs['durable'] = 1
+        if config._replicate:
+            declArgs[REPLICATE] = config._replicate
+        self.broker.addExchange(etype, ename, declArgs)
+
 
     def DelExchange(self, args):
         if len(args) < 1:
             Usage()
         ename = args[0]
-        self.broker.getAmqpSession().exchange_delete(exchange=ename)
+        self.broker.delExchange(ename)
+
 
     def AddQueue(self, args):
         if len(args) < 1:
@@ -550,15 +574,10 @@ class BrokerManager:
             elif config._limitPolicy == "ring-strict":
                 declArgs[POLICY_TYPE] = "ring_strict"
 
-        if  config._clusterDurable:
+        if config._clusterDurable:
             declArgs[CLUSTER_DURABLE] = 1
-        if config._order:
-            if config._order == "fifo":
-                pass
-            elif config._order == "lvq":
-                declArgs[LVQ] = 1
-            elif config._order == "lvq-no-browse":
-                declArgs[LVQNB] = 1
+        if config._lvq_key:
+            declArgs[LVQ_KEY] = config._lvq_key
         if config._eventGeneration:
             declArgs[QUEUE_EVENT_GENERATION]  = config._eventGeneration
 
@@ -576,17 +595,21 @@ class BrokerManager:
         if config._sharedMsgGroup:
             declArgs[SHARED_MSG_GROUP] = 1
 
-        if config._altern_ex != None:
-            self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
-        else:
-            self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs)
-
+        if config._altern_ex:
+            declArgs['alternate-exchange'] = config._altern_ex
+        if config._durable:
+            declArgs['durable'] = 1
+        if config._replicate:
+            declArgs[REPLICATE] = config._replicate
+        self.broker.addQueue(qname, declArgs)
+        if config._start_replica:      # Start replication
+            self.broker._method("replicate", {"broker":config._start_replica, "queue":qname}, "org.apache.qpid.ha:habroker:ha-broker")
 
     def DelQueue(self, args):
         if len(args) < 1:
             Usage()
         qname = args[0]
-        self.broker.getAmqpSession().queue_delete(queue=qname, if_empty=config._if_empty, if_unused=config._if_unused)
+        self.broker.delQueue(qname)
 
 
     def Bind(self, args):
@@ -599,7 +622,7 @@ class BrokerManager:
             key = args[2]
 
         # query the exchange to determine its type.
-        res = self.broker.getAmqpSession().exchange_query(ename)
+        res = self.broker.getExchange(ename)
 
         # type of the xchg determines the processing of the rest of
         # argv.  if it's an xml xchg, we want to find a file
@@ -608,7 +631,7 @@ class BrokerManager:
         # map containing key/value pairs.  if neither of those, extra
         # args are ignored.
         ok = True
-        _args = None
+        _args = {}
         if res.type == "xml":
             # this checks/imports the -f arg
             [ok, xquery] = snarf_xquery_args()
@@ -622,10 +645,7 @@ class BrokerManager:
         if not ok:
             sys.exit(1)
 
-        self.broker.getAmqpSession().exchange_bind(queue=qname,
-                                                    exchange=ename,
-                                                    binding_key=key,
-                                                    arguments=_args)
+        self.broker.bind(ename, qname, key, _args)
 
     def Unbind(self, args):
         if len(args) < 2:
@@ -635,11 +655,20 @@ class BrokerManager:
         key   = ""
         if len(args) > 2:
             key = args[2]
-        self.broker.getAmqpSession().exchange_unbind(queue=qname, exchange=ename, binding_key=key)
+        self.broker.unbind(ename, qname, key)
+
+    def ReloadAcl(self):
+        try:
+            self.broker.reloadAclFile()
+        except Exception, e:
+            if str(e).find('No object found') != -1:
+                print "Failed: ACL Module Not Loaded in Broker"
+            else:
+                raise
 
     def findById(self, items, id):
         for item in items:
-            if item.getObjectId() == id:
+            if item.name == id:
                 return item
         return None
 
@@ -697,6 +726,8 @@ def main(argv=None):
                 bm.Bind(args[1:])
             elif cmd == "unbind":
                 bm.Unbind(args[1:])
+            elif cmd == "reload-acl":
+                bm.ReloadAcl()
             else:
                 Usage()
     except KeyboardInterrupt:
@@ -725,9 +756,9 @@ def main(argv=None):
             if e.__class__.__name__ != "Timeout":
                 print "Failed: %s: %s" % (e.__class__.__name__, e)
                 return 1
-
     return config._returnCode
 
+
 if __name__ == "__main__":
         sys.exit(main())
 

Modified: qpid/branches/asyncstore/tools/src/py/qpid-route
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-route?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-route (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-route Fri May  4 15:39:19 2012
@@ -62,6 +62,7 @@ class Config:
         self._ack       = 0
         self._connTimeout = 10
         self._client_sasl_mechanism = None
+        self._ha_admin  = False
 
 config = Config()
 
@@ -96,7 +97,7 @@ def OptionsAndArguments(argv):
     parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
 
     parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
-
+    parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     opts, encArgs = parser.parse_args(args=argv)
 
     try:
@@ -128,6 +129,9 @@ def OptionsAndArguments(argv):
     if opts.transport:
         config._transport = opts.transport
 
+    if opts.ha_admin:
+        config._ha_admin = True
+
     if opts.ack:
         config._ack = opts.ack
 
@@ -143,7 +147,9 @@ class RouteManager:
         self.local = BrokerURL(localBroker)
         self.remote  = None
         self.qmf = Session()
-        self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism)
+        client_properties = {}
+        if config._ha_admin: client_properties["qpid.ha-admin"] = 1
+        self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism, client_properties=client_properties)
         self.broker._waitForStable()
         self.agent = self.broker.getBrokerAgent()
 

Modified: qpid/branches/asyncstore/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-stat?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-stat (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-stat Fri May  4 15:39:19 2012
@@ -30,8 +30,8 @@ from qpid.messaging import Connection
 home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
 sys.path.append(os.path.join(home, "python"))
 
-from qpidtoollibs.broker import BrokerAgent
-from qpidtoollibs.disp import Display, Header, Sorter
+from qpidtoollibs import BrokerAgent
+from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong
 
 
 class Config:
@@ -42,8 +42,8 @@ class Config:
         self._limit = 50
         self._increasing = False
         self._sortcol = None
-        self._details = None
         self._sasl_mechanism = None
+        self._ha_admin = False
 
 config = Config()
 
@@ -52,42 +52,45 @@ def OptionsAndArguments(argv):
 
     global config
 
-    parser = OptionParser(usage="usage: %prog [options] BROKER",
-                      description="Example: $ qpid-stat -q  broker-host:10000")
+    parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]")
 
     group1 = OptionGroup(parser, "General Options")
-    group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
-    group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+    group1.add_option("-b", "--broker",  action="store", type="string", default="localhost", metavar="<url>",
+                      help="URL of the broker to query")
+    group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>",
+                      help="Maximum time to wait for broker connection (in seconds)")
+    group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
+                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+    group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     parser.add_option_group(group1)
 
     group2 = OptionGroup(parser, "Display Options")
-    group2.add_option("-b", "--broker", help="Show Brokers",               action="store_const", const="b", dest="show")
-    group2.add_option("-c", "--connections", help="Show Connections",      action="store_const", const="c", dest="show")
-    group2.add_option("-e", "--exchanges", help="Show Exchanges",          action="store_const", const="e", dest="show")
-    group2.add_option("-q", "--queues", help="Show Queues",                action="store_const", const="q", dest="show")
-    group2.add_option("-u", "--subscriptions", help="Show Subscriptions",  action="store_const", const="u", dest="show")
-    group2.add_option("-m", "--memory", help="Show Broker Memory Stats",   action="store_const", const="m", dest="show")
+    group2.add_option("-g", "--general", help="Show General Broker Stats",  action="store_const", const="g",   dest="show")
+    group2.add_option("-c", "--connections", help="Show Connections",       action="store_const", const="c",   dest="show")
+    group2.add_option("-e", "--exchanges", help="Show Exchanges",           action="store_const", const="e",   dest="show")
+    group2.add_option("-q", "--queues", help="Show Queues",                 action="store_const", const="q",   dest="show")
+    group2.add_option("-u", "--subscriptions", help="Show Subscriptions",   action="store_const", const="u",   dest="show")
+    group2.add_option("-m", "--memory", help="Show Broker Memory Stats",    action="store_const", const="m",   dest="show")
+    group2.add_option(      "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show")
     group2.add_option("-S", "--sort-by",  metavar="<colname>",                   help="Sort by column name")
     group2.add_option("-I", "--increasing", action="store_true", default=False,  help="Sort by increasing value (default = decreasing)")
     group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>",    help="Limit output to n rows")
-    group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.")
+
     parser.add_option_group(group2)
 
     opts, args = parser.parse_args(args=argv)
 
     if not opts.show:
-        parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help")
+        parser.error("You must specify one of these options: -g, -c, -e, -q, -u, -m, or --acl. For details, try $ qpid-stat --help")
 
     config._types = opts.show
     config._sortcol = opts.sort_by
+    config._host = opts.broker
     config._connTimeout = opts.timeout
     config._increasing = opts.increasing
     config._limit = opts.limit
     config._sasl_mechanism = opts.sasl_mechanism
-    config._detail = opts.detail
-
-    if args:
-        config._host = args[0]
+    config._ha_admin = opts.ha_admin
 
     return args
 
@@ -118,24 +121,24 @@ class IpAddr:
 
 class BrokerManager:
     def __init__(self):
-        self.brokerName  = None
-        self.connections = []
-        self.brokers     = []
-        self.cluster     = None
+        self.brokerName = None
+        self.connection = None
+        self.broker     = None
+        self.cluster    = None
 
     def SetBroker(self, brokerUrl, mechanism):
         self.url = brokerUrl
-        self.connections.append(Connection(self.url, sasl_mechanism=mechanism))
-        self.connections[0].open()
-        self.brokers.append(BrokerAgent(self.connections[0]))
+        client_properties={}
+        if config._ha_admin: client_properties["qpid.ha-admin"] = 1
+        self.connection = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties)
+        self.broker = BrokerAgent(self.connection)
 
     def Disconnect(self):
         """ Release any allocated brokers.  Ignore any failures as the tool is
         shutting down.
         """
         try:
-            for conn in self.connections:
-                conn.close()
+            connection.close()
         except:
             pass
 
@@ -175,7 +178,7 @@ class BrokerManager:
             hosts.append(bestUrl)
         return hosts
 
-    def displayBroker(self, subs):
+    def displayBroker(self):
         disp = Display(prefix="  ")
         heads = []
         heads.append(Header('uptime', Header.DURATION))
@@ -184,7 +187,7 @@ class BrokerManager:
         heads.append(Header('exchanges', Header.COMMAS))
         heads.append(Header('queues', Header.COMMAS))
         rows = []
-        broker = self.brokers[0].getBroker()
+        broker = self.broker.getBroker()
         connections = self.getConnectionMap()
         sessions = self.getSessionMap()
         exchanges = self.getExchangeMap()
@@ -229,7 +232,7 @@ class BrokerManager:
         disp.formattedTable('Aggregate Broker Statistics:', heads, rows)
 
 
-    def displayConn(self, subs):
+    def displayConn(self):
         disp = Display(prefix="  ")
         heads = []
         heads.append(Header('client-addr'))
@@ -241,8 +244,8 @@ class BrokerManager:
         heads.append(Header('msgIn', Header.KMG))
         heads.append(Header('msgOut', Header.KMG))
         rows = []
-        connections = self.brokers[0].getAllConnections()
-        broker = self.brokers[0].getBroker()
+        connections = self.broker.getAllConnections()
+        broker = self.broker.getBroker()
         for conn in connections:
             row = []
             row.append(conn.address)
@@ -262,10 +265,10 @@ class BrokerManager:
             dispRows = rows
         disp.formattedTable(title, heads, dispRows)
 
-    def displaySession(self, subs):
+    def displaySession(self):
         disp = Display(prefix="  ")
 
-    def displayExchange(self, subs):
+    def displayExchange(self):
         disp = Display(prefix="  ")
         heads = []
         heads.append(Header("exchange"))
@@ -279,7 +282,7 @@ class BrokerManager:
         heads.append(Header("byteOut", Header.KMG))
         heads.append(Header("byteDrop", Header.KMG))
         rows = []
-        exchanges = self.brokers[0].getAllExchanges()
+        exchanges = self.broker.getAllExchanges()
         for ex in exchanges:
             row = []
             row.append(ex.name)
@@ -301,7 +304,7 @@ class BrokerManager:
             dispRows = rows
         disp.formattedTable(title, heads, dispRows)
 
-    def displayQueues(self, subs):
+    def displayQueues(self):
         disp = Display(prefix="  ")
         heads = []
         heads.append(Header("queue"))
@@ -317,7 +320,7 @@ class BrokerManager:
         heads.append(Header("cons", Header.KMG))
         heads.append(Header("bind", Header.KMG))
         rows = []
-        queues = self.brokers[0].getAllQueues()
+        queues = self.broker.getAllQueues()
         for q in queues:
             row = []
             row.append(q.name)
@@ -341,11 +344,67 @@ class BrokerManager:
             dispRows = rows
         disp.formattedTable(title, heads, dispRows)
 
-    def displayQueue(self, subs):
+
+    def displayQueue(self, name):
+        queue = self.broker.getQueue(name)
+        if not queue:
+            print "Queue '%s' not found" % name
+            return
+
         disp = Display(prefix="  ")
         heads = []
+        heads.append(Header('Name'))
+        heads.append(Header('Durable', Header.YN))
+        heads.append(Header('AutoDelete', Header.YN))
+        heads.append(Header('Exclusive', Header.YN))
+        heads.append(Header('FlowStopped', Header.YN))
+        heads.append(Header('FlowStoppedCount', Header.COMMAS))
+        heads.append(Header('Consumers', Header.COMMAS))
+        heads.append(Header('Bindings', Header.COMMAS))
+        rows = []
+        rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive,
+                     queue.flowStopped, queue.flowStoppedCount,
+                     queue.consumerCount, queue.bindingCount])
+        disp.formattedTable("Properties:", heads, rows)
+        print
+
+        heads = []
+        heads.append(Header('Property'))
+        heads.append(Header('Value'))
+        rows = []
+        rows.append(['arguments', queue.arguments])
+        rows.append(['alt-exchange', queue.altExchange])
+        disp.formattedTable("Optional Properties:", heads, rows)
+        print
+
+        heads = []
+        heads.append(Header('Statistic'))
+        heads.append(Header('Messages', Header.COMMAS))
+        heads.append(Header('Bytes', Header.COMMAS))
+        rows = []
+        rows.append(['queue-depth',                queue.msgDepth,           queue.byteDepth])
+        rows.append(['total-enqueues',             queue.msgTotalEnqueues,   queue.byteTotalEnqueues])
+        rows.append(['total-dequeues',             queue.msgTotalDequeues,   queue.byteTotalDequeues])
+        rows.append(['persistent-enqueues',        queue.msgPersistEnqueues, queue.bytePersistEnqueues])
+        rows.append(['persistent-dequeues',        queue.msgPersistDequeues, queue.bytePersistDequeues])
+        rows.append(['transactional-enqueues',     queue.msgTxnEnqueues,     queue.byteTxnEnqueues])
+        rows.append(['transactional-dequeues',     queue.msgTxnDequeues,     queue.byteTxnDequeues])
+        rows.append(['flow-to-disk-depth',         queue.msgFtdDepth,        queue.byteFtdDepth])
+        rows.append(['flow-to-disk-enqueues',      queue.msgFtdEnqueues,     queue.byteFtdEnqueues])
+        rows.append(['flow-to-disk-dequeues',      queue.msgFtdDequeues,     queue.byteFtdDequeues])
+        rows.append(['acquires',                   queue.acquires,           None])
+        rows.append(['releases',                   queue.releases,           None])
+        rows.append(['discards-ttl-expired',       queue.discardsTtl,        None])
+        rows.append(['discards-limit-overflow',    queue.discardsOverflow,   None])
+        rows.append(['discards-ring-overflow',     queue.discardsRing,       None])
+        rows.append(['discards-lvq-replace',       queue.discardsLvq,        None])
+        rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None])
+        rows.append(['discards-purged',            queue.discardsPurge,      None])
+        rows.append(['reroutes',                   queue.reroutes,           None])
+        disp.formattedTable("Statistics:", heads, rows)
 
-    def displaySubscriptions(self, subs):
+
+    def displaySubscriptions(self):
         disp = Display(prefix="  ")
         heads = []
         heads.append(Header("subscr"))
@@ -359,7 +418,7 @@ class BrokerManager:
         heads.append(Header("creditMode"))
         heads.append(Header("delivered", Header.KMG))
         rows = []
-        subscriptions = self.brokers[0].getAllSubscriptions()
+        subscriptions = self.broker.getAllSubscriptions()
         sessions = self.getSessionMap()
         connections = self.getConnectionMap()
         for s in subscriptions:
@@ -388,59 +447,75 @@ class BrokerManager:
             dispRows = rows
         disp.formattedTable(title, heads, dispRows)
 
-    def displayMemory(self, unused):
+    def displayMemory(self):
         disp = Display(prefix="  ")
         heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
         rows = []
-        memory = self.brokers[0].getMemory()
+        memory = self.broker.getMemory()
         for k,v in memory.values.items():
             if k != 'name':
                 rows.append([k, v])
         disp.formattedTable('Broker Memory Statistics:', heads, rows)
 
+    def displayAcl(self):
+        acl = self.broker.getAcl()
+        if not acl:
+            print "ACL Policy Module is not installed"
+            return
+        disp = Display(prefix="  ")
+        heads = [Header('Statistic'), Header('Value')]
+        rows = []
+        rows.append(['policy-file',       acl.policyFile])
+        rows.append(['enforcing',         YN(acl.enforcingAcl)])
+        rows.append(['has-transfer-acls', YN(acl.transferAcl)])
+        rows.append(['last-acl-load',     TimeLong(acl.lastAclLoad)])
+        rows.append(['acl-denials',       Commas(acl.aclDenyCount)])
+        disp.formattedTable('ACL Policy Statistics:', heads, rows)
+
     def getExchangeMap(self):
-        exchanges = self.brokers[0].getAllExchanges()
+        exchanges = self.broker.getAllExchanges()
         emap = {}
         for e in exchanges:
             emap[e.name] = e
         return emap
 
     def getQueueMap(self):
-        queues = self.brokers[0].getAllQueues()
+        queues = self.broker.getAllQueues()
         qmap = {}
         for q in queues:
             qmap[q.name] = q
         return qmap
 
     def getSessionMap(self):
-        sessions = self.brokers[0].getAllSessions()
+        sessions = self.broker.getAllSessions()
         smap = {}
         for s in sessions:
             smap[s.name] = s
         return smap
 
     def getConnectionMap(self):
-        connections = self.brokers[0].getAllConnections()
+        connections = self.broker.getAllConnections()
         cmap = {}
         for c in connections:
             cmap[c.address] = c
         return cmap
 
-    def displayMain(self, main, subs):
-        if   main == 'b': self.displayBroker(subs)
-        elif main == 'c': self.displayConn(subs)
-        elif main == 's': self.displaySession(subs)
-        elif main == 'e': self.displayExchange(subs)
+    def displayMain(self, names, main):
+        if   main == 'g': self.displayBroker()
+        elif main == 'c': self.displayConn()
+        elif main == 's': self.displaySession()
+        elif main == 'e': self.displayExchange()
         elif main == 'q':
-            if config._detail:
-                self.displayQueue(subs, config._detail)
+            if len(names) >= 1:
+                self.displayQueue(names[0])
             else:
-                self.displayQueues(subs)
-        elif main == 'u': self.displaySubscriptions(subs)
-        elif main == 'm': self.displayMemory(subs)
+                self.displayQueues()
+        elif main == 'u':   self.displaySubscriptions()
+        elif main == 'm':   self.displayMemory()
+        elif main == 'acl': self.displayAcl()
 
-    def display(self):
-        self.displayMain(config._types[0], config._types[1:])
+    def display(self, names):
+        self.displayMain(names, config._types)
 
 
 def main(argv=None):
@@ -450,7 +525,7 @@ def main(argv=None):
 
     try:
         bm.SetBroker(config._host, config._sasl_mechanism)
-        bm.display()
+        bm.display(args)
         bm.Disconnect()
         return 0
     except KeyboardInterrupt:

Modified: qpid/branches/asyncstore/tools/src/py/qpidtoollibs/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpidtoollibs/__init__.py?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpidtoollibs/__init__.py (original)
+++ qpid/branches/asyncstore/tools/src/py/qpidtoollibs/__init__.py Fri May  4 15:39:19 2012
@@ -16,3 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+
+from qpidtoollibs.broker import *
+from qpidtoollibs.disp import *
+

Modified: qpid/branches/asyncstore/tools/src/py/qpidtoollibs/broker.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpidtoollibs/broker.py?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpidtoollibs/broker.py (original)
+++ qpid/branches/asyncstore/tools/src/py/qpidtoollibs/broker.py Fri May  4 15:39:19 2012
@@ -24,6 +24,9 @@ except ImportError:
   from qpid.datatypes import uuid4
 
 class BrokerAgent(object):
+  """
+  Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection.
+  """
   def __init__(self, conn):
     self.conn = conn
     self.sess = self.conn.session()
@@ -35,6 +38,9 @@ class BrokerAgent(object):
     self.next_correlator = 1
 
   def close(self):
+    """
+    Close the proxy session.  This will not affect the connection used in creating the object.
+    """
     self.sess.close()
 
   def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10):
@@ -89,9 +95,8 @@ class BrokerAgent(object):
       self.sess.acknowledge()
     return items
 
-  def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'):
-    query = {'_what'      : 'OBJECT',
-             '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}}
+  def _doNameQuery(self, object_id):
+    query = {'_what'      : 'OBJECT', '_object_id' : {'_object_name' : object_id}}
     correlator = self._sendRequest('_query_request', query)
     response = self.reply_rx.fetch(10)
     if response.properties['qmf.opcode'] != '_query_response':
@@ -116,65 +121,74 @@ class BrokerAgent(object):
     for item in items:
       objs.append(cls(self, item))
     return objs
-    
-  def _getBrokerObject(self, cls, name):
-    obj = self._doNameQuery(cls.__name__.lower(), name)
+
+  def _getBrokerObject(self, cls, oid):
+    obj = self._doNameQuery(oid)
     if obj:
       return cls(self, obj)
     return None
 
-  def getCluster(self):
-    return self._getAllBrokerObjects(Cluster)
-
-  def getBroker(self):
+  def _getSingleObject(self, cls):
     #
     # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because
     # of a bug that used to be in the broker whereby by-name queries did not return the
     # object timestamps.
     #
-    brokers = self._getAllBrokerObjects(Broker)
-    if brokers:
-      return brokers[0]
+    objects = self._getAllBrokerObjects(cls)
+    if objects: return objects[0]
     return None
 
-  def getMemory(self):
-    return self._getAllBrokerObjects(Memory)[0]
+  def getBroker(self):
+    """
+    Get the Broker object that contains broker-scope statistics and operations.
+    """
+    return self._getSingleObject(Broker)
+
+
+  def getCluster(self):
+    return self._getSingleObject(Cluster)
+
+  def getHaBroker(self):
+    return self._getSingleObject(HaBroker)
 
   def getAllConnections(self):
     return self._getAllBrokerObjects(Connection)
 
-  def getConnection(self, name):
-    return self._getBrokerObject(Connection, name)
+  def getConnection(self, oid):
+    return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid)
 
   def getAllSessions(self):
     return self._getAllBrokerObjects(Session)
 
-  def getSession(self, name):
-    return self._getBrokerObject(Session, name)
+  def getSession(self, oid):
+    return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid)
 
   def getAllSubscriptions(self):
     return self._getAllBrokerObjects(Subscription)
 
-  def getSubscription(self, name):
-    return self._getBrokerObject(Subscription, name)
+  def getSubscription(self, oid):
+    return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid)
 
   def getAllExchanges(self):
     return self._getAllBrokerObjects(Exchange)
 
   def getExchange(self, name):
-    return self._getBrokerObject(Exchange, name)
+    return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name)
 
   def getAllQueues(self):
     return self._getAllBrokerObjects(Queue)
 
   def getQueue(self, name):
-    return self._getBrokerObject(Queue, name)
+    return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name)
 
   def getAllBindings(self):
     return self._getAllBrokerObjects(Binding)
 
-  def getBinding(self, exchange=None, queue=None):
-    pass
+  def getAllLinks(self):
+    return self._getAllBrokerObjects(Link)
+
+  def getAcl(self):
+    return self._getSingleObject(Acl)
 
   def echo(self, sequence, body):
     """Request a response to test the path to the management broker"""
@@ -204,23 +218,69 @@ class BrokerAgent(object):
     """Get the message timestamping configuration"""
     pass
 
-#  def addExchange(self, exchange_type, name, **kwargs):
-#    pass
-
-#  def delExchange(self, name):
-#    pass
-
-#  def addQueue(self, name, **kwargs):
-#    pass
-
-#  def delQueue(self, name):
-#    pass
-
-#  def bind(self, exchange, queue, key, **kwargs):
-#    pass
-
-#  def unbind(self, exchange, queue, key, **kwargs):
-#    pass
+  def addExchange(self, exchange_type, name, options={}, **kwargs):
+    properties = {}
+    properties['exchange-type'] = exchange_type
+    for k,v in options.items():
+      properties[k] = v
+    for k,v in kwargs.items():
+      properties[k] = v
+    args = {'type':       'exchange',
+            'name':        name,
+            'properties':  properties,
+            'strict':      True}
+    self._method('create', args)
+
+  def delExchange(self, name):
+    args = {'type': 'exchange', 'name': name}
+    self._method('delete', args)
+
+  def addQueue(self, name, options={}, **kwargs):
+    properties = options
+    for k,v in kwargs.items():
+      properties[k] = v
+    args = {'type':       'queue',
+            'name':        name,
+            'properties':  properties,
+            'strict':      True}
+    self._method('create', args)
+
+  def delQueue(self, name):
+    args = {'type': 'queue', 'name': name}
+    self._method('delete', args)
+
+  def bind(self, exchange, queue, key, options={}, **kwargs):
+    properties = options
+    for k,v in kwargs.items():
+      properties[k] = v
+    args = {'type':       'binding',
+            'name':       "%s/%s/%s" % (exchange, queue, key),
+            'properties':  properties,
+            'strict':      True}
+    self._method('create', args)
+
+  def unbind(self, exchange, queue, key, **kwargs):
+    args = {'type':       'binding',
+            'name':       "%s/%s/%s" % (exchange, queue, key),
+            'strict':      True}
+    self._method('delete', args)
+
+  def reloadAclFile(self):
+    self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
+
+  def acl_lookup(self, userName, action, aclObj, aclObjName, propMap):
+    args = {'userId':      userName,
+            'action':      action,
+            'object':      aclObj,
+            'objectName':  aclObjName,
+            'propertyMap': propMap}
+    return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
+
+  def acl_lookupPublish(self, userName, exchange, key):
+    args = {'userId':       userName,
+            'exchangeName': exchange,
+            'routingKey':   key}
+    return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
 
   def create(self, _type, name, properties, strict):
     """Create an object of the specified type"""
@@ -230,9 +290,9 @@ class BrokerAgent(object):
     """Delete an object of the specified type"""
     pass
 
-  def query(self, _type, name):
+  def query(self, _type, oid):
     """Query the current state of an object"""
-    return self._getBrokerObject(self, _type, name)
+    return self._getBrokerObject(self, _type, oid)
 
 
 class BrokerObject(object):
@@ -255,6 +315,9 @@ class BrokerObject(object):
           return full_name[colon+1:]
     return value
 
+  def getObjectId(self):
+    return self.content['_object_id']['_object_name']
+
   def getAttributes(self):
     return self.values
 
@@ -271,7 +334,7 @@ class BrokerObject(object):
     """
     Reload the property values from the agent.
     """
-    refreshed = self.broker._getBrokerObject(self.__class__, self.name)
+    refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId())
     if refreshed:
       self.content = refreshed.content
       self.values = self.content['_values']
@@ -282,6 +345,14 @@ class Broker(BrokerObject):
   def __init__(self, broker, values):
     BrokerObject.__init__(self, broker, values)
 
+class Cluster(BrokerObject):
+  def __init__(self, broker, values):
+    BrokerObject.__init__(self, broker, values)
+
+class HaBroker(BrokerObject):
+  def __init__(self, broker, values):
+    BrokerObject.__init__(self, broker, values)
+
 class Memory(BrokerObject):
   def __init__(self, broker, values):
     BrokerObject.__init__(self, broker, values)
@@ -328,3 +399,10 @@ class Queue(BrokerObject):
     self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter},
                         "org.apache.qpid.broker:queue:%s" % self.name)
 
+class Link(BrokerObject):
+  def __init__(self, broker, values):
+    BrokerObject.__init__(self, broker, values)
+
+class Acl(BrokerObject):
+  def __init__(self, broker, values):
+    BrokerObject.__init__(self, broker, values)

Modified: qpid/branches/asyncstore/tools/src/py/qpidtoollibs/disp.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpidtoollibs/disp.py?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpidtoollibs/disp.py (original)
+++ qpid/branches/asyncstore/tools/src/py/qpidtoollibs/disp.py Fri May  4 15:39:19 2012
@@ -21,6 +21,31 @@
 
 from time import strftime, gmtime
 
+def YN(val):
+  if val:
+    return 'Y'
+  return 'N'
+
+def Commas(value):
+  sval = str(value)
+  result = ""
+  while True:
+    if len(sval) == 0:
+      return result
+    left = sval[:-3]
+    right = sval[-3:]
+    result = right + result
+    if len(left) > 0:
+      result = ',' + result
+    sval = left
+
+def TimeLong(value):
+  return strftime("%c", gmtime(value / 1000000000))
+
+def TimeShort(value):
+  return strftime("%X", gmtime(value / 1000000000))
+
+
 class Header:
   """ """
   NONE = 1
@@ -59,9 +84,9 @@ class Header:
           return 'Y'
         return ''
       if self.format == Header.TIME_LONG:
-         return strftime("%c", gmtime(value / 1000000000))
+         return TimeLong(value)
       if self.format == Header.TIME_SHORT:
-         return strftime("%X", gmtime(value / 1000000000))
+         return TimeShort(value)
       if self.format == Header.DURATION:
         if value < 0: value = 0
         sec = value / 1000000000
@@ -78,17 +103,7 @@ class Header:
         result += "%ds" % (sec % 60)
         return result
       if self.format == Header.COMMAS:
-        sval = str(value)
-        result = ""
-        while True:
-          if len(sval) == 0:
-            return result
-          left = sval[:-3]
-          right = sval[-3:]
-          result = right + result
-          if len(left) > 0:
-            result = ',' + result
-          sval = left
+        return Commas(value)
     except:
       return "?"
 

Modified: qpid/branches/asyncstore/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp (original)
+++ qpid/branches/asyncstore/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp Fri May  4 15:39:19 2012
@@ -49,6 +49,7 @@
 
 #include "qpid/client/AsyncSession.h"
 #include "qpid/client/Connection.h"
+#include "qpid/framing/FieldValue.h"
 
 
 #include <map>
@@ -472,13 +473,15 @@ INT ResourceManager::recover(XID *xids, 
 	try {
 	    // status if we can't talk to the broker
 	    status = XAER_RMFAIL;
-	    std::vector<std::string> wireFormatXids;
 
 	    DtxRecoverResult dtxrr = qpidSession.dtxRecover(true);
 
 	    // status if we can't process the xids
 	    status = XAER_RMERR;
-	    dtxrr.getInDoubt().collect(wireFormatXids);
+
+        std::vector<std::string> wireFormatXids(dtxrr.getInDoubt().size());
+        std::transform(dtxrr.getInDoubt().begin(), dtxrr.getInDoubt().end(), wireFormatXids.begin(), Array::get<std::string, Array::ValuePtr>);
+
 	    size_t nXids = wireFormatXids.size();
 
 	    if (nXids > 0) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message