qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1713943 - in /qpid/trunk/qpid/python/qpid: messaging/driver.py selector.py
Date Wed, 11 Nov 2015 22:45:42 GMT
Author: kgiusti
Date: Wed Nov 11 22:45:42 2015
New Revision: 1713943

URL: http://svn.apache.org/viewvc?rev=1713943&view=rev
Log:
QPID-6839: python-qpid: Log the failure of the Selector thread

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/selector.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1713943&r1=1713942&r2=1713943&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Nov 11 22:45:42 2015
@@ -390,18 +390,37 @@ class Driver:
 
   @synchronized
   def reading(self):
+    """Called by the Selector I/O thread to determine if the driver needs to
+    wait on the arrival of network data (call self.readable() callback)
+    """
     return self._transport is not None and \
         self._transport.reading(True)
 
   @synchronized
   def writing(self):
+    """Called by the Selector I/O thread to determine if it should block
+    waiting for output bandwidth (call the self.writeable() callback)
+    """
     return self._transport is not None and \
         self._transport.writing(self.engine.pending())
 
   @synchronized
   def timing(self):
+    """Called by the Selector I/O thread to determine if it should wake up the
+    driver (call the timeout() callback
+    """
     return self._timeout
 
+  @synchronized
+  def abort(self, exc, info):
+    """Called if the Selector I/O thread hits an unrecoverable error and fails.
+    """
+    try:
+      self.connection.error = exc
+      log.error("I/O Thread Fatal error: %s\n%s" % (str(exc), info))
+    except:
+      pass
+
   def _check_retry_ok(self):
     """We consider a reconnect to have suceeded only when we have received
     open-ok from the peer.

Modified: qpid/trunk/qpid/python/qpid/selector.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/selector.py?rev=1713943&r1=1713942&r2=1713943&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/selector.py (original)
+++ qpid/trunk/qpid/python/qpid/selector.py Wed Nov 11 22:45:42 2015
@@ -17,8 +17,11 @@
 # under the License.
 #
 import atexit, time, errno, os
-from compat import select, SelectError, set, selectable_waiter
+from compat import select, SelectError, set, selectable_waiter, format_exc
 from threading import Thread, Lock
+from logging import getLogger
+
+log = getLogger("qpid.messaging")
 
 class Acceptor:
 
@@ -67,6 +70,7 @@ class Selector:
     self.reading.add(self.waiter)
     self.stopped = False
     self.thread = None
+    self.exception = None
 
   def wakeup(self):
     self.waiter.wakeup()
@@ -103,48 +107,58 @@ class Selector:
     self.thread.start();
 
   def run(self):
-    while not self.stopped:
-      wakeup = None
-      for sel in self.selectables.copy():
-        t = self._update(sel)
-        if t is not None:
-          if wakeup is None:
-            wakeup = t
-          else:
-            wakeup = min(wakeup, t)
-
-      rd = []
-      wr = []
-      ex = []
-
-      while True:
-        try:
-          if wakeup is None:
-            timeout = None
-          else:
-            timeout = max(0, wakeup - time.time())
-          rd, wr, ex = select(self.reading, self.writing, (), timeout)
-          break
-        except SelectError, e:
-          # Repeat the select call if we were interrupted.
-          if e[0] == errno.EINTR:
-            continue
-          else:
-            raise
-
-      for sel in wr:
-        if sel.writing():
-          sel.writeable()
-
-      for sel in rd:
-        if sel.reading():
-          sel.readable()
-
-      now = time.time()
+    try:
+      while not self.stopped:
+        wakeup = None
+        for sel in self.selectables.copy():
+          t = self._update(sel)
+          if t is not None:
+            if wakeup is None:
+              wakeup = t
+            else:
+              wakeup = min(wakeup, t)
+
+        rd = []
+        wr = []
+        ex = []
+
+        while True:
+          try:
+            if wakeup is None:
+              timeout = None
+            else:
+              timeout = max(0, wakeup - time.time())
+            rd, wr, ex = select(self.reading, self.writing, (), timeout)
+            break
+          except SelectError, e:
+            # Repeat the select call if we were interrupted.
+            if e[0] == errno.EINTR:
+              continue
+            else:
+              # unrecoverable: promote to outer try block
+              raise
+
+        for sel in wr:
+          if sel.writing():
+            sel.writeable()
+
+        for sel in rd:
+          if sel.reading():
+            sel.readable()
+
+        now = time.time()
+        for sel in self.selectables.copy():
+          w = sel.timing()
+          if w is not None and now > w:
+            sel.timeout()
+    except Exception, e:
+      self.exception = e
+      info = format_exc()
+      log.error("qpid.messaging I/O thread has died: %s" % str(e))
       for sel in self.selectables.copy():
-        w = sel.timing()
-        if w is not None and now > w:
-          sel.timeout()
+        if hasattr(sel, "abort"):
+          sel.abort(e, info)
+      raise
 
   def stop(self, timeout=None):
     self.stopped = True



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message