Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CC949180DB for ; Wed, 11 Nov 2015 22:45:58 +0000 (UTC) Received: (qmail 63368 invoked by uid 500); 11 Nov 2015 22:45:58 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 63337 invoked by uid 500); 11 Nov 2015 22:45:58 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 63328 invoked by uid 99); 11 Nov 2015 22:45:58 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 22:45:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 1DE60C0098 for ; Wed, 11 Nov 2015 22:45:58 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.791 X-Spam-Level: X-Spam-Status: No, score=0.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id A672Y5eE7u6p for ; Wed, 11 Nov 2015 22:45:43 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with ESMTP id A00CE439CD for ; Wed, 11 Nov 2015 22:45:43 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 1E229E00B0 for ; Wed, 11 Nov 2015 22:45:43 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 0E0A83A081C for ; Wed, 11 Nov 2015 22:45:43 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1713943 - in /qpid/trunk/qpid/python/qpid: messaging/driver.py selector.py Date: Wed, 11 Nov 2015 22:45:42 -0000 To: commits@qpid.apache.org From: kgiusti@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20151111224543.0E0A83A081C@svn01-us-west.apache.org> Author: kgiusti Date: Wed Nov 11 22:45:42 2015 New Revision: 1713943 URL: http://svn.apache.org/viewvc?rev=1713943&view=rev Log: QPID-6839: python-qpid: Log the failure of the Selector thread Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py qpid/trunk/qpid/python/qpid/selector.py Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1713943&r1=1713942&r2=1713943&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/messaging/driver.py (original) +++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Nov 11 22:45:42 2015 @@ -390,18 +390,37 @@ class Driver: @synchronized def reading(self): + """Called by the Selector I/O thread to determine if the driver needs to + wait on the arrival of network data (call self.readable() callback) + """ return self._transport is not None and \ self._transport.reading(True) @synchronized def writing(self): + """Called by the Selector I/O thread to determine if it should block + waiting for output bandwidth (call the self.writeable() callback) + """ return self._transport is not None and \ self._transport.writing(self.engine.pending()) @synchronized def timing(self): + """Called by the Selector I/O thread to determine if it should wake up the + driver (call the timeout() callback + """ return self._timeout + @synchronized + def abort(self, exc, info): + """Called if the Selector I/O thread hits an unrecoverable error and fails. + """ + try: + self.connection.error = exc + log.error("I/O Thread Fatal error: %s\n%s" % (str(exc), info)) + except: + pass + def _check_retry_ok(self): """We consider a reconnect to have suceeded only when we have received open-ok from the peer. Modified: qpid/trunk/qpid/python/qpid/selector.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/selector.py?rev=1713943&r1=1713942&r2=1713943&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/selector.py (original) +++ qpid/trunk/qpid/python/qpid/selector.py Wed Nov 11 22:45:42 2015 @@ -17,8 +17,11 @@ # under the License. # import atexit, time, errno, os -from compat import select, SelectError, set, selectable_waiter +from compat import select, SelectError, set, selectable_waiter, format_exc from threading import Thread, Lock +from logging import getLogger + +log = getLogger("qpid.messaging") class Acceptor: @@ -67,6 +70,7 @@ class Selector: self.reading.add(self.waiter) self.stopped = False self.thread = None + self.exception = None def wakeup(self): self.waiter.wakeup() @@ -103,48 +107,58 @@ class Selector: self.thread.start(); def run(self): - while not self.stopped: - wakeup = None - for sel in self.selectables.copy(): - t = self._update(sel) - if t is not None: - if wakeup is None: - wakeup = t - else: - wakeup = min(wakeup, t) - - rd = [] - wr = [] - ex = [] - - while True: - try: - if wakeup is None: - timeout = None - else: - timeout = max(0, wakeup - time.time()) - rd, wr, ex = select(self.reading, self.writing, (), timeout) - break - except SelectError, e: - # Repeat the select call if we were interrupted. - if e[0] == errno.EINTR: - continue - else: - raise - - for sel in wr: - if sel.writing(): - sel.writeable() - - for sel in rd: - if sel.reading(): - sel.readable() - - now = time.time() + try: + while not self.stopped: + wakeup = None + for sel in self.selectables.copy(): + t = self._update(sel) + if t is not None: + if wakeup is None: + wakeup = t + else: + wakeup = min(wakeup, t) + + rd = [] + wr = [] + ex = [] + + while True: + try: + if wakeup is None: + timeout = None + else: + timeout = max(0, wakeup - time.time()) + rd, wr, ex = select(self.reading, self.writing, (), timeout) + break + except SelectError, e: + # Repeat the select call if we were interrupted. + if e[0] == errno.EINTR: + continue + else: + # unrecoverable: promote to outer try block + raise + + for sel in wr: + if sel.writing(): + sel.writeable() + + for sel in rd: + if sel.reading(): + sel.readable() + + now = time.time() + for sel in self.selectables.copy(): + w = sel.timing() + if w is not None and now > w: + sel.timeout() + except Exception, e: + self.exception = e + info = format_exc() + log.error("qpid.messaging I/O thread has died: %s" % str(e)) for sel in self.selectables.copy(): - w = sel.timing() - if w is not None and now > w: - sel.timeout() + if hasattr(sel, "abort"): + sel.abort(e, info) + raise def stop(self, timeout=None): self.stopped = True --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org