qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1658984 - in /qpid/trunk/qpid/python/qpid: messaging/driver.py messaging/endpoints.py tests/messaging/endpoints.py
Date Wed, 11 Feb 2015 15:12:55 GMT
Author: kgiusti
Date: Wed Feb 11 15:12:54 2015
New Revision: 1658984

URL: http://svn.apache.org/r1658984
Log:
QPID-5799: provide notification callback for received messages.

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1658984&r1=1658983&r2=1658984&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Feb 11 15:12:54 2015
@@ -1368,7 +1368,8 @@ class Engine:
       assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
     rcv.received += 1
     log.debug("RCVD[%s]: %s", ssn.log_id, msg)
-    ssn.incoming.append(msg)
+    ssn.message_received(msg)
+
 
   def _decode(self, xfr):
     dp = EMPTY_DP

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=1658984&r1=1658983&r2=1658984&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Wed Feb 11 15:12:54 2015
@@ -569,6 +569,7 @@ class Session(Endpoint):
     self.closed = False
 
     self._lock = connection._lock
+    self._msg_received = None
 
   def __repr__(self):
     return "<Session %s>" % self.name
@@ -600,6 +601,11 @@ class Session(Endpoint):
     if self.closed:
       raise SessionClosed()
 
+  def message_received(self, msg):
+      self.incoming.append(msg)
+      if self._msg_received:
+          self._msg_received()
+
   @synchronized
   def sender(self, target, **options):
     """
@@ -685,6 +691,18 @@ class Session(Endpoint):
     return None
 
   @synchronized
+  def set_message_received_handler(self, handler):
+      """Register a callback that will be invoked when a message arrives on the
+      session.  Use with caution: since this callback is invoked in the context
+      of the driver thread, it is not safe to call any of the public messaging
+      APIs from within this callback.  The intent of the handler is to provide
+      an efficient way to notify the application that a message has arrived.
+      This can be useful for those applications that need to schedule a task
+      to poll for received messages without blocking in the messaging API.
+      """
+      self._msg_received = handler
+
+  @synchronized
   def next_receiver(self, timeout=None):
     if self._ecwait(lambda: self.incoming, timeout):
       return self.incoming[0]._receiver

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=1658984&r1=1658983&r2=1658984&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Wed Feb 11 15:12:54 2015
@@ -660,6 +660,31 @@ class SessionTests(Base):
     except Detached:
       pass
 
+  def testRxCallback(self):
+    """Verify that the callback is invoked when a message is received.
+    """
+    ADDR = 'test-rx_callback-queue; {create: always, delete: receiver}'
+    class CallbackHandler:
+        def __init__(self):
+            self.handler_called = False
+        def __call__(self):
+            self.handler_called = True
+    cb = CallbackHandler()
+    self.ssn.set_message_received_handler(cb)
+    rcv = self.ssn.receiver(ADDR)
+    rcv.capacity = UNLIMITED
+    snd = self.ssn.sender(ADDR)
+    assert not cb.handler_called
+    snd.send("Ping")
+    deadline = time.time() + self.timeout()
+    while time.time() < deadline:
+        if cb.handler_called:
+            break;
+    assert cb.handler_called
+    snd.close()
+    rcv.close()
+
+
 RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
 
 class ReceiverTests(Base):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message