beam-commits mailing list archives

From lc...@apache.org
Subject [1/2] beam git commit: Avoid flakiness in data channel for empty streams.
Date Thu, 08 Jun 2017 17:09:22 GMT
Repository: beam
Updated Branches:
  refs/heads/master e066a9d6d -> d81ed2172


Avoid flakiness in data channel for empty streams.

Because an empty stream is used as the end-of-stream marker, never send
an empty stream as the data itself.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ebebfdb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ebebfdb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ebebfdb

Branch: refs/heads/master
Commit: 4ebebfdb34de3e209c033de15e32cf67ab346d44
Parents: e066a9d
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Wed Jun 7 23:00:43 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu Jun 8 10:06:17 2017 -0700

----------------------------------------------------------------------
 .../python/apache_beam/runners/worker/data_plane.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4ebebfdb/sdks/python/apache_beam/runners/worker/data_plane.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 5edd0b4..7365db6 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -167,12 +167,18 @@ class _GrpcDataChannel(DataChannel):
         yield data
 
   def output_stream(self, instruction_id, target):
+    # TODO: Return an output stream that sends data
+    # to the Runner once a fixed size buffer is full.
+    # Currently we buffer all the data before sending
+    # any messages.
     def add_to_send_queue(data):
-      self._to_send.put(
-          beam_fn_api_pb2.Elements.Data(
-              instruction_reference=instruction_id,
-              target=target,
-              data=data))
+      if data:
+        self._to_send.put(
+            beam_fn_api_pb2.Elements.Data(
+                instruction_reference=instruction_id,
+                target=target,
+                data=data))
+      # End of stream marker.
       self._to_send.put(
           beam_fn_api_pb2.Elements.Data(
               instruction_reference=instruction_id,
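
The guarded send in the hunk above can be sketched in isolation as follows. This is a minimal illustration and not the Beam implementation: `Data` is a hypothetical stand-in for `beam_fn_api_pb2.Elements.Data`, `make_add_to_send_queue` and `read_stream` are invented helper names, and the use of empty bytes as the marker payload is an assumption drawn from the commit message, since the hunk is cut off before the marker's `data` field.

```python
import collections
import queue

# Hypothetical stand-in for beam_fn_api_pb2.Elements.Data (assumption).
Data = collections.namedtuple(
    "Data", ["instruction_reference", "target", "data"])


def make_add_to_send_queue(to_send, instruction_id, target):
    """Build a callback that enqueues one buffered payload plus the marker."""
    def add_to_send_queue(data):
        # Skip empty payloads so they cannot be mistaken for the marker.
        if data:
            to_send.put(Data(instruction_id, target, data))
        # End-of-stream marker: a message with empty data (assumed b"").
        to_send.put(Data(instruction_id, target, b""))
    return add_to_send_queue


def read_stream(to_send):
    """Hypothetical receiver: collect payloads until the first empty one."""
    payloads = []
    while True:
        element = to_send.get()
        if not element.data:
            return payloads
        payloads.append(element.data)
```

With the guard, an empty payload produces exactly one empty message on the queue. Without it, two empty messages would be enqueued and a receiver like the one sketched here would stop at the first, leaving the second behind, which is plausibly the kind of flakiness the commit addresses.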

