qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r1354874 [14/14] - in /qpid/branches/java-config-and-management: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/ qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qpid/python/ qpid/cpp/bindings/qpid/ruby/ qpid/cpp/bindings/qpid/ruby/features/ q...
Date Thu, 28 Jun 2012 09:15:39 GMT
Propchange: qpid/branches/java-config-and-management/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:r1339579-1339789

Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src:r1339579-1339789

Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:r1339579-1339789

Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:r1339579-1339789

Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:r1339579-1339789

Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:r1339579-1339789

Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:r1339579-1339789

Propchange: qpid/branches/java-config-and-management/qpid/packaging/windows/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/packaging/windows:r1300143-1349442,1349444-1349530,1349532-1353860

Propchange: qpid/branches/java-config-and-management/qpid/python/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/python:r1300143-1349442,1349444-1349530,1349532-1353860

Propchange: qpid/branches/java-config-and-management/qpid/python/examples/api/spout
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/python/examples/api/spout:r1300143-1349442,1349444-1349530,1349532-1353860

Propchange: qpid/branches/java-config-and-management/qpid/python/qpid/concurrency.py
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/python/qpid/concurrency.py:r1300143-1349442,1349444-1349530,1349532-1353860

Modified: qpid/branches/java-config-and-management/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/python/qpid/connection.py?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/python/qpid/connection.py (original)
+++ qpid/branches/java-config-and-management/qpid/python/qpid/connection.py Thu Jun 28 09:14:52 2012
@@ -170,6 +170,10 @@ class Connection(Framer):
           if not status:
             self.detach_all()
             break
+      # When we do not use SSL transport, we get periodic 
+      # spurious timeout events on the socket.  When using SSL,
+      # these events show up as timeout *errors*.  Both should be
+      # ignored unless we have aborted.
       except socket.timeout:
         if self.aborted():
           self.close_code = (None, "connection timed out")
@@ -178,9 +182,12 @@ class Connection(Framer):
         else:
           continue
       except socket.error, e:
-        self.close_code = (None, str(e))
-        self.detach_all()
-        break
+        if self.aborted() or str(e) != "The read operation timed out":
+          self.close_code = (None, str(e))
+          self.detach_all()
+          break
+        else:
+          continue
       frame_dec.write(data)
       seg_dec.write(*frame_dec.read())
       op_dec.write(*seg_dec.read())

Modified: qpid/branches/java-config-and-management/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/python/qpid/messaging/driver.py?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/branches/java-config-and-management/qpid/python/qpid/messaging/driver.py Thu Jun 28 09:14:52 2012
@@ -226,7 +226,11 @@ class LinkIn:
 
   def do_link(self, sst, rcv, _rcv, type, subtype, action):
     link_opts = _rcv.options.get("link", {})
-    reliability = link_opts.get("reliability", "at-least-once")
+    if type == "topic":
+      default_reliability = "unreliable"
+    else:
+      default_reliability = "at-least-once"
+    reliability = link_opts.get("reliability", default_reliability)
     declare = link_opts.get("x-declare", {})
     subscribe = link_opts.get("x-subscribe", {})
     acq_mode = acquire_mode.pre_acquired

Modified: qpid/branches/java-config-and-management/qpid/python/qpid/messaging/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/python/qpid/messaging/util.py?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/python/qpid/messaging/util.py (original)
+++ qpid/branches/java-config-and-management/qpid/python/qpid/messaging/util.py Thu Jun 28 09:14:52 2012
@@ -50,10 +50,13 @@ def set_reconnect_urls(conn, msg):
   reconnect_urls = []
   urls = msg.properties["amq.failover"]
   for u in urls:
+    # FIXME aconway 2012-06-12: Nasty hack parsing of the C++ broker's URL format.
     if u.startswith("amqp:"):
-      for p in u[5:].split(","):
-        parts = p.split(":")
-        host, port = parts[1:3]
+      for a in u[5:].split(","):
+        parts = a.split(":")
+        # Handle IPv6 addresses which have : in the host part.
+        port = parts[-1]        # Last : separated field is port
+        host = ":".join(parts[1:-1]) # First : separated field is protocol, host is the rest.
         reconnect_urls.append("%s:%s" % (host, port))
   conn.reconnect_urls = reconnect_urls
   log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls)

