beam-commits mailing list archives

From chamik...@apache.org
Subject [1/2] beam git commit: Python streaming Create override as a composite of Impulse and a DoFn
Date Mon, 19 Jun 2017 17:22:25 GMT
Repository: beam
Updated Branches:
  refs/heads/master 2ab482d2b -> c12d6ba80


Python streaming Create override as a composite of Impulse and a DoFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bcd43964
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bcd43964
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bcd43964

Branch: refs/heads/master
Commit: bcd439640a635e635cc8686a4e4fcaa94800cb37
Parents: 2ab482d
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Mon Jun 12 22:32:39 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Mon Jun 19 10:21:18 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         | 34 +++++++++
 .../runners/dataflow/dataflow_runner_test.py    | 18 +++++
 .../dataflow/native_io/streaming_create.py      | 72 ++++++++++++++++++++
 .../runners/dataflow/ptransform_overrides.py    | 52 ++++++++++++++
 sdks/python/apache_beam/transforms/core.py      | 11 +--
 5 files changed, 183 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
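
Taken together, these changes make a streaming Create expand into the Impulse
primitive followed by a DoFn that decodes and re-emits the original values,
instead of the bounded Read used in batch. A minimal, construction-only sketch
of the path the new test exercises (Dataflow submission options omitted):

    from apache_beam import Create, Pipeline
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner

    p = Pipeline(DataflowRunner(), PipelineOptions(['--streaming']))
    p | Create([1, 2, 3])  # pylint: disable=expression-not-assigned
    # DataflowRunner.run does this automatically; shown here for clarity.
    p.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)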


http://git-wip-us.apache.org/repos/asf/beam/blob/bcd43964/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index cc9274e..ce46ea9 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -39,6 +39,7 @@ from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.runners.dataflow.internal.names import PropertyNames
 from apache_beam.runners.dataflow.internal.names import TransformNames
+from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride
 from apache_beam.runners.runner import PValueCache
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
@@ -69,6 +70,16 @@ class DataflowRunner(PipelineRunner):
   BATCH_ENVIRONMENT_MAJOR_VERSION = '6'
   STREAMING_ENVIRONMENT_MAJOR_VERSION = '1'
 
+  # A list of PTransformOverride objects to be applied before running a pipeline
+  # using DataflowRunner.
+  # Currently this only works for overrides where the input and output types do
+  # not change.
+  # For internal SDK use only. This should not be updated by Beam pipeline
+  # authors.
+  _PTRANSFORM_OVERRIDES = [
+      CreatePTransformOverride(),
+  ]
+
   def __init__(self, cache=None):
     # Cache of CloudWorkflowStep protos generated while the runner
     # "executes" a pipeline.
@@ -229,6 +240,9 @@ class DataflowRunner(PipelineRunner):
           'Google Cloud Dataflow runner not available, '
           'please install apache_beam[gcp]')
 
+    # Performing configured PTransform overrides.
+    pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
+
     # Add setup_options for all the BeamPlugin imports
     setup_options = pipeline._options.view_as(SetupOptions)
     plugins = BeamPlugin.get_all_plugin_paths()
@@ -370,6 +384,26 @@ class DataflowRunner(PipelineRunner):
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
     return step
 
+  def run_Impulse(self, transform_node):
+    standard_options = (
+        transform_node.outputs[None].pipeline._options.view_as(StandardOptions))
+    if standard_options.streaming:
+      step = self._add_step(
+          TransformNames.READ, transform_node.full_label, transform_node)
+      step.add_property(PropertyNames.FORMAT, 'pubsub')
+      step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, '_starting_signal/')
+
+      step.encoding = self._get_encoded_output_coder(transform_node)
+      step.add_property(
+          PropertyNames.OUTPUT_INFO,
+          [{PropertyNames.USER_NAME: (
+              '%s.%s' % (
+                  transform_node.full_label, PropertyNames.OUT)),
+            PropertyNames.ENCODING: step.encoding,
+            PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+    else:
+      raise ValueError('Impulse source for batch pipelines has not been defined.')
+
   def run_Flatten(self, transform_node):
     step = self._add_step(TransformNames.FLATTEN,
                           transform_node.full_label, transform_node)
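
Both the override matcher and run_Impulse key off StandardOptions.streaming;
a quick standalone sketch of that options lookup (flag value assumed):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions

    options = PipelineOptions(['--streaming'])
    standard_options = options.view_as(StandardOptions)
    # True here; in batch mode run_Impulse raises ValueError instead of
    # emitting the placeholder pubsub READ step.
    print(standard_options.streaming)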

http://git-wip-us.apache.org/repos/asf/beam/blob/bcd43964/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 74fd01d..819d471 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -111,6 +111,24 @@ class DataflowRunnerTest(unittest.TestCase):
     remote_runner.job = apiclient.Job(p._options)
     super(DataflowRunner, remote_runner).run(p)
 
+  def test_streaming_create_translation(self):
+    remote_runner = DataflowRunner()
+    self.default_properties.append("--streaming")
+    p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+    p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
+    remote_runner.job = apiclient.Job(p._options)
+    # Performing configured PTransform overrides here.
+    p.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
+    super(DataflowRunner, remote_runner).run(p)
+    job_dict = json.loads(str(remote_runner.job))
+    self.assertEqual(len(job_dict[u'steps']), 2)
+
+    self.assertEqual(job_dict[u'steps'][0][u'kind'], u'ParallelRead')
+    self.assertEqual(
+        job_dict[u'steps'][0][u'properties'][u'pubsub_subscription'],
+        '_starting_signal/')
+    self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo')
+
   def test_remote_runner_display_data(self):
     remote_runner = DataflowRunner()
     p = Pipeline(remote_runner,

http://git-wip-us.apache.org/repos/asf/beam/blob/bcd43964/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
new file mode 100644
index 0000000..8c6c8d6
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Create transform for streaming."""
+
+from apache_beam import pvalue
+from apache_beam import DoFn
+from apache_beam import ParDo
+from apache_beam import PTransform
+from apache_beam import Windowing
+from apache_beam.transforms.window import GlobalWindows
+
+
+class StreamingCreate(PTransform):
+  """A specialized implementation for ``Create`` transform in streaming mode.
+
+  Note: There is no unbounded source API in python to wrap the Create source,
+  so we map this to composite of Impulse primitive and an SDF.
+  """
+
+  def __init__(self, values, coder):
+    self.coder = coder
+    self.encoded_values = map(coder.encode, values)
+
+  class DecodeAndEmitDoFn(DoFn):
+    """A DoFn which stores encoded versions of elements.
+
+    It also stores a Coder to decode and emit those elements.
+    TODO: BEAM-2422 - Make this a SplittableDoFn.
+    """
+
+    def __init__(self, encoded_values, coder):
+      self.encoded_values = encoded_values
+      self.coder = coder
+
+    def process(self, unused_element):
+      for encoded_value in self.encoded_values:
+        yield self.coder.decode(encoded_value)
+
+  class Impulse(PTransform):
+    """The Dataflow specific override for the impulse primitive."""
+
+    def expand(self, pbegin):
+      assert isinstance(pbegin, pvalue.PBegin), (
+          'Input to Impulse transform must be a PBegin but found %s' % pbegin)
+      return pvalue.PCollection(pbegin.pipeline)
+
+    def get_windowing(self, inputs):
+      return Windowing(GlobalWindows())
+
+    def infer_output_type(self, unused_input_type):
+      return bytes
+
+  def expand(self, pbegin):
+    return (pbegin
+            | 'Impulse' >> self.Impulse()
+            | 'Decode Values' >> ParDo(
+                self.DecodeAndEmitDoFn(self.encoded_values, self.coder)))
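
A minimal usage sketch of the new composite (values and element type are
illustrative; the coder lookup mirrors what CreatePTransformOverride does
below):

    from apache_beam import Pipeline
    from apache_beam.coders import typecoders
    from apache_beam.runners.dataflow.native_io.streaming_create import (
        StreamingCreate)

    coder = typecoders.registry.get_coder(int)
    p = Pipeline()
    # Expands to 'Impulse' >> Impulse() | 'Decode Values' >> ParDo(...).
    pcoll = p | StreamingCreate([1, 2, 3], coder)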

http://git-wip-us.apache.org/repos/asf/beam/blob/bcd43964/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
new file mode 100644
index 0000000..680a4b7
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Ptransform overrides for DataflowRunner."""
+
+from apache_beam.coders import typecoders
+from apache_beam.pipeline import PTransformOverride
+
+
+class CreatePTransformOverride(PTransformOverride):
+  """A ``PTransformOverride`` for ``Create`` in streaming mode."""
+
+  def get_matcher(self):
+    return self.is_streaming_create
+
+  @staticmethod
+  def is_streaming_create(applied_ptransform):
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam import Create
+    from apache_beam.options.pipeline_options import StandardOptions
+
+    if isinstance(applied_ptransform.transform, Create):
+      standard_options = (applied_ptransform
+                          .outputs[None]
+                          .pipeline._options
+                          .view_as(StandardOptions))
+      return standard_options.streaming
+    else:
+      return False
+
+  def get_replacement_transform(self, ptransform):
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.runners.dataflow.native_io.streaming_create import \
+      StreamingCreate
+    coder = typecoders.registry.get_coder(ptransform.get_output_type())
+    return StreamingCreate(ptransform.value, coder)
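
The matcher/replacement pair plugs into Pipeline.replace_all, which evaluates
the matcher against every applied transform and swaps matches for the
replacement. A small sketch of the batch case, where the matcher declines
(relies on the PCollection.producer back-pointer to the applied transform):

    from apache_beam import Create, Pipeline
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.dataflow.ptransform_overrides import (
        CreatePTransformOverride)

    override = CreatePTransformOverride()
    p = Pipeline(options=PipelineOptions([]))
    pcoll = p | Create(['a', 'b'])
    # False: the transform is a Create, but streaming is not enabled.
    print(override.get_matcher()(pcoll.producer))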

http://git-wip-us.apache.org/repos/asf/beam/blob/bcd43964/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index c30136d..8018219 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1444,15 +1444,18 @@ class Create(PTransform):
       return Any
     return Union[[trivial_inference.instance_to_type(v) for v in self.value]]
 
+  def get_output_type(self):
+    return (self.get_type_hints().simple_output_type(self.label) or
+            self.infer_output_type(None))
+
   def expand(self, pbegin):
     from apache_beam.io import iobase
     assert isinstance(pbegin, pvalue.PBegin)
     self.pipeline = pbegin.pipeline
-    ouput_type = (self.get_type_hints().simple_output_type(self.label) or
-                  self.infer_output_type(None))
-    coder = typecoders.registry.get_coder(ouput_type)
+    coder = typecoders.registry.get_coder(self.get_output_type())
     source = self._create_source_from_iterable(self.value, coder)
-    return pbegin.pipeline | iobase.Read(source).with_output_types(ouput_type)
+    return (pbegin.pipeline
+            | iobase.Read(source).with_output_types(self.get_output_type()))
 
   def get_windowing(self, unused_inputs):
     return Windowing(GlobalWindows())
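
The small refactor above extracts the output-type computation into
get_output_type so the streaming override can look up the same coder the
batch expansion uses. A quick sketch (type inferred from the values):

    from apache_beam import Create
    from apache_beam.coders import typecoders

    t = Create([1, 2, 3])
    output_type = t.get_output_type()  # int, via trivial type inference
    coder = typecoders.registry.get_coder(output_type)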

