beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: CP #4112, #4122: Properly handle side input exception when all reader threads complete
Date Tue, 14 Nov 2017 02:14:13 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.2.0 d28ee9b9f -> 8da5c1b61


Cherry-pick #4112, #4122: Properly handle side input exception when all reader threads complete


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/381e86c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/381e86c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/381e86c6

Branch: refs/heads/release-2.2.0
Commit: 381e86c6d863dcbde3d626abebc1519131d94c13
Parents: d28ee9b
Author: Charles Chen <ccy@google.com>
Authored: Fri Nov 10 11:28:43 2017 -0800
Committer: Charles Chen <ccy@google.com>
Committed: Mon Nov 13 14:03:42 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py    | 21 +++++++++++---------
 .../runners/worker/sideinputs_test.py           | 19 ++++++++++++++++++
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/381e86c6/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index bdf9f4e..6c7831d 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -116,6 +116,7 @@ class PrefetchingSourceSetIterable(object):
       self.element_queue.put(READER_THREAD_IS_DONE_SENTINEL)
 
   def __iter__(self):
+    # pylint: disable=too-many-nested-blocks
     if self.already_iterated:
       raise RuntimeError(
           'Can only iterate once over PrefetchingSourceSetIterable instance.')
@@ -128,15 +129,17 @@ class PrefetchingSourceSetIterable(object):
     num_readers_finished = 0
     try:
       while True:
-        element = self.element_queue.get()
-        if element is READER_THREAD_IS_DONE_SENTINEL:
-          num_readers_finished += 1
-          if num_readers_finished == self.num_reader_threads:
-            return
-        elif self.has_errored:
-          raise self.reader_exceptions.get()
-        else:
-          yield element
+        try:
+          element = self.element_queue.get()
+          if element is READER_THREAD_IS_DONE_SENTINEL:
+            num_readers_finished += 1
+            if num_readers_finished == self.num_reader_threads:
+              return
+          else:
+            yield element
+        finally:
+          if self.has_errored:
+            raise self.reader_exceptions.get()
     except GeneratorExit:
       self.has_errored = True
       raise

http://git-wip-us.apache.org/repos/asf/beam/blob/381e86c6/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index d243bbe..bb688dd 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -91,6 +91,24 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
         sources, max_reader_threads=1)
     assert list(strip_windows(iterator_fn())) == range(11)
 
+  def test_source_iterator_single_source_exception(self):
+    class MyException(Exception):
+      pass
+
+    def exception_generator():
+      yield 0
+      raise MyException('I am an exception!')
+
+    sources = [
+        FakeSource(exception_generator()),
+    ]
+    iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)
+    seen = set()
+    with self.assertRaises(MyException):
+      for value in iterator_fn():
+        seen.add(value.value)
+    self.assertEqual(sorted(seen), [0])
+
   def test_source_iterator_fn_exception(self):
     class MyException(Exception):
       pass
@@ -103,6 +121,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
     def perpetual_generator(value):
       while True:
         yield value
+        time.sleep(0.1)
 
     sources = [
         FakeSource(perpetual_generator(1)),


Mime
View raw message