beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Enable grpc controller in fn_api_runner
Date Tue, 27 Jun 2017 17:18:19 GMT
Repository: beam
Updated Branches:
  refs/heads/master e93c06485 -> 39074899a


Enable grpc controller in fn_api_runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8dd0077d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8dd0077d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8dd0077d

Branch: refs/heads/master
Commit: 8dd0077d2a58e278b11c7e7eb4b5f182e1400992
Parents: e93c064
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Mon Jun 26 18:47:39 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Tue Jun 27 10:17:49 2017 -0700

----------------------------------------------------------------------
 .../runners/portability/fn_api_runner.py        | 12 +++++++---
 .../runners/portability/fn_api_runner_test.py   | 23 +++++++++++++++++++-
 .../apache_beam/runners/worker/sdk_worker.py    |  2 +-
 3 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index a8e2eb4..c5438ad 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -174,12 +174,17 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       return {tag: pcollection_id(op_ix, out_ix)
               for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))}
 
+    def only_element(iterable):
+      element, = iterable
+      return element
+
     for op_ix, (stage_name, operation) in enumerate(map_task):
       transform_id = uniquify(stage_name)
 
       if isinstance(operation, operation_specs.WorkerInMemoryWrite):
         # Write this data back to the runner.
-        runner_sinks[(transform_id, 'out')] = operation
+        target_name = only_element(get_inputs(operation).keys())
+        runner_sinks[(transform_id, target_name)] = operation
         transform_spec = beam_runner_api_pb2.FunctionSpec(
             urn=sdk_worker.DATA_OUTPUT_URN,
             parameter=proto_utils.pack_Any(data_operation_spec))
@@ -190,7 +195,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                        maptask_executor_runner.InMemorySource)
             and isinstance(operation.source.source.default_output_coder(),
                            WindowedValueCoder)):
-          input_data[(transform_id, 'input')] = self._reencode_elements(
+          target_name = only_element(get_outputs(op_ix).keys())
+          input_data[(transform_id, target_name)] = self._reencode_elements(
               operation.source.source.read(None),
               operation.source.source.default_output_coder())
           transform_spec = beam_runner_api_pb2.FunctionSpec(
@@ -309,7 +315,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             sink_op.output_buffer.append(e)
         return
 
-  def execute_map_tasks(self, ordered_map_tasks, direct=True):
+  def execute_map_tasks(self, ordered_map_tasks, direct=False):
     if direct:
       controller = FnApiRunner.DirectController()
     else:

http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 9159035..163e980 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -21,6 +21,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.portability import maptask_executor_runner_test
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class FnApiRunnerTest(
@@ -31,9 +33,28 @@ class FnApiRunnerTest(
         runner=fn_api_runner.FnApiRunner())
 
   def test_combine_per_key(self):
-    # TODO(robertwb): Implement PGBKCV operation.
+    # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
     pass
 
+  def test_combine_per_key(self):
+    # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
+    pass
+
+  def test_pardo_side_inputs(self):
+    # TODO(BEAM-1348): Enable once side inputs are supported in fn API.
+    pass
+
+  def test_pardo_unfusable_side_inputs(self):
+    # TODO(BEAM-1348): Enable once side inputs are supported in fn API.
+    pass
+
+  def test_assert_that(self):
+    # TODO: figure out a way for fn_api_runner to parse and raise the
+    # underlying exception.
+    with self.assertRaisesRegexp(RuntimeError, 'BeamAssertException'):
+      with self.create_pipeline() as p:
+        assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
+
   # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 6a366eb..e1ddfb7 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -415,7 +415,7 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers):
 def create(factory, transform_id, transform_proto, grpc_port, consumers):
   target = beam_fn_api_pb2.Target(
       primitive_transform_reference=transform_id,
-      name='out')
+      name=only_element(transform_proto.inputs.keys()))
   return DataOutputOperation(
       transform_proto.unique_name,
       transform_proto.unique_name,


Mime
View raw message