Modified: qpid/branches/java-config-and-management/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/specs/management-schema.xml?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/specs/management-schema.xml (original)
+++ qpid/branches/java-config-and-management/qpid/specs/management-schema.xml Thu Jun 28 09:14:52 2012
@@ -8,9 +8,9 @@
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at
-  
+
     http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -81,7 +81,6 @@
     <property name="systemRef"        type="objId"  references="System" access="RO" desc="System
ID" parentRef="y"/>
     <property name="port"             type="uint16" access="RO" desc="TCP Port for AMQP
Service"/>
     <property name="workerThreads"    type="uint16" access="RO" desc="Thread pool size"/>
-    <property name="maxConns"         type="uint16" access="RO" desc="Maximum allowed
connections"/>
     <property name="connBacklog"      type="uint16" access="RO" desc="Connection backlog
limit for listening socket"/>
     <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages
over this size to disk"/>
     <property name="mgmtPublish"      type="bool"   access="RO" desc="Broker's management
agent sends unsolicited data on the publish interval"/>
@@ -125,8 +124,8 @@
     <statistic name="abandonedViaAlt"     type="count64" unit="message" desc="Messages
routed to alternate exchange from a deleted queue"/>
 
     <method name="echo" desc="Request a response to test the path to the management broker">
-      <arg name="sequence" dir="IO" type="uint32" default="0"/>
-      <arg name="body"     dir="IO" type="lstr"   default=""/>
+      <arg name="sequence" dir="IO" type="uint32"/>
+      <arg name="body"     dir="IO" type="lstr"/>
     </method>
 
     <method name="connect" desc="Establish a connection to another broker">
@@ -143,7 +142,7 @@
       <arg name="srcQueue"          dir="I" type="sstr" desc="Source queue"/>
       <arg name="destQueue"         dir="I" type="sstr" desc="Destination queue"/>
       <arg name="qty"               dir="I" type="uint32" desc="# of messages to move.
0 means all messages"/>
-      <arg name="filter"  dir="I" type="map" default="{}"   desc="if specified, move only
those messages matching this filter"/>
+      <arg name="filter"  dir="I" type="map" desc="if specified, move only those messages
matching this filter"/>
     </method>
 
     <method name="setLogLevel" desc="Set the log level">
@@ -164,20 +163,20 @@
 
     <method name="create" desc="Create an object of the specified type">
       <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>

-      <arg name="properties" dir="I" type="map" desc="Type specific object properties"/>

-      <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object
properties as an error"/> 
+      <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>
+      <arg name="properties" dir="I" type="map" desc="Type specific object properties"/>
+      <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object
properties as an error"/>
     </method>
 
     <method name="delete" desc="Delete an object of the specified type">
       <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/>

-      <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>

+      <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/>
+      <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
     </method>
 
     <method name="query" desc="Query the current state of an object.">
       <arg name="type" dir="I" type="sstr" desc="The type of object to query."/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/>

+      <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/>
       <arg name="results" dir="O" type="map"  desc="A snapshot of the object's state."/>
     </method>
 
@@ -272,14 +271,14 @@
 
     <method name="purge" desc="Discard all or some messages on a queue">
       <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for
n messages"/>
-      <arg name="filter"  dir="I" type="map" default="{}"  desc="if specified, purge only
those messages matching this filter"/>
+      <arg name="filter"  dir="I" type="map"    desc="if specified, purge only those messages
matching this filter"/>
     </method>
 
     <method name="reroute" desc="Remove all or some messages on this queue and route them
to an exchange">
       <arg name="request"        dir="I" type="uint32" desc="0 for all messages or n>0
for n messages"/>
       <arg name="useAltExchange" dir="I" type="bool"   desc="Iff true, use the queue's
configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/>
       <arg name="exchange"       dir="I" type="sstr"   desc="Name of the exchange to route
