qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r807210 - in /qpid/trunk/qpid/python/qpid: messaging.py tests/messaging.py
Date Mon, 24 Aug 2009 13:38:21 GMT
Author: rhs
Date: Mon Aug 24 13:38:21 2009
New Revision: 807210

URL: http://svn.apache.org/viewvc?rev=807210&view=rev
Log:
added some test assertions; modified driver/client interaction for drain; and fixed handling
of unlimited capacity

Modified:
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=807210&r1=807209&r2=807210&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Mon Aug 24 13:38:21 2009
@@ -368,6 +368,10 @@
     sender = Sender(self, len(self.senders), target)
     self.senders.append(sender)
     self.wakeup()
+    # XXX: because of the lack of waiting here we can end up getting
+    # into the driver loop with messages sent for senders that haven't
+    # been linked yet; something similar can probably happen for
+    # receivers
     return sender
 
   @synchronized
@@ -636,7 +640,7 @@
     self.started = started
     self.capacity = UNLIMITED
     self.granted = Serial(0)
-    self.draining = False
+    self.drain = False
     self.impending = Serial(0)
     self.received = Serial(0)
     self.returned = Serial(0)
@@ -697,34 +701,41 @@
     if self._capacity() == 0:
       self.granted = self.returned + 1
       self.wakeup()
-      self.ewait(lambda: self.impending == self.granted)
+    self.ewait(lambda: self.impending == self.granted)
     msg = self.session._get(self._pred, timeout=timeout)
     if msg is None:
-      self.draining = True
+      self.drain = True
+      self.granted = self.received
+      self.wakeup()
+      self.ewait(lambda: self.impending == self.received)
+      self.drain = False
+      self._grant()
       self.wakeup()
-      self.ewait(lambda: not self.draining)
-      assert self.granted == self.received
-      if self.capacity is not UNLIMITED:
-        self.granted += self._capacity()
-        self.wakeup()
       msg = self.session._get(self._pred, timeout=0)
       if msg is None:
         raise Empty()
-    if self._capacity() not in (0, UNLIMITED.value):
+    elif self._capacity() not in (0, UNLIMITED.value):
       self.granted += 1
       self.wakeup()
     return msg
 
+  def _grant(self):
+    if self.started:
+      if self.capacity is UNLIMITED:
+        self.granted = UNLIMITED
+      else:
+        self.granted = self.received + self._capacity()
+    else:
+      self.granted = self.received
+
+
   @synchronized
   def start(self):
     """
     Start incoming message delivery for this receiver.
     """
     self.started = True
-    if self.capacity is UNLIMITED:
-      self.granted = UNLIMITED
-    else:
-      self.granted = self.received + self._capacity()
+    self._grant()
     self.wakeup()
 
   @synchronized
@@ -732,8 +743,8 @@
     """
     Stop incoming message delivery for this receiver.
     """
-    self.granted = self.received
     self.started = False
+    self._grant()
     self.wakeup()
     self.ewait(lambda: self.impending == self.received)
 
@@ -890,6 +901,7 @@
         for ssn in self.connection.sessions.values():
           self.attach(ssn)
           self.process(ssn)
+
       exi = None
     except:
       exi = sys.exc_info()
@@ -1107,37 +1119,34 @@
       if rcv.impending is UNLIMITED:
         delta = 0
       else:
-        delta = UNLIMITED.value
+        delta = UNLIMITED
+    elif rcv.impending is UNLIMITED:
+      delta = -1
     else:
-      delta = rcv.granted - rcv.impending
+      delta = max(rcv.granted, rcv.received) - rcv.impending
 
-    if delta > 0:
+    if delta is UNLIMITED:
+      _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
+      _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+      rcv.impending = UNLIMITED
+    elif delta > 0:
       _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
       _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
       rcv.impending += delta
     elif delta < 0:
-      _ssn.message_stop(rcv.destination, sync=True)
+      if rcv.drain:
+        _ssn.message_flush(rcv.destination, sync=True)
+      else:
+        _ssn.message_stop(rcv.destination, sync=True)
       # XXX: need to kill syncs
       _ssn.sync()
       rcv.impending = rcv.received
-      # XXX: this can recurse infinitely if granted drops below received
       self.grant(rcv)
 
   def process_receiver(self, rcv):
     if rcv.closed: return
-    _ssn = self._attachments[rcv.session]
-    _rcv = self._attachments[rcv]
-
     self.grant(rcv)
 
-    if rcv.draining:
-      _ssn.message_flush(rcv.destination, sync=True)
-      # XXX: really need to make this async so that we don't give up the lock
-      _ssn.sync()
-      rcv.granted = rcv.received
-      rcv.impending = rcv.received
-      rcv.draining = False
-
   def send(self, snd, msg):
     _ssn = self._attachments[snd.session]
     _snd = self._attachments[snd]

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=807210&r1=807209&r2=807210&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Mon Aug 24 13:38:21 2009
@@ -485,7 +485,8 @@
     self.sleep()
     self.assertPending(self.rcv, 5)
 
-    self.drain(self.rcv)
+    drained = self.drain(self.rcv)
+    assert len(drained) == 10
     self.assertPending(self.rcv, 0)
 
     self.ssn.acknowledge()
@@ -493,15 +494,15 @@
   def testCapacityUNLIMITED(self):
     self.rcv.capacity = UNLIMITED
     self.rcv.start()
-    assert self.rcv.pending() == 0
+    self.assertPending(self.rcv, 0)
 
     for i in range(10):
       self.send("testCapacityUNLIMITED", i)
     self.sleep()
-    assert self.rcv.pending() == 10
+    self.assertPending(self.rcv, 10)
 
     self.drain(self.rcv)
-    assert self.rcv.pending() == 0
+    self.assertPending(self.rcv, 0)
 
     self.ssn.acknowledge()
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message