beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [16/50] [abbrv] beam git commit: Fix flakiness in sideinputs_test
Date Fri, 17 Nov 2017 20:31:07 GMT
Fix flakiness in sideinputs_test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebeb3fd9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebeb3fd9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebeb3fd9

Branch: refs/heads/tez-runner
Commit: ebeb3fd998c1a6a8ce4f9514d54d9e28a6619aa0
Parents: 2df25db
Author: Charles Chen <ccy@google.com>
Authored: Mon Nov 13 11:41:47 2017 -0800
Committer: Charles Chen <ccy@google.com>
Committed: Mon Nov 13 11:56:07 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py    | 22 ++++++++++----------
 .../runners/worker/sideinputs_test.py           |  1 +
 2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index c91fe95..6c7831d 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -129,17 +129,17 @@ class PrefetchingSourceSetIterable(object):
     num_readers_finished = 0
     try:
       while True:
-        element = self.element_queue.get()
-        if element is READER_THREAD_IS_DONE_SENTINEL:
-          num_readers_finished += 1
-          if num_readers_finished == self.num_reader_threads:
-            if self.has_errored:
-              raise self.reader_exceptions.get()
-            return
-        elif self.has_errored:
-          raise self.reader_exceptions.get()
-        else:
-          yield element
+        try:
+          element = self.element_queue.get()
+          if element is READER_THREAD_IS_DONE_SENTINEL:
+            num_readers_finished += 1
+            if num_readers_finished == self.num_reader_threads:
+              return
+          else:
+            yield element
+        finally:
+          if self.has_errored:
+            raise self.reader_exceptions.get()
     except GeneratorExit:
       self.has_errored = True
       raise

http://git-wip-us.apache.org/repos/asf/beam/blob/ebeb3fd9/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 73d34fb..bb688dd 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -121,6 +121,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
     def perpetual_generator(value):
       while True:
         yield value
+        time.sleep(0.1)
 
     sources = [
         FakeSource(perpetual_generator(1)),


Mime
View raw message