the messages through"/>
-      <arg name="filter"  dir="I" type="map" default="{}" desc="if specified, reroute
only those messages matching this filter"/>
+      <arg name="filter"  dir="I" type="map"           desc="if specified, reroute only
those messages matching this filter"/>
     </method>
   </class>
 
@@ -321,7 +320,7 @@
 
     <statistic name="msgMatched" type="count64"/>
   </class>
-  
+
   <!--
   ===============================================================
   Subscription
@@ -338,7 +337,7 @@
     <property name="arguments"      type="map"      access="RC"/>
     <statistic name="delivered"     type="count64"  unit="message" desc="Messages delivered"/>
   </class>
-  
+
   <!--
   ===============================================================
   Connection
@@ -366,7 +365,7 @@
     <statistic name="msgsFromClient"  type="count64"/>
     <statistic name="msgsToClient"    type="count64"/>
 
-    <method name="close"/> 
+    <method name="close"/>
   </class>
 
   <!--
@@ -379,15 +378,17 @@
     This class represents an inter-broker connection.
 
     <property name="vhostRef"  type="objId"  references="Vhost" access="RC" index="y"
parentRef="y"/>
-    <property name="host"      type="sstr"   access="RC" index="y"/>
-    <property name="port"      type="uint16" access="RC" index="y"/>
-    <property name="transport" type="sstr"   access="RC"/>
+    <property name="name"      type="sstr"   access="RC" index="y"/>
+    <property name="host"      type="sstr"   access="RO"/>
+    <property name="port"      type="uint16" access="RO"/>
+    <property name="transport" type="sstr"   access="RO"/>
     <property name="durable"   type="bool"   access="RC"/>
+    <property name="connectionRef" type="objId" references="Connection" access="RO"/>
 
     <statistic name="state"       type="sstr" desc="Operational state of the link"/>
     <statistic name="lastError"   type="lstr" desc="Reason link is not operational"/>
 
-    <method name="close"/> 
+    <method name="close"/>
 
     <method name="bridge" desc="Bridge messages over the link">
       <arg name="durable"     dir="I" type="bool"/>
@@ -411,7 +412,8 @@
   -->
   <class name="Bridge">
     <property name="linkRef"     type="objId"  references="Link" access="RC" index="y"
parentRef="y"/>
-    <property name="channelId"   type="uint16" access="RC" index="y"/>
+    <property name="name"        type="sstr"   access="RC"  index="y"/>
+    <property name="channelId"   type="uint16" access="RO"/>
     <property name="durable"     type="bool"   access="RC"/>
     <property name="src"         type="sstr"   access="RC"/>
     <property name="dest"        type="sstr"   access="RC"/>
@@ -422,7 +424,7 @@
     <property name="excludes"    type="sstr"   access="RC"/>
     <property name="dynamic"     type="bool"   access="RC"/>
     <property name="sync"        type="uint16" access="RC"/>
-    <method name="close"/> 
+    <method name="close"/>
   </class>
 
 
@@ -441,7 +443,7 @@
     <property name="expireTime"       type="absTime" access="RO" optional="y"/>
     <property name="maxClientRate"    type="uint32"  access="RO" unit="msgs/sec" optional="y"/>
 
-    <statistic name="framesOutstanding" type="count32"/>
+    <statistic name="unackedMessages" type="uint64" unit="message" desc="Unacknowledged
messages in the session"/>
 
     <statistic name="TxnStarts"    type="count64"  unit="transaction" desc="Total transactions
started "/>
     <statistic name="TxnCommits"   type="count64"  unit="transaction" desc="Total transactions
committed"/>

Modified: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Thu Jun 28 09:14:52 2012
@@ -36,3 +36,4 @@ from extensions import *
 from msg_groups import *
 from new_api import *
 from stats import *
+from qmf_events import *

Modified: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original)
+++ qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Thu Jun 28 09:14:52 2012
@@ -302,9 +302,10 @@ class ManagementTest (TestBase010):
 
         twenty = range(1,21)
         props = session.delivery_properties(routing_key="routing_key")
+        mp    = session.message_properties(application_headers={'x-qpid.trace' : 'A,B,C'})
         for count in twenty:
             body = "Reroute Message %d" % count
