qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1069322 [2/2] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/tests/ cpp/xml/ tests/src/py/qpid_tests/broker_0_10/
Date Thu, 10 Feb 2011 10:12:42 GMT
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py?rev=1069322&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py Thu Feb 10 10:12:41 2011
@@ -0,0 +1,221 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+from qpid.compat import set
+import math
+
+class PriorityTests (Base):
+    """
+    Test prioritised messaging
+    """ 
+
+    def setup_connection(self):
+        return Connection.establish(self.broker, **self.connection_options())
+
+    def setup_session(self):
+        return self.conn.session()
+
+    def prioritised_delivery(self, priorities, levels=10):
+        """
+        Test that message on a queue are delivered in priority order.
+        """
+        msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}"
% levels,
+                              durable=self.durable())
+        for m in msgs: snd.send(m)
+
+        rcv = self.ssn.receiver(snd.target)
+        for expected in sorted(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True):
+            msg = rcv.fetch(0)
+            #print "expected priority %s got %s" % (expected.priority, msg.priority)
+            assert msg.content == expected.content
+            self.ssn.acknowledge(msg)
+
+    def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10):
+        msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+        limit_policy = "x-qpid-fairshare:%s" % default_limit
+        if limits:
+            for k, v in limits.items():
+                limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v)
+
+        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s,
%s}}}}"
+                              % (levels, limit_policy),
+                              durable=self.durable())
+        for m in msgs: snd.send(m)
+
+        rcv = self.ssn.receiver(snd.target)
+        if limits:
+            limit_function = lambda x : limits.get(x, 0)
+        else:
+            limit_function = lambda x : default_limit
+        for expected in fairshare(sorted(msgs, key=lambda m: priority_level(m.priority,levels),
reverse=True), 
+                                  limit_function, levels):
+            msg = rcv.fetch(0)
+            #print "expected priority %s got %s" % (expected.priority, msg.priority)
+            assert msg.priority == expected.priority
+            assert msg.content == expected.content
+            self.ssn.acknowledge(msg)
+
+    def test_prioritised_delivery_1(self):
+        self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels =
10)
+
+    def test_prioritised_delivery_2(self):
+        self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels =
5)
+
+    def test_fairshare_1(self):
+        self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3])
+
+    def test_fairshare_2(self):
+        self.fairshare_delivery(priorities = [10 for i in range(30)])
+
+    def test_fairshare_3(self):
+        self.fairshare_delivery(priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3],
limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}, levels=8)
+
+    def test_browsing(self):
+        priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]
+        msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+        snd = self.ssn.sender("priority-queue; {create: sender, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}",
+                              durable=self.durable())
+        for m in msgs: snd.send(m)
+
+        rcv = self.ssn.receiver("priority-queue; {mode: browse, delete: receiver}")
+        received = []
+        try:
+            while True: received.append(rcv.fetch(0))
+        except Empty: None
+        #check all messages on the queue were received by the browser; don't relay on any
specific ordering at present
+        assert set([m.content for m in msgs]) == set([m.content for m in received])
+
+    def ring_queue_check(self, msgs):
+        """
+        Ensure that a ring queue removes lowest priority messages first.
+        """
+        snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10,
'qpid.policy_type':ring, 'qpid.max_count':10"),
+                              durable=self.durable())
+        for m in msgs: snd.send(m)
+
+        rcv = self.ssn.receiver(snd.target)
+        received = []
+        try:
+            while True: received.append(rcv.fetch(0))
+        except Empty: None
+
+        expected = []
+        for m in msgs:
+            while len(expected) > 9:
+                expected.sort(key=lambda x: priority_level(x.priority,10))
+                expected.pop(0)
+            expected.append(m)
+        #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for
m in expected], [m.content for m in received])        
+        assert [m.content for m in expected] == [m.content for m in received]
+
+    def test_ring_queue_1(self):
+        priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3]
+        seq = content("msg")    
+        self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
+
+    def test_ring_queue_2(self):
+        priorities = [9,0,2,3,6,9,9,2,9,2,9,9,1,9,4,7,1,1,3,9,9,3,9,3,9,9,9,1,9,9,2,3,0,9]
+        seq = content("msg")    
+        self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
+
+    def test_requeue(self):
+        priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]
+        msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}",
+                              durable=self.durable())
+        #want to have some messages requeued so enable prefetch on a dummy receiver
+        other = self.conn.session()
+        dummy = other.receiver("priority-queue")
+        dummy.capacity = 10
+
+        for m in msgs: snd.send(m)
+
+        #fetch some with dummy receiver on which prefetch is also enabled
+        for i in range(5):
+            msg = dummy.fetch(0)
+        #close session without acknowledgements to requeue messages
+        other.close()
+
+        #now test delivery works as expected after that
+        rcv = self.ssn.receiver(snd.target)
+        for expected in sorted(msgs, key=lambda m: priority_level(m.priority,10), reverse=True):
+            msg = rcv.fetch(0)
+            #print "expected priority %s got %s" % (expected.priority, msg.priority)
+            assert msg.content == expected.content
+            self.ssn.acknowledge(msg)
+
+def content(base, counter=1):
+    while True:
+        yield "%s-%s" % (base, counter)        
+        counter += 1
+
+def address(name, create_policy="sender", delete_policy="receiver", arguments=None):
+    if arguments: node = "node: {x-declare:{arguments:{%s}}}" % arguments
+    else: node = "node: {}"
+    return "%s; {create: %s, delete: %s, %s}" % (name, create_policy, delete_policy, node)
+
+def fairshare(msgs, limit, levels):
+    """
+    Generator to return prioritised messages in expected order for a given fairshare limit
+    """
+    count = 0
+    last_priority = None
+    postponed = []
+    while msgs or postponed:
+        if not msgs: 
+            msgs = postponed
+            count = 0
+            last_priority = None
+            postponed = []            
+        msg = msgs.pop(0)
+        if last_priority and priority_level(msg.priority, levels) == last_priority:
+            count += 1
+        else:
+            last_priority = priority_level(msg.priority, levels)
+            count = 1
+        l = limit(last_priority)
+        if (l and count > l):
+            postponed.append(msg)
+        else:
+            yield msg         
+    return
+
+def effective_priority(value, levels):
+    """
+    Method to determine effective priority given a distinct number of
+    levels supported. Returns the lowest priority value that is of
+    equivalent priority to the value passed in.
+    """
+    if value <= 5-math.ceil(levels/2.0): return 0
+    if value >= 4+math.floor(levels/2.0): return 4+math.floor(levels/2.0)
+    return value
+
+def priority_level(value, levels):
+    """
+    Method to determine which of a distinct number of priority levels
+    a given value falls into.
+    """
+    offset = 5-math.ceil(levels/2.0)
+    return min(max(value - offset, 0), levels-1)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message