qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject qpid-python git commit: QPID-7317: Fix hangs in qpid.messaging.
Date Fri, 23 Sep 2016 21:27:56 GMT
Repository: qpid-python
Updated Branches:
  refs/heads/master 81f09ae33 -> 037c57387


QPID-7317: Fix hangs in qpid.messaging.

A hang is observed in processes using qpid.messaging when a thread is blocked
waiting for the Selector to wake it, but there is no Selector.run thread.

This patch removes all the known ways that this hang can occur. We now either
function normally, or immediately raise an exception and log a message starting
with "qpid.messaging:" to the "qpid.messaging" logger.

The following issues are fixed:

1. The Selector.run() thread raises a fatal exception.

Use of qpid.messaging will re-raise the exception immediately, not hang.

2. The process forks, so child has no Selector thread.

https://issues.apache.org/jira/browse/QPID-5637 resets the Selector after a fork.
In addition we now:

- Close Selector.waiter: its file descriptors are shared with the parent, which
  can cause havoc if they "steal" each other's wakeups.

- Replace Endpoint._lock in related endpoints with a BrokenLock. If the parent
  is holding locks when it forks, they remain locked forever in the child.
  BrokenLock.acquire() raises instead of hanging.

3. Selector.stop() called on atexit.

Selector.stop was registered via atexit, which could cause a hang if
qpid.messaging was used in a later-executing atexit function. That registration
has been removed; Selector.run() is in a daemon thread, so there is no need for
stop().

4. User calls Selector.stop() directly.

There is no reason to do this for the default Selector used by qpid.messaging,
so for that case stop() is now ignored. It works as before for code that creates
its own qpid.Selector instances.


Project: http://git-wip-us.apache.org/repos/asf/qpid-python/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-python/commit/037c5738
Tree: http://git-wip-us.apache.org/repos/asf/qpid-python/tree/037c5738
Diff: http://git-wip-us.apache.org/repos/asf/qpid-python/diff/037c5738

Branch: refs/heads/master
Commit: 037c5738734d8fecb7b7f7e7af4e4f14f9cd3a64
Parents: 81f09ae
Author: Alan Conway <aconway@redhat.com>
Authored: Fri Sep 23 17:23:55 2016 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Fri Sep 23 17:26:39 2016 -0400

----------------------------------------------------------------------
 qpid/selector.py | 72 ++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-python/blob/037c5738/qpid/selector.py
----------------------------------------------------------------------
diff --git a/qpid/selector.py b/qpid/selector.py
index 4a15d56..32e542b 100644
--- a/qpid/selector.py
+++ b/qpid/selector.py
@@ -16,13 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import atexit, time, errno, os
+import time, errno, os
 from compat import select, SelectError, set, selectable_waiter, format_exc
 from threading import Thread, Lock
 from logging import getLogger
 
 log = getLogger("qpid.messaging")
 
+class SelectorException(Exception): pass
+
 class Acceptor:
 
   def __init__(self, sock, handler):
@@ -53,8 +55,12 @@ class Selector:
     Selector.lock.acquire()
     try:
       if Selector.DEFAULT is None or Selector._current_pid != os.getpid():
+        # If we forked, mark the existing Selector dead.
+        if Selector.DEFAULT is not None:
+          log.warning("qpid.messaging: process was forked")
+          Selector.DEFAULT.dead(
+            SelectorException("qpid.messaging: forked child process used parent connection"), True)
         sel = Selector()
-        atexit.register(sel.stop)
         sel.start()
         Selector.DEFAULT = sel
         Selector._current_pid = os.getpid()
@@ -73,6 +79,9 @@ class Selector:
     self.exception = None
 
   def wakeup(self):
+    if self.exception:
+      log.error(str(self.exception))
+      raise self.exception
     self.waiter.wakeup()
 
   def register(self, selectable):
@@ -102,13 +111,14 @@ class Selector:
 
   def start(self):
     self.stopped = False
+    self.exception = None
     self.thread = Thread(target=self.run)
     self.thread.setDaemon(True)
     self.thread.start();
 
   def run(self):
     try:
-      while not self.stopped:
+      while not self.stopped and not self.exception:
         wakeup = None
         for sel in self.selectables.copy():
           t = self._update(sel)
@@ -152,16 +162,60 @@ class Selector:
           if w is not None and now > w:
             sel.timeout()
     except Exception, e:
-      self.exception = e
-      info = format_exc()
-      log.error("qpid.messaging I/O thread has died: %s" % str(e))
-      for sel in self.selectables.copy():
-        if hasattr(sel, "abort"):
-          sel.abort(e, info)
+      log.error("qpid.messaging: I/O thread has died: %s\n%s" % (e, format_exc()))
+      dead(e, False)
       raise
+    self.dead(SelectorException("qpid.messaging: I/O thread exited"), False)
 
   def stop(self, timeout=None):
+    """Stop the selector and wait for it's thread to exit.
+    Ignored for the shared default() selector, which stops when the process exits.
+
+    """
+    if self.DEFAULT == self:    # Never stop the DEFAULT Selector
+      return
     self.stopped = True
     self.wakeup()
     self.thread.join(timeout)
+    self.dead(SelectorException("qpid.messaging: I/O thread stopped"), False)
+
+  def dead(self, e, forked):
+    """Mark the Selector as dead if it is stopped for any reason.
+    Ensure there any future calls to wait() will raise an exception.
+    If the thread died because of a fork() then ensure further that
+    attempting to take the connections lock also raises.
+    """
     self.thread = None
+    self.exception = e
+    for sel in self.selectables.copy():
+      try:
+        # Mark the connection as failed
+        sel.connection.error = e
+        if forked:
+          # Replace connection's locks, they may be permanently locked in the forked child.
+          c = sel.connection
+          c.error = e
+          c._lock = BrokenLock(e)
+          for ssn in c.sessions.values():
+            ssn._lock = c._lock
+            for l in ssn.senders + ssn.receivers:
+              l._lock = c._lock
+      except:
+        pass
+    try:
+      if forked:
+        self.waiter.close()       # Don't mess with the parent's FDs
+      else:
+        self.waiter.wakeup()      # In case somebody re-waited while we were cleaning up.
+    except:
+      pass
+
+class BrokenLock(object):
+  """Dummy lock-like object that raises an exception. Used in forked child to
+      replace locks that may be held in the parent process."""
+  def __init__(self, exception):
+    self.exception = exception
+
+  def acquire(self):
+    log.error(str(self.exception))
+    raise self.exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message