-            msg = Message(props, body)
+            msg = Message(props, mp, body)
             session.message_transfer(destination="amq.direct", message=msg)
 
         pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
@@ -317,6 +318,16 @@ class ManagementTest (TestBase010):
         self.assertEqual(pq.msgDepth,19)
         self.assertEqual(aq.msgDepth,1)
 
+        "Verify that the trace was cleared on the rerouted message"
+        url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port)
+        conn = qpid.messaging.Connection(url)
+        conn.open()
+        sess = conn.session()
+        rx = sess.receiver("alt-queue1;{mode:browse}")
+        rm = rx.fetch(1)
+        self.assertEqual(rm.properties['x-qpid.trace'], '')
+        conn.close()
+
         "Reroute top 9 messages from reroute-queue to alt.direct2"
         result = pq.reroute(9, False, "alt.direct2", {})
         self.assertEqual(result.status, 0) 
@@ -385,6 +396,30 @@ class ManagementTest (TestBase010):
         # Cleanup
         for e in ["A", "B"]: session.exchange_delete(exchange=e)
 
+    def test_reroute_invalid_alt_exchange(self):
+        """
+        Test that an error is returned for an attempt to reroute to
+        alternate exchange on a queue for which no such exchange has
+        been defined.
+        """
+        self.startQmf()
+        session = self.session
+        # create queue with no alt-exchange, and send a message to it
+        session.queue_declare(queue="q", exclusive=True, auto_delete=True)
+        props = session.delivery_properties(routing_key="q")
+        session.message_transfer(message=Message(props, "don't reroute me!"))
+
+        # attempt to reroute the message to alt-exchange
+        q = self.qmf.getObjects(_class="queue", name="q")[0]
+        result = q.reroute(1, True, "", {})
+        # verify the attempt fails...
+        self.assertEqual(result.status, 4) #invalid parameter
+
+        # ...and message is still on the queue
+        self.subscribe(destination="d", queue="q")
+        self.assertEqual("don't reroute me!", session.incoming("d").get(timeout=1).body)
+
+
     def test_methods_async (self):
         """
         """

Modified: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original)
+++ qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Thu Jun 28 09:14:52 2012
@@ -1122,6 +1122,70 @@ class MultiConsumerMsgGroupTests(Base):
         snd.close()
 
 
+    def test_ttl_expire(self):
+        """ Verify that expired (TTL) group messages are skipped correctly
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
+
+        groups = ["A","B","C","A","B","C"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            if m.properties['THE-GROUP'] == 'B':
+                m.ttl = 1;
+            snd.send(m)
+
+        sleep(2)  # let all B's expire
+
+        # create consumers on separate sessions: C1,C2
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
+        s2 = self.setup_session()
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
+
+        # C1 should acquire A-0, then C2 should acquire C-2, Group B should
+        # expire and never be fetched
+
+        m1 = c1.fetch(0);
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        m2 = c2.fetch(0);
+        assert m2.properties['THE-GROUP'] == 'C'
+        assert m2.content['index'] == 2
+
+        m1 = c1.fetch(0);
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 3
+
+        m2 = c2.fetch(0);
+        assert m2.properties['THE-GROUP'] == 'C'
+        assert m2.content['index'] == 5
+
+        # there should be no more left for either consumer
+        try:
+            mx = c1.fetch(0)
+            assert False     # should never get here
+        except Empty:
+            pass
+        try:
+            mx = c2.fetch(0)
+            assert False     # should never get here
+        except Empty:
+            pass
+
+        c1.session.acknowledge()
+        c2.session.acknowledge()
+        c1.close()
+        c2.close()
+        snd.close()
+
+
 class StickyConsumerMsgGroupTests(Base):
     """
     Tests for the behavior of sticky-consumer message groups.  These tests

Modified: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py (original)
+++ qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py Thu Jun 28 09:14:52 2012
@@ -57,7 +57,7 @@ class GeneralTests(Base):
         sess2 = self.setup_session()
 
         tx = sess1.sender("amq.direct/key")
