From: chamikara@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Tue, 14 Nov 2017 02:14:13 -0000
Subject: [1/2] beam git commit: CP #4112, #4122: Properly handle side input exception when all reader threads complete

Repository: beam
Updated Branches:
  refs/heads/release-2.2.0 d28ee9b9f -> 8da5c1b61

CP #4112, #4122: Properly handle side input exception when all reader threads complete

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/381e86c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/381e86c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/381e86c6

Branch: refs/heads/release-2.2.0
Commit: 381e86c6d863dcbde3d626abebc1519131d94c13
Parents: d28ee9b
Author: Charles Chen
Authored: Fri Nov 10 11:28:43 2017 -0800
Committer: Charles Chen
Committed: Mon Nov 13 14:03:42 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py    | 21 +++++++++++---------
 .../runners/worker/sideinputs_test.py           | 19 ++++++++++++++++++
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/381e86c6/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index bdf9f4e..6c7831d 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -116,6 +116,7 @@ class PrefetchingSourceSetIterable(object):
       self.element_queue.put(READER_THREAD_IS_DONE_SENTINEL)
 
   def __iter__(self):
+    # pylint: disable=too-many-nested-blocks
     if self.already_iterated:
       raise RuntimeError(
           'Can only iterate once over PrefetchingSourceSetIterable instance.')
@@ -128,15 +129,17 @@ class PrefetchingSourceSetIterable(object):
     num_readers_finished = 0
     try:
       while True:
-        element = self.element_queue.get()
-        if element is READER_THREAD_IS_DONE_SENTINEL:
-          num_readers_finished += 1
-          if num_readers_finished == self.num_reader_threads:
-            return
-        elif self.has_errored:
-          raise self.reader_exceptions.get()
-        else:
-          yield element
+        try:
+          element = self.element_queue.get()
+          if element is READER_THREAD_IS_DONE_SENTINEL:
+            num_readers_finished += 1
+            if num_readers_finished == self.num_reader_threads:
+              return
+          else:
+            yield element
+        finally:
+          if self.has_errored:
+            raise self.reader_exceptions.get()
     except GeneratorExit:
       self.has_errored = True
       raise

http://git-wip-us.apache.org/repos/asf/beam/blob/381e86c6/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index d243bbe..bb688dd 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -91,6 +91,24 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
         sources, max_reader_threads=1)
     assert list(strip_windows(iterator_fn())) == range(11)
 
+  def test_source_iterator_single_source_exception(self):
+    class MyException(Exception):
+      pass
+
+    def exception_generator():
+      yield 0
+      raise MyException('I am an exception!')
+
+    sources = [
+        FakeSource(exception_generator()),
+    ]
+    iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)
+    seen = set()
+    with self.assertRaises(MyException):
+      for value in iterator_fn():
+        seen.add(value.value)
+    self.assertEqual(sorted(seen), [0])
+
   def test_source_iterator_fn_exception(self):
     class MyException(Exception):
       pass
@@ -103,6 +121,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
     def perpetual_generator(value):
       while True:
         yield value
+        time.sleep(0.1)
 
     sources = [
         FakeSource(perpetual_generator(1)),
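
Note on the change to __iter__: the following is a minimal standalone sketch of the control-flow difference, not the Beam implementation. It assumes, as the diff suggests, that a reader thread records its exception and sets an error flag before its done-sentinel is queued. The names Readers, DONE, iterate_before, iterate_after and failing_source are hypothetical and exist only for this illustration. In the old flow the error flag is never consulted on the sentinel branch, so a failure immediately followed by the last sentinel is dropped; in the new flow the check sits in a finally clause and runs on every path, including the final return.

```python
import queue
import threading

# Hypothetical stand-in for READER_THREAD_IS_DONE_SENTINEL.
DONE = object()


class Readers(object):
  """Runs one thread per source; all names here are illustrative only."""

  def __init__(self, sources):
    self.elements = queue.Queue()
    self.errors = queue.Queue()
    self.has_errored = False
    self.count = len(sources)
    for source in sources:
      thread = threading.Thread(target=self._read, args=(source,))
      thread.daemon = True
      thread.start()

  def _read(self, source):
    try:
      for item in source:
        self.elements.put(item)
    except Exception as e:  # pylint: disable=broad-except
      # Record the failure before the sentinel is queued in `finally`.
      self.errors.put(e)
      self.has_errored = True
    finally:
      self.elements.put(DONE)


def iterate_before(sources):
  """Old control flow: the error flag is skipped on the sentinel branch."""
  readers = Readers(sources)
  finished = 0
  while True:
    element = readers.elements.get()
    if element is DONE:
      finished += 1
      if finished == readers.count:
        return  # A pending exception is silently dropped here.
    elif readers.has_errored:
      raise readers.errors.get()
    else:
      yield element


def iterate_after(sources):
  """New control flow: the error flag is checked on every path."""
  readers = Readers(sources)
  finished = 0
  while True:
    try:
      element = readers.elements.get()
      if element is DONE:
        finished += 1
        if finished == readers.count:
          return
      else:
        yield element
    finally:
      # Runs after a yield resumes and also on the final `return`.
      if readers.has_errored:
        raise readers.errors.get()


def failing_source():
  raise ValueError('reader failed')
  yield  # Unreachable; makes this function a generator.
```

With a single source that fails before producing anything, list(iterate_before([failing_source()])) returns an empty list and the ValueError is lost, while list(iterate_after([failing_source()])) raises it. The source that fails immediately is chosen so the sketch is deterministic; the added test in the diff covers the case where one element is yielded first.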