beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [2/5] beam git commit: Per-transform runner api dispatch.
Date Thu, 27 Apr 2017 01:14:34 GMT
Per-transform runner API dispatch.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97c9b174
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97c9b174
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97c9b174

Branch: refs/heads/master
Commit: 97c9b174d027e58ac5202cc1eedeaec59b57023a
Parents: 26c61f4
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Tue Apr 18 15:29:04 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Wed Apr 26 18:14:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             | 18 ++++-----
 .../python/apache_beam/transforms/ptransform.py | 40 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b174/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 0f4c8db..100c50a 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,7 +52,6 @@ import os
 import shutil
 import tempfile
 
-from google.protobuf import wrappers_pb2
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
@@ -60,8 +59,6 @@ from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
-from apache_beam.utils import proto_utils
-from apache_beam.utils import urns
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 from apache_beam.utils.pipeline_options import StandardOptions
@@ -514,12 +511,15 @@ class AppliedPTransform(object):
 
   def to_runner_api(self, context):
     from apache_beam.runners.api import beam_runner_api_pb2
+
+    def transform_to_runner_api(transform, context):
+      if transform is None:
+        return None
+      else:
+        return transform.to_runner_api(context)
     return beam_runner_api_pb2.PTransform(
         unique_name=self.full_label,
-        spec=beam_runner_api_pb2.FunctionSpec(
-            urn=urns.PICKLED_TRANSFORM,
-            parameter=proto_utils.pack_Any(
-                wrappers_pb2.BytesValue(value=pickler.dumps(self.transform)))),
+        spec=transform_to_runner_api(self.transform, context),
         subtransforms=[context.transforms.get_id(part) for part in self.parts],
         # TODO(BEAM-115): Side inputs.
         inputs={tag: context.pcollections.get_id(pc)
@@ -533,9 +533,7 @@ class AppliedPTransform(object):
   def from_runner_api(proto, context):
     result = AppliedPTransform(
         parent=None,
-        transform=pickler.loads(
-            proto_utils.unpack_Any(proto.spec.parameter,
-                                   wrappers_pb2.BytesValue).value),
+        transform=ptransform.PTransform.from_runner_api(proto.spec, context),
         full_label=proto.unique_name,
         inputs=[
             context.pcollections.get_by_id(id) for id in proto.inputs.values()])

http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b174/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index e2c4428..706b003 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -42,6 +42,8 @@ import operator
 import os
 import sys
 
+from google.protobuf import wrappers_pb2
+
 from apache_beam import error
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -54,6 +56,8 @@ from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import validate_composite_type_param
 from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints.trivial_inference import instance_to_type
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
 
 
 class _PValueishTransform(object):
@@ -412,6 +416,42 @@ class PTransform(WithTypeHints, HasDisplayData):
         yield pvalueish
     return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
 
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor):
+    cls._known_urns[urn] = parameter_type, constructor
+
+  def to_runner_api(self, context):
+    from apache_beam.runners.api import beam_runner_api_pb2
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.FunctionSpec(
+        urn=urn,
+        parameter=proto_utils.pack_Any(typed_param))
+
+  @classmethod
+  def from_runner_api(cls, proto, context):
+    if proto is None or not proto.urn:
+      return None
+    parameter_type, constructor = cls._known_urns[proto.urn]
+    return constructor(
+        proto_utils.unpack_Any(proto.parameter, parameter_type),
+        context)
+
+  def to_runner_api_parameter(self, context):
+    return (urns.PICKLED_TRANSFORM,
+            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+  @staticmethod
+  def from_runner_api_parameter(spec_parameter, unused_context):
+    return pickler.loads(spec_parameter.value)
+
+
+PTransform.register_urn(
+    urns.PICKLED_TRANSFORM,
+    wrappers_pb2.BytesValue,
+    PTransform.from_runner_api_parameter)
+
 
 class ChainedPTransform(PTransform):
 


Mime
View raw message