-        rx_main = sess1.receiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}")
+        rx_main = sess1.receiver("amq.direct/key;{link:{reliability:at-least-once,x-declare:{alternate-exchange:'amq.fanout'}}}")
         rx_alt  = sess2.receiver("amq.fanout")
         rx_alt.capacity = 10
 

Propchange: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py:r1300143-1349442,1349444-1349530,1349532-1353860

Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/.gitignore
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/.gitignore?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tools/src/py/.gitignore (original)
+++ qpid/branches/java-config-and-management/qpid/tools/src/py/.gitignore Thu Jun 28 09:14:52 2012
@@ -19,4 +19,5 @@
 # with the License.  You may obtain a copy of the License at
 /qpid-clusterc
 /qpid-configc
+/qpid-hac
 /qpid-routec

Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qmf-tool
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qmf-tool?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tools/src/py/qmf-tool (original)
+++ qpid/branches/java-config-and-management/qpid/tools/src/py/qmf-tool Thu Jun 28 09:14:52 2012
@@ -266,7 +266,7 @@ class QmfData:
     self.conn_options = conn_options
     self.qmf_options = qmf_options
     self.agent_filter = '[]'
-    self.connection = cqpid.Connection(self.url, self.conn_options)
+    self.connection = cqpid.Connection(self.url, **self.conn_options)
     self.connection.open()
     self.session = qmf2.ConsoleSession(self.connection, self.qmf_options)
     self.session.setAgentFilter(self.agent_filter)

Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-config?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-config (original)
+++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-config Thu Jun 28 09:14:52 2012
@@ -481,7 +481,7 @@ class BrokerManager:
                 if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY],
                 if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION],
                 if q.altExchange:
-                    print "--alternate-exchange=%s" % q._altExchange_.name,
+                    print "--alternate-exchange=%s" % q.altExchange,
                 if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
                 if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
                 if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],

Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-ha
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-ha?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-ha (original)
+++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-ha Thu Jun 28 09:14:52 2012
@@ -35,18 +35,18 @@ HA_BROKER = "org.apache.qpid.ha:habroker
 class Command:
     commands = {}
 
-    def __init__(self, name, help, args=[]):
+    def __init__(self, name, help, arg_names=[]):
         Command.commands[name] = self
         self.name = name
-        self.args = args
-        usage="%s [options] %s\n\n%s"%(name, " ".join(args), help)
+        self.arg_names = arg_names
+        usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
         self.help = help
         self.op=optparse.OptionParser(usage)
         self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker
at <url>")
 
-    def execute(self):
-        opts, args = self.op.parse_args()
-        if len(args) != len(self.args)+1:
+    def execute(self, args):
+        opts, args = self.op.parse_args(args)
+        if len(args) != len(self.arg_names)+1:
             self.op.print_help()
             raise Exception("Wrong number of arguments")
         broker = opts.broker or "localhost:5672"
@@ -93,29 +93,31 @@ class SetCmd(Command):
         Command.__init__(self, "set", "Set HA configuration settings")
         def add(optname, metavar, type, help):
             self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store")
-        add("--brokers", "<url>", "string", "HA brokers use <url> to connect
to each other")
-        add("--public-brokers", "<url>", "string", "Clients use <url> to connect
to HA brokers")
+        add("--brokers-url", "<url>", "string", "URL with address of each broker in
the cluster. Used by brokers to connect to each other.")
+        add("--public-url", "<url>", "string", "URL advertised to clients to connect
to the cluster. May be a list or a VIP.")
         add("--backups", "<n>", "int", "Expect <n> backups to be running"),
 
     def do_execute(self, qmf_broker, ha_broker, opts, args):
-        if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER)
-        if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers},
HA_BROKER)
+        if (opts.brokers_url): qmf_broker._method("setBrokersUrl", {"url":opts.brokers_url},
HA_BROKER)
+        if (opts.public_url): qmf_broker._method("setPublicUrl", {"url":opts.public_url},
HA_BROKER)
         if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups},
HA_BROKER)
 
 SetCmd()
 
 class QueryCmd(Command):
     def __init__(self):
