From: robertwb@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Thu, 30 Mar 2017 15:22:49 -0000
Subject: [1/2] beam git commit: Translate pipeline graph to and from Runner API protos.

Repository: beam
Updated Branches:
  refs/heads/master 22d368b40 -> ffd87553f


Translate pipeline graph to and from Runner API protos.

There are some caveats:

* Specific known transforms, with their payloads, are not yet translated.
* Side inputs are not yet supported.

All pipelines without side inputs are passed through this translation by
default before execution.
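The round trip that run() now performs by default can also be driven by hand.
A minimal sketch, mirroring the RunnerApiTest.test_simple case added below
(the transforms here are arbitrary placeholders):

    import apache_beam as beam
    from apache_beam.pipeline import Pipeline

    p = beam.Pipeline()
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)  # pylint: disable=expression-not-assigned

    # Serialize the construction-time graph to a Runner API proto ...
    proto = p.to_runner_api()
    # ... and rebuild an equivalent pipeline from it before running.
    Pipeline.from_runner_api(proto, p.runner, p.options).run()

    # Pipelines that cannot survive the round trip (e.g. unpicklable
    # transforms, as in the custom-source snippet below) can opt out:
    # p.run(test_runner_api=False)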
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bfc21b8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bfc21b8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bfc21b8

Branch: refs/heads/master
Commit: 5bfc21b8fcc1eb5b9cae9b02808f15c06a76ca56
Parents: 22d368b
Author: Robert Bradshaw
Authored: Mon Mar 20 16:08:37 2017 -0700
Committer: Robert Bradshaw
Committed: Thu Mar 30 08:22:15 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |   3 +-
 .../examples/snippets/snippets_test.py          |   3 +
 sdks/python/apache_beam/pipeline.py             | 107 +++++++++++++++++--
 sdks/python/apache_beam/pipeline_test.py        |  16 +++
 sdks/python/apache_beam/pvalue.py               |  37 ++++++-
 .../runners/dataflow/dataflow_runner.py         |   6 +-
 .../runners/direct/transform_evaluator.py       |  23 ++--
 sdks/python/apache_beam/runners/runner.py       |   2 +-
 sdks/python/apache_beam/utils/urns.py           |   2 +
 9 files changed, 165 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 5cb5ee5..85ab864 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -683,7 +683,8 @@ def model_custom_source(count):
       lines, beam.equal_to(
           ['line ' + str(number) for number in range(0, count)]))
 
-  p.run().wait_until_finish()
+  # Don't test runner api due to pickling errors.
+  p.run(test_runner_api=False).wait_until_finish()
 
 
 def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 9635c7a..64f3dfb 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -328,6 +328,9 @@ class TypeHintsTest(unittest.TestCase):
     lines = (p | beam.Create(
         ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
 
+    # For pickling
+    global Player  # pylint: disable=global-variable-not-assigned
+
     # [START type_hints_deterministic_key]
     class Player(object):
       def __init__(self, team, name):


----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index dc05bd3..be2a79d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,6 +52,7 @@ import os
 import shutil
 import tempfile
 
+from google.protobuf import wrappers_pb2
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
@@ -59,6 +60,8 @@ from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 from apache_beam.utils.pipeline_options import StandardOptions
@@ -151,8 +154,14 @@ class Pipeline(object):
     """Returns the root transform of the transform stack."""
     return self.transforms_stack[0]
 
-  def run(self):
+  def run(self, test_runner_api=True):
     """Runs the pipeline. Returns whatever our runner returns after running."""
+
+    # When possible, invoke a round trip through the runner API.
+    if test_runner_api and self._verify_runner_api_compatible():
+      return Pipeline.from_runner_api(
+          self.to_runner_api(), self.runner, self.options).run(False)
+
     if self.options.view_as(SetupOptions).save_main_session:
       # If this option is chosen, verify we can pickle the main session early.
       tmpdir = tempfile.mkdtemp()
@@ -299,6 +308,42 @@
     self.transforms_stack.pop()
     return pvalueish_result
 
+  def _verify_runner_api_compatible(self):
+    class Visitor(PipelineVisitor):  # pylint: disable=used-before-assignment
+      ok = True  # Really a nonlocal.
+
+      def visit_transform(self, transform_node):
+        if transform_node.side_inputs:
+          Visitor.ok = False
+    self.visit(Visitor())
+    return Visitor.ok
+
+  def to_runner_api(self):
+    from apache_beam.runners import pipeline_context
+    from apache_beam.runners.api import beam_runner_api_pb2
+    context = pipeline_context.PipelineContext()
+    # Mutates context; placing inline would force dependence on
+    # argument evaluation order.
+    root_transform_id = context.transforms.get_id(self._root_transform())
+    proto = beam_runner_api_pb2.Pipeline(
+        root_transform_id=root_transform_id,
+        components=context.to_runner_api())
+    return proto
+
+  @staticmethod
+  def from_runner_api(proto, runner, options):
+    p = Pipeline(runner=runner, options=options)
+    from apache_beam.runners import pipeline_context
+    context = pipeline_context.PipelineContext(proto.components)
+    p.transforms_stack = [
+        context.transforms.get_by_id(proto.root_transform_id)]
+    # TODO(robertwb): These are only needed to continue construction. Omit?
+    p.applied_labels = set([
+        t.unique_name for t in proto.components.transforms.values()])
+    for id in proto.components.pcollections:
+      context.pcollections.get_by_id(id).pipeline = p
+    return p
+
 
 class PipelineVisitor(object):
   """Visitor pattern class used to traverse a DAG of transforms.
@@ -374,12 +419,16 @@ class AppliedPTransform(object):
       real_producer(side_input).refcounts[side_input.tag] += 1
 
   def add_output(self, output, tag=None):
-    assert (isinstance(output, pvalue.PValue) or
-            isinstance(output, pvalue.DoOutputsTuple))
-    if tag is None:
-      tag = len(self.outputs)
-    assert tag not in self.outputs
-    self.outputs[tag] = output
+    if isinstance(output, pvalue.DoOutputsTuple):
+      self.add_output(output[output._main_tag])
+    elif isinstance(output, pvalue.PValue):
+      # TODO(BEAM-1833): Require tags when calling this method.
+      if tag is None and None in self.outputs:
+        tag = len(self.outputs)
+      assert tag not in self.outputs
+      self.outputs[tag] = output
+    else:
+      raise TypeError("Unexpected output type: %s" % output)
 
   def add_part(self, part):
     assert isinstance(part, AppliedPTransform)
@@ -441,3 +490,47 @@
       if v not in visited:
         visited.add(v)
         visitor.visit_value(v, self)
+
+  def named_inputs(self):
+    # TODO(BEAM-1833): Push names up into the sdk construction.
+    return {str(ix): input for ix, input in enumerate(self.inputs)
+            if isinstance(input, pvalue.PCollection)}
+
+  def to_runner_api(self, context):
+    from apache_beam.runners.api import beam_runner_api_pb2
+    return beam_runner_api_pb2.PTransform(
+        unique_name=self.full_label,
+        spec=beam_runner_api_pb2.UrnWithParameter(
+            urn=urns.PICKLED_TRANSFORM,
+            parameter=proto_utils.pack_Any(
+                wrappers_pb2.BytesValue(value=pickler.dumps(self.transform)))),
+        subtransforms=[context.transforms.get_id(part) for part in self.parts],
+        # TODO(BEAM-115): Side inputs.
+        inputs={tag: context.pcollections.get_id(pc)
+                for tag, pc in self.named_inputs().items()},
+        outputs={str(tag): context.pcollections.get_id(out)
+                 for tag, out in self.outputs.items()},
+        # TODO(BEAM-115): display_data
+        display_data=None)
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    result = AppliedPTransform(
+        parent=None,
+        transform=pickler.loads(
+            proto_utils.unpack_Any(proto.spec.parameter,
+                                   wrappers_pb2.BytesValue).value),
+        full_label=proto.unique_name,
+        inputs=[
+            context.pcollections.get_by_id(id) for id in proto.inputs.values()])
+    result.parts = [
+        context.transforms.get_by_id(id) for id in proto.subtransforms]
+    result.outputs = {
+        None if tag == 'None' else tag: context.pcollections.get_by_id(id)
+        for tag, id in proto.outputs.items()}
+    if not result.parts:
+      for tag, pc in result.outputs.items():
+        if pc not in result.inputs:
+          pc.producer = result
+          pc.tag = tag
+    return result
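AppliedPTransform.to_runner_api above stores the pickled transform as an
Any-wrapped BytesValue under the PICKLED_TRANSFORM urn. A minimal sketch of the
packing that proto_utils.pack_Any and unpack_Any are assumed to perform,
written against the stock protobuf API (pack_any/unpack_any here are
illustrative stand-ins, not Beam's helpers):

    from google.protobuf import any_pb2
    from google.protobuf import wrappers_pb2

    def pack_any(message):
      # Wrap an arbitrary proto message in an Any, recording its type URL.
      result = any_pb2.Any()
      result.Pack(message)
      return result

    def unpack_any(any_message, message_class):
      # Recover the original message, checking it has the expected type.
      result = message_class()
      assert any_message.Unpack(result)
      return result

    payload = pack_any(wrappers_pb2.BytesValue(value=b'pickled-transform-bytes'))
    assert unpack_any(payload, wrappers_pb2.BytesValue).value == b'pickled-transform-bytes'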

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index d80a4e8..d464fdb 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -24,6 +24,7 @@ import unittest
 
 # TODO(BEAM-1555): Test is failing on the service, with FakeSource.
 # from nose.plugins.attrib import attr
+import apache_beam as beam
 from apache_beam.metrics import Metrics
 from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
@@ -439,6 +440,21 @@ class PipelineOptionsTest(unittest.TestCase):
                       if not attr.startswith('_')]))
 
 
+class RunnerApiTest(unittest.TestCase):
+
+  def test_simple(self):
+    """Tests serializing, deserializing, and running a simple pipeline.
+
+    More extensive tests are done at pipeline.run for each suitable test.
+    """
+    p = beam.Pipeline()
+    p | beam.Create([None]) | beam.Map(lambda x: x)  # pylint: disable=expression-not-assigned
+    proto = p.to_runner_api()
+
+    p2 = Pipeline.from_runner_api(proto, p.runner, p.options)
+    p2.run()
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
   unittest.main()


----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 6dc67b0..4114b3f 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -99,6 +99,13 @@ class PCollection(PValue):
     """Initializes a PCollection. Do not call directly."""
     super(PCollection, self).__init__(pipeline, **kwargs)
 
+  def __eq__(self, other):
+    if isinstance(other, PCollection):
+      return self.tag == other.tag and self.producer == other.producer
+
+  def __hash__(self):
+    return hash((self.tag, self.producer))
+
   @property
   def windowing(self):
     if not hasattr(self, '_windowing'):
@@ -112,6 +119,24 @@
     # of a closure).
     return _InvalidUnpickledPCollection, ()
 
+  def to_runner_api(self, context):
+    from apache_beam.runners.api import beam_runner_api_pb2
+    from apache_beam.internal import pickler
+    return beam_runner_api_pb2.PCollection(
+        unique_name='%d%s.%s' % (
+            len(self.producer.full_label), self.producer.full_label, self.tag),
+        coder_id=pickler.dumps(self.element_type),
+        is_bounded=beam_runner_api_pb2.BOUNDED,
+        windowing_strategy_id=context.windowing_strategies.get_id(
+            self.windowing))
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    from apache_beam.internal import pickler
+    # Producer and tag will be filled in later, the key point is that the
+    # same object is returned for the same pcollection id.
+    return PCollection(None, element_type=pickler.loads(proto.coder_id))
+
 
 class _InvalidUnpickledPCollection(object):
   pass
@@ -183,7 +208,8 @@
       tag = None
     elif self._tags and tag not in self._tags:
       raise ValueError(
-          'Tag %s is neither the main tag %s nor any of the side tags %s' % (
+          "Tag '%s' is neither the main tag '%s' "
+          "nor any of the side tags %s" % (
               tag, self._main_tag, self._tags))
     # Check if we accessed this tag before.
     if tag in self._pcolls:
@@ -194,14 +220,15 @@
       pcoll = PCollection(self._pipeline, tag=tag)
       # Transfer the producer from the DoOutputsTuple to the resulting
       # PCollection.
-      pcoll.producer = self.producer
+      pcoll.producer = self.producer.parts[0]
      # Add this as an output to both the inner ParDo and the outer _MultiParDo
      # PTransforms.
-      self.producer.parts[0].add_output(pcoll, tag)
-      self.producer.add_output(pcoll, tag)
+      if tag not in self.producer.parts[0].outputs:
+        self.producer.parts[0].add_output(pcoll, tag)
+        self.producer.add_output(pcoll, tag)
     else:
       # Main output is output of inner ParDo.
-      pcoll = self.producer.parts[0].outputs[0]
+      pcoll = self.producer.parts[0].outputs[None]
     self._pcolls[tag] = pcoll
     return pcoll
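The new __eq__ and __hash__ on PCollection make two references to the same
logical output compare equal, which is presumably what lets the translation
context hand out a single id per PCollection. A toy illustration of that
contract (FakePCollection is a stand-in, not a Beam type):

    class FakePCollection(object):
      def __init__(self, tag, producer):
        self.tag, self.producer = tag, producer

      def __eq__(self, other):
        return (isinstance(other, FakePCollection)
                and self.tag == other.tag and self.producer == other.producer)

      def __hash__(self):
        return hash((self.tag, self.producer))

    producer = object()
    ids = {}
    for pc in [FakePCollection(None, producer), FakePCollection(None, producer)]:
      # Equal references resolve to one id, so the proto ends up with a single
      # pcollection entry rather than duplicates.
      ids.setdefault(pc, 'pc_%d' % len(ids))
    assert len(ids) == 1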

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index bd29d63..a82671c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -219,9 +219,9 @@ class DataflowRunner(PipelineRunner):
   def _get_encoded_output_coder(self, transform_node, window_value=True):
     """Returns the cloud encoding of the coder for the output of a transform."""
     if (len(transform_node.outputs) == 1
-        and transform_node.outputs[0].element_type is not None):
+        and transform_node.outputs[None].element_type is not None):
       # TODO(robertwb): Handle type hints for multi-output transforms.
-      element_type = transform_node.outputs[0].element_type
+      element_type = transform_node.outputs[None].element_type
     else:
       # TODO(silviuc): Remove this branch (and assert) when typehints are
       # propagated everywhere. Returning an 'Any' as type hint will trigger
@@ -229,7 +229,7 @@ class DataflowRunner(PipelineRunner):
       element_type = typehints.Any
     if window_value:
       window_coder = (
-          transform_node.outputs[0].windowing.windowfn.get_window_coder())
+          transform_node.outputs[None].windowing.windowfn.get_window_coder())
     else:
       window_coder = None
     return self._get_typehint_based_encoding(
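The switch from outputs[0] to outputs[None] reflects the convention that a
transform's main output now lives under the None tag. The proto's output map is
keyed by strings, so to_runner_api stringifies each tag and from_runner_api
maps the literal 'None' back; a toy round trip of that handling (the ids are
made up):

    outputs = {None: 'pc_main', 'errors': 'pc_errors'}

    # Stringify tags on the way into the proto map ...
    serialized = {str(tag): pc_id for tag, pc_id in outputs.items()}

    # ... and restore the main (None) tag on the way out, as from_runner_api does.
    restored = {None if tag == 'None' else tag: pc_id
                for tag, pc_id in serialized.items()}
    assert restored == outputs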

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6ae5697..f9a0692 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -323,23 +323,12 @@
     transform = self._applied_ptransform.transform
 
     self._tagged_receivers = _TaggedReceivers(self._evaluation_context)
-    if isinstance(self._applied_ptransform.parent.transform, core._MultiParDo):  # pylint: disable=protected-access
-      do_outputs_tuple = self._applied_ptransform.parent.outputs[0]
-      assert isinstance(do_outputs_tuple, pvalue.DoOutputsTuple)
-      main_output_pcollection = do_outputs_tuple[do_outputs_tuple._main_tag]  # pylint: disable=protected-access
-
-      for side_output_tag in transform.side_output_tags:
-        output_pcollection = do_outputs_tuple[side_output_tag]
-        self._tagged_receivers[side_output_tag] = (
-            self._evaluation_context.create_bundle(output_pcollection))
-        self._tagged_receivers[side_output_tag].tag = side_output_tag
-    else:
-      assert len(self._outputs) == 1
-      main_output_pcollection = list(self._outputs)[0]
-
-    self._tagged_receivers[None] = self._evaluation_context.create_bundle(
-        main_output_pcollection)
-    self._tagged_receivers[None].tag = None  # main_tag is None.
+    for side_output_tag in self._applied_ptransform.outputs:
+      output_pcollection = pvalue.PCollection(None, tag=side_output_tag)
+      output_pcollection.producer = self._applied_ptransform
+      self._tagged_receivers[side_output_tag] = (
+          self._evaluation_context.create_bundle(output_pcollection))
+      self._tagged_receivers[side_output_tag].tag = side_output_tag
 
     self._counter_factory = counters.CounterFactory()

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index bae21c1..b203c8b 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -222,7 +222,7 @@
     return len(self._cache)
 
   def to_cache_key(self, transform, tag):
-    return str((id(transform), tag))
+    return transform.full_label, tag
 
   def _ensure_pvalue_has_real_producer(self, pvalue):
     """Ensure the passed-in PValue has the real_producer attribute.
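Keying the PValue cache by full_label rather than id() matters presumably
because object identity does not survive the new serialize/deserialize round
trip, while a transform's label does. A small illustration (FakeTransform is a
stand-in, not a Beam type):

    import pickle

    class FakeTransform(object):
      def __init__(self, full_label):
        self.full_label = full_label

    original = FakeTransform('Read/Decode')
    rebuilt = pickle.loads(pickle.dumps(original))

    # Identity-based keys diverge across the round trip ...
    assert str((id(original), None)) != str((id(rebuilt), None))
    # ... while label-based keys stay stable.
    assert (original.full_label, None) == (rebuilt.full_label, None)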

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 936e2cb..88fca09 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -22,3 +22,5 @@ SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
 SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
 
 PICKLED_CODER = "beam:coder:pickled_python:v0.1"
+
+PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"