-        Command.__init__(self, "query", "Print HA configuration settings")
+        Command.__init__(self, "query", "Print HA configuration and status")
 
     def do_execute(self, qmf_broker, ha_broker, opts, args):
         hb = ha_broker
         for x in [("Status:", hb.status),
-                  ("Brokers URL:", hb.brokers),
-                  ("Public URL:", hb.publicBrokers),
-                  ("Expected Backups:", hb.expectedBackups)
+                  ("Brokers URL:", hb.brokersUrl),
+                  ("Public URL:", hb.publicUrl),
+                  ("Expected Backups:", hb.expectedBackups),
+                  ("Replicate: ", hb.replicateDefault)
                   ]:
             print "%-20s %s"%(x[0], x[1])
+
 QueryCmd()
 
 def print_usage(prog):
@@ -143,7 +145,7 @@ def main(argv):
         if not command:
             print_usage(os.path.basename(argv[0]));
             return 1;
-        if command.execute(): return 1
+        if command.execute(args): return 1
     except Exception, e:
         print e
         return 1

Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-printevents
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-printevents?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-printevents (original)
+++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-printevents Thu Jun 28 09:14:52 2012
@@ -21,34 +21,85 @@
 
 import os
 import optparse
-from optparse import IndentedHelpFormatter
 import sys
-import socket
-from time import time, strftime, gmtime, sleep
-from qmf.console import Console, Session
+from optparse       import IndentedHelpFormatter
+from time           import time, strftime, gmtime, sleep
+from threading      import Lock, Condition, Thread
+from qpid.messaging import Connection
+import qpid.messaging.exceptions
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs.broker import EventHelper
+
+
+class Printer(object):
+  """
+  This class serializes printed lines so that events coming from different
+  threads don't overlap each other.
+  """
+  def __init__(self):
+    self.lock = Lock()
 
-
-class EventConsole(Console):
-  def event(self, broker, event):
-    print event
-    sys.stdout.flush()
-
-  def brokerConnected(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
+  def pr(self, text):
+    self.lock.acquire()
+    try:
+      print text
+    finally:
+      self.lock.release()
     sys.stdout.flush()
+  
 
-  def brokerConnectionFailed(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc))
-    sys.stdout.flush()
+class EventReceiver(Thread):
+  """
+  One instance of this class is created for each broker that is being monitored.
+  This class does not use the "reconnect" option because it needs to report as
+  events when the connection is established and when it's lost.
+  """
+  def __init__(self, printer, url, mechanism, options):
+    Thread.__init__(self)
+    self.printer   = printer
+    self.url       = url
+    self.mechanism = mechanism
+    self.options   = options
+    self.running   = True
+    self.helper    = EventHelper()
+
+  def cancel(self):
+    self.running = False
+
+  def run(self):
+    isOpen = False
+    while self.running:
+      try:
+        conn = Connection.establish(self.url, sasl_mechanisms=self.mechanism, client_properties=self.options)
+        isOpen = True
+        self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerConnected broker=%s" % self.url)
+
+        sess = conn.session()
+        rx = sess.receiver(self.helper.eventAddress())
+
+        while self.running:
+          try:
+            msg = rx.fetch(1)
+            event = self.helper.event(msg)
+            self.printer.pr(event.__repr__())
+            sess.acknowledge()
+          except qpid.messaging.exceptions.Empty:
+            pass
+        
+      except Exception, e:
+        if isOpen:
+          self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerDisconnected broker=%s" % self.url)
+        isOpen = False
+        sleep(1)
 
-  def brokerDisconnected(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
-    sys.stdout.flush()
 
 class JHelpFormatter(IndentedHelpFormatter):
-    """Format usage and description without stripping newlines from usage strings
     """
-
+    Format usage and description without stripping newlines from usage strings
+    """
     def format_usage(self, usage):
         return usage
 
@@ -87,16 +138,23 @@ def main(argv=None):
   if len(arguments) == 0:
     arguments.append("localhost")
 
-  console = EventConsole()
-  session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True)
-  brokers = []
+  brokers   = []
+  mechanism = options.sasl_mechanism
+  props     = {'qpid.ha-admin' : 1}
+  printer   = Printer()
+
+  if options.heartbeats:
+    props['heartbeat'] = 5
+
   try:
     try:
       for host in arguments:
-        brokers.append(session.addBroker(host, None, options.sasl_mechanism))
+        er = EventReceiver(printer, host, mechanism, props)
+        brokers.append(er)
+        er.start()
 
-        while (True):
-          sleep(10)
+      while (True):
+        sleep(10)
 
     except KeyboardInterrupt:
         print
@@ -106,9 +164,10 @@ def main(argv=None):
         print "Failed: %s - %s" % (e.__class__.__name__, e)
         return 1
   finally:
-    while len(brokers):
-      b = brokers.pop()
-      session.delBroker(b)
+    for b in brokers:
+      b.cancel()
+    for b in brokers:
+      b.join()
 
 if __name__ == '__main__':
   sys.exit(main())

Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-stat?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-stat (original)
+++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-stat Thu Jun 28 09:14:52 2012
@@ -52,7 +52,16 @@ def OptionsAndArguments(argv):
 
     global config
 
-    parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]")
+    usage = \
+"""%prog -g [options]
+       %prog -c [options]
+       %prog -e [options]
+       %prog -q [options] [queue-name]
+       %prog -u [options]
+       %prog -m [options]
+       %prog --acl [options]"""
+
+    parser = OptionParser(usage=usage)
 
     group1 = OptionGroup(parser, "General Options")
     group1.add_option("-b", "--broker",  action="store", type="string", default="localhost", metavar="<url>",
@@ -64,7 +73,7 @@ def OptionsAndArguments(argv):
     group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     parser.add_option_group(group1)
 
-    group2 = OptionGroup(parser, "Display Options")
+    group2 = OptionGroup(parser, "Command Options")
     group2.add_option("-g", "--general", help="Show General Broker Stats",  action="store_const", const="g",   dest="show")
     group2.add_option("-c", "--connections", help="Show Connections",       action="store_const", const="c",   dest="show")
     group2.add_option("-e", "--exchanges", help="Show Exchanges",           action="store_const", const="e",   dest="show")
@@ -72,12 +81,14 @@ def OptionsAndArguments(argv):
     group2.add_option("-u", "--subscriptions", help="Show Subscriptions",   action="store_const", const="u",   dest="show")
     group2.add_option("-m", "--memory", help="Show Broker Memory Stats",    action="store_const", const="m",   dest="show")
     group2.add_option(      "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show")
-    group2.add_option("-S", "--sort-by",  metavar="<colname>",                   help="Sort by column name")
-    group2.add_option("-I", "--increasing", action="store_true", default=False,  help="Sort by increasing value (default = decreasing)")
-    group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>",    help="Limit output to n rows")
-
     parser.add_option_group(group2)
 
+    group3 = OptionGroup(parser, "Display Options")
+    group3.add_option("-S", "--sort-by",  metavar="<colname>",                   help="Sort by column name")
+    group3.add_option("-I", "--increasing", action="store_true", default=False,  help="Sort by increasing value (default = decreasing)")
+    group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>",    help="Limit output to n rows")
+    parser.add_option_group(group3)
+
     opts, args = parser.parse_args(args=argv)
 
     if not opts.show:
@@ -416,7 +427,8 @@ class BrokerManager:
         heads.append(Header("acked", Header.Y))
         heads.append(Header("excl", Header.Y))
         heads.append(Header("creditMode"))
-        heads.append(Header("delivered", Header.KMG))
+        heads.append(Header("delivered", Header.COMMAS))
+        heads.append(Header("sessUnacked", Header.COMMAS))
         rows = []
         subscriptions = self.broker.getAllSubscriptions()
         sessions = self.getSessionMap()
@@ -436,6 +448,7 @@ class BrokerManager:
                 row.append(s.exclusive)
                 row.append(s.creditMode)
                 row.append(s.delivered)
+                row.append(session.unackedMessages)
                 rows.append(row)
             except:
                 pass

Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-tool
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-tool?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-tool (original)
+++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-tool Thu Jun 28 09:14:52 2012
@@ -455,6 +455,7 @@ class QmfData(Console):
           rows.append(row)
       else:
         print "No object found with ID %d" % dispId
+        return
     finally:
       self.lock.release()
     self.disp.table(caption, heads, rows)

Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpidtoollibs/broker.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpidtoollibs/broker.py?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/tools/src/py/qpidtoollibs/broker.py (original)
+++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpidtoollibs/broker.py Thu Jun 28 09:14:52 2012
@@ -18,6 +18,7 @@
 #
 
 from qpid.messaging import Message
+from qpidtoollibs.disp import TimeLong
 try:
   from uuid import uuid4
 except ImportError:
@@ -190,6 +191,9 @@ class BrokerAgent(object):
   def getAcl(self):
     return self._getSingleObject(Acl)
 
+  def getMemory(self):
+    return self._getSingleObject(Memory)
+
   def echo(self, sequence, body):
     """Request a response to test the path to the management broker"""
     pass
@@ -268,6 +272,20 @@ class BrokerAgent(object):
   def reloadAclFile(self):
     self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
 
+  def acl_lookup(self, userName, action, aclObj, aclObjName, propMap):
+    args = {'userId':      userName,
+            'action':      action,
+            'object':      aclObj,
+            'objectName':  aclObjName,
+            'propertyMap': propMap}
+    return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
+
+  def acl_lookupPublish(self, userName, exchange, key):
+    args = {'userId':       userName,
+            'exchangeName': exchange,
+            'routingKey':   key}
+    return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
+
   def create(self, _type, name, properties, strict):
     """Create an object of the specified type"""
     pass
@@ -281,6 +299,41 @@ class BrokerAgent(object):
     return self._getBrokerObject(self, _type, oid)
 
 
+class EventHelper(object):
+  def eventAddress(self, pkg='*', cls='*', sev='*'):
+    return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev)
+
+  def event(self, msg):
+    return BrokerEvent(msg)
+
+
+class BrokerEvent(object):
+  def __init__(self, msg):
+    self.msg = msg
+    self.content = msg.content[0]
+    self.values = self.content['_values']
+    self.schema_id = self.content['_schema_id']
+    self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name'])
+
+  def __repr__(self):
+    rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name)
+    for k,v in self.values.items():
+      rep = rep + " %s=%s" % (k, v)
+    return rep
+
+  def __getattr__(self, key):
+    if key not in self.values:
+      return None
+    value = self.values[key]
+    return value
+
+  def getAttributes(self):
+    return self.values
+
+  def getTimestamp(self):
+    return self.content['_timestamp']
+
+
 class BrokerObject(object):
   def __init__(self, broker, content):
     self.broker = broker
@@ -348,7 +401,7 @@ class Connection(BrokerObject):
     BrokerObject.__init__(self, broker, values)
 
   def close(self):
-    pass
+    self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address)
 
 class Session(BrokerObject):
   def __init__(self, broker, values):

Modified: qpid/branches/java-config-and-management/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp?rev=1354874&r1=1354873&r2=1354874&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp (original)
+++ qpid/branches/java-config-and-management/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp Thu Jun 28 09:14:52 2012
@@ -49,6 +49,7 @@
 
 #include "qpid/client/AsyncSession.h"
 #include "qpid/client/Connection.h"
+#include "qpid/framing/FieldValue.h"
 
 
 #include <map>
@@ -472,13 +473,15 @@ INT ResourceManager::recover(XID *xids, 
 	try {
 	    // status if we can't talk to the broker
 	    status = XAER_RMFAIL;
-	    std::vector<std::string> wireFormatXids;
 
 	    DtxRecoverResult dtxrr = qpidSession.dtxRecover(true);
 
 	    // status if we can't process the xids
 	    status = XAER_RMERR;
-	    dtxrr.getInDoubt().collect(wireFormatXids);
+
+        std::vector<std::string> wireFormatXids(dtxrr.getInDoubt().size());
+        std::transform(dtxrr.getInDoubt().begin(), dtxrr.getInDoubt().end(), wireFormatXids.begin(), Array::get<std::string, Array::ValuePtr>);
+
 	    size_t nXids = wireFormatXids.size();
 
 	    if (nXids > 0) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message