From: davor@apache.org
To: commits@beam.incubator.apache.org
Date: Tue, 14 Jun 2016 23:12:45 -0000
Message-Id: <42e1cb597f4c4088b0b8f260c50f918a@git.apache.org>
In-Reply-To: <95df9c9428334e3980c0c77c4ddc9382@git.apache.org>
References: <95df9c9428334e3980c0c77c4ddc9382@git.apache.org>
Subject: [10/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/pipeline.py b/sdks/python/google/cloud/dataflow/pipeline.py
deleted file mode 100644
index ec87f46..0000000
--- a/sdks/python/google/cloud/dataflow/pipeline.py
+++ /dev/null
@@ -1,435 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Pipeline, the top-level Dataflow object. - -A pipeline holds a DAG of data transforms. Conceptually the nodes of the DAG -are transforms (PTransform objects) and the edges are values (mostly PCollection -objects). The transforms take as inputs one or more PValues and output one or -more PValues. - -The pipeline offers functionality to traverse the graph. The actual operation -to be executed for each node visited is specified through a runner object. - -Typical usage: - - # Create a pipeline object using a local runner for execution. - pipeline = Pipeline(runner=DirectPipelineRunner()) - - # Add to the pipeline a "Create" transform. When executed this - # transform will produce a PCollection object with the specified values. - pcoll = pipeline.create('label', [1, 2, 3]) - - # run() will execute the DAG stored in the pipeline. The execution of the - # nodes visited is done using the specified local runner. - pipeline.run() - -""" - -from __future__ import absolute_import - -import collections -import logging -import os -import shutil -import tempfile - -from google.cloud.dataflow import pvalue -from google.cloud.dataflow import typehints -from google.cloud.dataflow.internal import pickler -from google.cloud.dataflow.runners import create_runner -from google.cloud.dataflow.runners import PipelineRunner -from google.cloud.dataflow.transforms import format_full_label -from google.cloud.dataflow.transforms import ptransform -from google.cloud.dataflow.typehints import TypeCheckError -from google.cloud.dataflow.utils.options import PipelineOptions -from google.cloud.dataflow.utils.options import SetupOptions -from google.cloud.dataflow.utils.options import StandardOptions -from google.cloud.dataflow.utils.options import TypeOptions -from google.cloud.dataflow.utils.pipeline_options_validator import PipelineOptionsValidator - - -class Pipeline(object): - """A pipeline object that manages a DAG of PValues and their PTransforms. - - Conceptually the PValues are the DAG's nodes and the PTransforms computing - the PValues are the edges. - - All the transforms applied to the pipeline must have distinct full labels. - If same transform instance needs to be applied then a clone should be created - with a new label (e.g., transform.clone('new label')). - """ - - def __init__(self, runner=None, options=None, argv=None): - """Initialize a pipeline object. - - Args: - runner: An object of type 'PipelineRunner' that will be used to execute - the pipeline. For registered runners, the runner name can be specified, - otherwise a runner object must be supplied. - options: A configured 'PipelineOptions' object containing arguments - that should be used for running the Dataflow job. - argv: a list of arguments (such as sys.argv) to be used for building a - 'PipelineOptions' object. This will only be used if argument 'options' - is None. - - Raises: - ValueError: if either the runner or options argument is not of the - expected type. 
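A minimal sketch of the constructor contract described above, assuming the registered 'DirectPipelineRunner' name used throughout this SDK and a --runner flag corresponding to the StandardOptions value read back in __init__ (the exact flag spelling is an assumption here):

  from google.cloud.dataflow.pipeline import Pipeline
  from google.cloud.dataflow.utils.options import PipelineOptions

  # Three equivalent ways of selecting the runner.
  p1 = Pipeline(runner='DirectPipelineRunner')
  p2 = Pipeline(options=PipelineOptions(['--runner=DirectPipelineRunner']))
  p3 = Pipeline(argv=['--runner=DirectPipelineRunner'])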
- """ - - if options is not None: - if isinstance(options, PipelineOptions): - self.options = options - else: - raise ValueError( - 'Parameter options, if specified, must be of type PipelineOptions. ' - 'Received : %r', options) - elif argv is not None: - if isinstance(argv, list): - self.options = PipelineOptions(argv) - else: - raise ValueError( - 'Parameter argv, if specified, must be a list. Received : %r', argv) - else: - self.options = None - - if runner is None and self.options is not None: - runner = self.options.view_as(StandardOptions).runner - if runner is None: - runner = StandardOptions.DEFAULT_RUNNER - logging.info(('Missing pipeline option (runner). Executing pipeline ' - 'using the default runner: %s.'), runner) - - if isinstance(runner, str): - runner = create_runner(runner) - elif not isinstance(runner, PipelineRunner): - raise TypeError('Runner must be a PipelineRunner object or the ' - 'name of a registered runner.') - - # Validate pipeline options - if self.options is not None: - errors = PipelineOptionsValidator(self.options, runner).validate() - if errors: - raise ValueError( - 'Pipeline has validations errors: \n' + '\n'.join(errors)) - - # Default runner to be used. - self.runner = runner - # Stack of transforms generated by nested apply() calls. The stack will - # contain a root node as an enclosing (parent) node for top transforms. - self.transforms_stack = [AppliedPTransform(None, None, '', None)] - # Set of transform labels (full labels) applied to the pipeline. - # If a transform is applied and the full label is already in the set - # then the transform will have to be cloned with a new label. - self.applied_labels = set() - # Store cache of views created from PCollections. For reference, see - # pvalue._cache_view(). - self._view_cache = {} - - def _current_transform(self): - """Returns the transform currently on the top of the stack.""" - return self.transforms_stack[-1] - - def _root_transform(self): - """Returns the root transform of the transform stack.""" - return self.transforms_stack[0] - - def run(self): - """Runs the pipeline. Returns whatever our runner returns after running.""" - if not self.options or self.options.view_as(SetupOptions).save_main_session: - # If this option is chosen, verify we can pickle the main session early. - tmpdir = tempfile.mkdtemp() - try: - pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle')) - finally: - shutil.rmtree(tmpdir) - return self.runner.run(self) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if not exc_type: - self.run() - - def visit(self, visitor): - """Visits depth-first every node of a pipeline's DAG. - - Args: - visitor: PipelineVisitor object whose callbacks will be called for each - node visited. See PipelineVisitor comments. - - Raises: - TypeError: if node is specified and is not a PValue. - pipeline.PipelineError: if node is specified and does not belong to this - pipeline instance. - """ - - visited = set() - self._root_transform().visit(visitor, self, visited) - - def apply(self, transform, pvalueish=None): - """Applies a custom transform using the pvalueish specified. - - Args: - transform: the PTranform (or callable) to apply. - pvalueish: the input for the PTransform (typically a PCollection). - - Raises: - TypeError: if the transform object extracted from the argument list is - not a callable type or a descendant from PTransform. 
- RuntimeError: if the transform object was already applied to this pipeline - and needs to be cloned in order to apply again. - """ - if not isinstance(transform, ptransform.PTransform): - transform = _CallableWrapperPTransform(transform) - - full_label = format_full_label(self._current_transform(), transform) - if full_label in self.applied_labels: - raise RuntimeError( - 'Transform "%s" does not have a stable unique label. ' - 'This will prevent updating of pipelines. ' - 'To clone a transform with a new label use: ' - 'transform.clone("NEW LABEL").' - % full_label) - self.applied_labels.add(full_label) - - pvalueish, inputs = transform._extract_input_pvalues(pvalueish) - try: - inputs = tuple(inputs) - for leaf_input in inputs: - if not isinstance(leaf_input, pvalue.PValue): - raise TypeError - except TypeError: - raise NotImplementedError( - 'Unable to extract PValue inputs from %s; either %s does not accept ' - 'inputs of this format, or it does not properly override ' - '_extract_input_values' % (pvalueish, transform)) - - current = AppliedPTransform( - self._current_transform(), transform, full_label, inputs) - self._current_transform().add_part(current) - self.transforms_stack.append(current) - - if self.options is not None: - type_options = self.options.view_as(TypeOptions) - else: - type_options = None - - if type_options is not None and type_options.pipeline_type_check: - transform.type_check_inputs(pvalueish) - - pvalueish_result = self.runner.apply(transform, pvalueish) - - if type_options is not None and type_options.pipeline_type_check: - transform.type_check_outputs(pvalueish_result) - - for result in ptransform.GetPValues().visit(pvalueish_result): - assert isinstance(result, (pvalue.PValue, pvalue.DoOutputsTuple)) - - # Make sure we set the producer only for a leaf node in the transform DAG. - # This way we preserve the last transform of a composite transform as - # being the real producer of the result. - if result.producer is None: - result.producer = current - # TODO(robertwb): Multi-input, multi-output inference. - # TODO(robertwb): Ideally we'd do intersection here. 
- if (type_options is not None and type_options.pipeline_type_check and - isinstance(result, (pvalue.PCollection, pvalue.PCollectionView)) - and not result.element_type): - input_element_type = ( - inputs[0].element_type - if len(inputs) == 1 - else typehints.Any) - type_hints = transform.get_type_hints() - declared_output_type = type_hints.simple_output_type(transform.label) - if declared_output_type: - input_types = type_hints.input_types - if input_types and input_types[0]: - declared_input_type = input_types[0][0] - result.element_type = typehints.bind_type_variables( - declared_output_type, - typehints.match_type_variables(declared_input_type, - input_element_type)) - else: - result.element_type = declared_output_type - else: - result.element_type = transform.infer_output_type(input_element_type) - - assert isinstance(result.producer.inputs, tuple) - current.add_output(result) - - if (type_options is not None and - type_options.type_check_strictness == 'ALL_REQUIRED' and - transform.get_type_hints().output_types is None): - ptransform_name = '%s(%s)' % (transform.__class__.__name__, full_label) - raise TypeCheckError('Pipeline type checking is enabled, however no ' - 'output type-hint was found for the ' - 'PTransform %s' % ptransform_name) - - current.update_input_refcounts() - self.transforms_stack.pop() - return pvalueish_result - - -class _CallableWrapperPTransform(ptransform.PTransform): - - def __init__(self, callee): - assert callable(callee) - super(_CallableWrapperPTransform, self).__init__( - label=getattr(callee, '__name__', 'Callable')) - self._callee = callee - - def apply(self, *args, **kwargs): - return self._callee(*args, **kwargs) - - -class PipelineVisitor(object): - """Visitor pattern class used to traverse a DAG of transforms. - - This is an internal class used for bookkeeping by a Pipeline. - """ - - def visit_value(self, value, producer_node): - """Callback for visiting a PValue in the pipeline DAG. - - Args: - value: PValue visited (typically a PCollection instance). - producer_node: AppliedPTransform object whose transform produced the - pvalue. - """ - pass - - def visit_transform(self, transform_node): - """Callback for visiting a transform node in the pipeline DAG.""" - pass - - def enter_composite_transform(self, transform_node): - """Callback for entering traversal of a composite transform node.""" - pass - - def leave_composite_transform(self, transform_node): - """Callback for leaving traversal of a composite transform node.""" - pass - - -class AppliedPTransform(object): - """A transform node representing an instance of applying a PTransform. - - This is an internal class used for bookkeeping by a Pipeline. - """ - - def __init__(self, parent, transform, full_label, inputs): - self.parent = parent - self.transform = transform - # Note that we want the PipelineVisitor classes to use the full_label, - # inputs, side_inputs, and outputs fields from this instance instead of the - # ones of the PTransform instance associated with it. Doing this permits - # reusing PTransform instances in different contexts (apply() calls) without - # any interference. This is particularly useful for composite transforms. - self.full_label = full_label - self.inputs = inputs or () - self.side_inputs = () if transform is None else tuple(transform.side_inputs) - self.outputs = {} - self.parts = [] - - # Per tag refcount dictionary for PValues for which this node is a - # root producer. 
- self.refcounts = collections.defaultdict(int) - - def update_input_refcounts(self): - """Increment refcounts for all transforms providing inputs.""" - - def real_producer(pv): - real = pv.producer - while real.parts: - real = real.parts[-1] - return real - - if not self.is_composite(): - for main_input in self.inputs: - if not isinstance(main_input, pvalue.PBegin): - real_producer(main_input).refcounts[main_input.tag] += 1 - for side_input in self.side_inputs: - real_producer(side_input).refcounts[side_input.tag] += 1 - - def add_output(self, output, tag=None): - assert (isinstance(output, pvalue.PValue) or - isinstance(output, pvalue.DoOutputsTuple)) - if tag is None: - tag = len(self.outputs) - assert tag not in self.outputs - self.outputs[tag] = output - - def add_part(self, part): - assert isinstance(part, AppliedPTransform) - self.parts.append(part) - - def is_composite(self): - """Returns whether this is a composite transform. - - A composite transform has parts (inner transforms) or isn't the - producer for any of its outputs. (An example of a transform that - is not a producer is one that returns its inputs instead.) - """ - return bool(self.parts) or all( - pval.producer is not self for pval in self.outputs.values()) - - def visit(self, visitor, pipeline, visited): - """Visits all nodes reachable from the current node.""" - - for pval in self.inputs: - if pval not in visited and not isinstance(pval, pvalue.PBegin): - assert pval.producer is not None - pval.producer.visit(visitor, pipeline, visited) - # The value should be visited now since we visit outputs too. - assert pval in visited, pval - - # Visit side inputs. - for pval in self.side_inputs: - if isinstance(pval, pvalue.PCollectionView) and pval not in visited: - assert pval.producer is not None - pval.producer.visit(visitor, pipeline, visited) - # The value should be visited now since we visit outputs too. - assert pval in visited - # TODO(silviuc): Is there a way to signal that we are visiting a side - # value? The issue is that the same PValue can be reachable through - # multiple paths and therefore it is not guaranteed that the value - # will be visited as a side value. - - # Visit a composite or primitive transform. - if self.is_composite(): - visitor.enter_composite_transform(self) - for part in self.parts: - part.visit(visitor, pipeline, visited) - visitor.leave_composite_transform(self) - else: - visitor.visit_transform(self) - - # Visit the outputs (one or more). It is essential to mark as visited the - # tagged PCollections of the DoOutputsTuple object. A tagged PCollection is - # connected directly with its producer (a multi-output ParDo), but the - # output of such a transform is the containing DoOutputsTuple, not the - # PCollection inside it. Without the code below a tagged PCollection will - # not be marked as visited while visiting its producer. 
- for pval in self.outputs.values(): - if isinstance(pval, pvalue.DoOutputsTuple): - pvals = (v for v in pval) - else: - pvals = (pval,) - for v in pvals: - if v not in visited: - visited.add(v) - visitor.visit_value(v, self) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/pipeline_test.py b/sdks/python/google/cloud/dataflow/pipeline_test.py deleted file mode 100644 index ce3bd6d..0000000 --- a/sdks/python/google/cloud/dataflow/pipeline_test.py +++ /dev/null @@ -1,345 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Unit tests for the Pipeline class.""" - -import gc -import logging -import unittest - -from google.cloud.dataflow.io.iobase import NativeSource -from google.cloud.dataflow.pipeline import Pipeline -from google.cloud.dataflow.pipeline import PipelineOptions -from google.cloud.dataflow.pipeline import PipelineVisitor -from google.cloud.dataflow.pvalue import AsIter -from google.cloud.dataflow.pvalue import SideOutputValue -from google.cloud.dataflow.transforms import CombinePerKey -from google.cloud.dataflow.transforms import Create -from google.cloud.dataflow.transforms import FlatMap -from google.cloud.dataflow.transforms import Flatten -from google.cloud.dataflow.transforms import Map -from google.cloud.dataflow.transforms import PTransform -from google.cloud.dataflow.transforms import Read -from google.cloud.dataflow.transforms.util import assert_that, equal_to - - -class FakeSource(NativeSource): - """Fake source returning a fixed list of values.""" - - class _Reader(object): - - def __init__(self, vals): - self._vals = vals - - def __enter__(self): - return self - - def __exit__(self, exception_type, exception_value, traceback): - pass - - def __iter__(self): - for v in self._vals: - yield v - - def __init__(self, vals): - self._vals = vals - - def reader(self): - return FakeSource._Reader(self._vals) - - -class PipelineTest(unittest.TestCase): - - def setUp(self): - self.runner_name = 'DirectPipelineRunner' - - @staticmethod - def custom_callable(pcoll): - return pcoll | FlatMap('+1', lambda x: [x + 1]) - - # Some of these tests designate a runner by name, others supply a runner. - # This variation is just to verify that both means of runner specification - # work and is not related to other aspects of the tests. 
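For reference, the two styles of runner specification mentioned in the comment above look like this (DirectPipelineRunner is exported from google.cloud.dataflow.runners, as shown later in this commit):

  from google.cloud.dataflow.pipeline import Pipeline
  from google.cloud.dataflow.runners import DirectPipelineRunner

  p_by_name = Pipeline('DirectPipelineRunner')            # runner named by string
  p_by_object = Pipeline(runner=DirectPipelineRunner())   # runner supplied as object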
- - class CustomTransform(PTransform): - - def apply(self, pcoll): - return pcoll | FlatMap('+1', lambda x: [x + 1]) - - class Visitor(PipelineVisitor): - - def __init__(self, visited): - self.visited = visited - self.enter_composite = [] - self.leave_composite = [] - - def visit_value(self, value, _): - self.visited.append(value) - - def enter_composite_transform(self, transform_node): - self.enter_composite.append(transform_node) - - def leave_composite_transform(self, transform_node): - self.leave_composite.append(transform_node) - - def test_create(self): - pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Create('label1', [1, 2, 3]) - assert_that(pcoll, equal_to([1, 2, 3])) - - # Test if initial value is an iterator object. - pcoll2 = pipeline | Create('label2', iter((4, 5, 6))) - pcoll3 = pcoll2 | FlatMap('do', lambda x: [x + 10]) - assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3') - pipeline.run() - - def test_create_singleton_pcollection(self): - pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Create('label', [[1, 2, 3]]) - assert_that(pcoll, equal_to([[1, 2, 3]])) - pipeline.run() - - def test_read(self): - pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Read('read', FakeSource([1, 2, 3])) - assert_that(pcoll, equal_to([1, 2, 3])) - pipeline.run() - - def test_visit_entire_graph(self): - pipeline = Pipeline(self.runner_name) - pcoll1 = pipeline | Create('pcoll', [1, 2, 3]) - pcoll2 = pcoll1 | FlatMap('do1', lambda x: [x + 1]) - pcoll3 = pcoll2 | FlatMap('do2', lambda x: [x + 1]) - pcoll4 = pcoll2 | FlatMap('do3', lambda x: [x + 1]) - transform = PipelineTest.CustomTransform() - pcoll5 = pcoll4 | transform - - visitor = PipelineTest.Visitor(visited=[]) - pipeline.visit(visitor) - self.assertEqual(set([pcoll1, pcoll2, pcoll3, pcoll4, pcoll5]), - set(visitor.visited)) - self.assertEqual(set(visitor.enter_composite), - set(visitor.leave_composite)) - self.assertEqual(2, len(visitor.enter_composite)) - self.assertEqual(visitor.enter_composite[1].transform, transform) - self.assertEqual(visitor.leave_composite[0].transform, transform) - - def test_apply_custom_transform(self): - pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Create('pcoll', [1, 2, 3]) - result = pcoll | PipelineTest.CustomTransform() - assert_that(result, equal_to([2, 3, 4])) - pipeline.run() - - def test_reuse_custom_transform_instance(self): - pipeline = Pipeline(self.runner_name) - pcoll1 = pipeline | Create('pcoll1', [1, 2, 3]) - pcoll2 = pipeline | Create('pcoll2', [4, 5, 6]) - transform = PipelineTest.CustomTransform() - pcoll1 | transform - with self.assertRaises(RuntimeError) as cm: - pipeline.apply(transform, pcoll2) - self.assertEqual( - cm.exception.message, - 'Transform "CustomTransform" does not have a stable unique label. ' - 'This will prevent updating of pipelines. 
' - 'To clone a transform with a new label use: ' - 'transform.clone("NEW LABEL").') - - def test_reuse_cloned_custom_transform_instance(self): - pipeline = Pipeline(self.runner_name) - pcoll1 = pipeline | Create('pcoll1', [1, 2, 3]) - pcoll2 = pipeline | Create('pcoll2', [4, 5, 6]) - transform = PipelineTest.CustomTransform() - result1 = pcoll1 | transform - result2 = pcoll2 | transform.clone('new label') - assert_that(result1, equal_to([2, 3, 4]), label='r1') - assert_that(result2, equal_to([5, 6, 7]), label='r2') - pipeline.run() - - def test_apply_custom_callable(self): - pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Create('pcoll', [1, 2, 3]) - result = pipeline.apply(PipelineTest.custom_callable, pcoll) - assert_that(result, equal_to([2, 3, 4])) - pipeline.run() - - def test_transform_no_super_init(self): - class AddSuffix(PTransform): - - def __init__(self, suffix): - # No call to super(...).__init__ - self.suffix = suffix - - def apply(self, pcoll): - return pcoll | Map(lambda x: x + self.suffix) - - self.assertEqual( - ['a-x', 'b-x', 'c-x'], - sorted(['a', 'b', 'c'] | AddSuffix('-x'))) - - def test_cached_pvalues_are_refcounted(self): - """Test that cached PValues are refcounted and deleted. - - The intermediary PValues computed by the workflow below contain - one million elements so if the refcounting does not work the number of - objects tracked by the garbage collector will increase by a few millions - by the time we execute the final Map checking the objects tracked. - Anything that is much larger than what we started with will fail the test. - """ - def check_memory(value, count_threshold): - gc.collect() - objects_count = len(gc.get_objects()) - if objects_count > count_threshold: - raise RuntimeError( - 'PValues are not refcounted: %s, %s' % ( - objects_count, count_threshold)) - return value - - def create_dupes(o, _): - yield o - yield SideOutputValue('side', o) - - pipeline = Pipeline('DirectPipelineRunner') - - gc.collect() - count_threshold = len(gc.get_objects()) + 10000 - biglist = pipeline | Create('oom:create', ['x'] * 1000000) - dupes = ( - biglist - | Map('oom:addone', lambda x: (x, 1)) - | FlatMap('oom:dupes', create_dupes, - AsIter(biglist)).with_outputs('side', main='main')) - result = ( - (dupes.side, dupes.main, dupes.side) - | Flatten('oom:flatten') - | CombinePerKey('oom:combine', sum) - | Map('oom:check', check_memory, count_threshold)) - - assert_that(result, equal_to([('x', 3000000)])) - pipeline.run() - self.assertEqual( - pipeline.runner.debug_counters['element_counts'], - { - 'oom:flatten': 3000000, - ('oom:combine/GroupByKey/reify_windows', None): 3000000, - ('oom:dupes/oom:dupes', 'side'): 1000000, - ('oom:dupes/oom:dupes', None): 1000000, - 'oom:create': 1000000, - ('oom:addone', None): 1000000, - 'oom:combine/GroupByKey/group_by_key': 1, - ('oom:check', None): 1, - 'assert_that/singleton': 1, - ('assert_that/Map(match)', None): 1, - ('oom:combine/GroupByKey/group_by_window', None): 1, - ('oom:combine/Combine/ParDo(CombineValuesDoFn)', None): 1}) - - def test_pipeline_as_context(self): - def raise_exception(exn): - raise exn - with self.assertRaises(ValueError): - with Pipeline(self.runner_name) as p: - # pylint: disable=expression-not-assigned - p | Create([ValueError]) | Map(raise_exception) - - def test_eager_pipeline(self): - p = Pipeline('EagerPipelineRunner') - self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x)) - - -class DiskCachedRunnerPipelineTest(PipelineTest): - - def setUp(self): - self.runner_name = 
'DiskCachedPipelineRunner' - - def test_cached_pvalues_are_refcounted(self): - # Takes long with disk spilling. - pass - - def test_eager_pipeline(self): - # Tests eager runner only - pass - - -class Bacon(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument('--slices', type=int) - - -class Eggs(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument('--style', default='scrambled') - - -class Breakfast(Bacon, Eggs): - pass - - -class PipelineOptionsTest(unittest.TestCase): - - def test_flag_parsing(self): - options = Breakfast(['--slices=3', '--style=sunny side up', '--ignored']) - self.assertEquals(3, options.slices) - self.assertEquals('sunny side up', options.style) - - def test_keyword_parsing(self): - options = Breakfast( - ['--slices=3', '--style=sunny side up', '--ignored'], - slices=10) - self.assertEquals(10, options.slices) - self.assertEquals('sunny side up', options.style) - - def test_attribute_setting(self): - options = Breakfast(slices=10) - self.assertEquals(10, options.slices) - options.slices = 20 - self.assertEquals(20, options.slices) - - def test_view_as(self): - generic_options = PipelineOptions(['--slices=3']) - self.assertEquals(3, generic_options.view_as(Bacon).slices) - self.assertEquals(3, generic_options.view_as(Breakfast).slices) - - generic_options.view_as(Breakfast).slices = 10 - self.assertEquals(10, generic_options.view_as(Bacon).slices) - - with self.assertRaises(AttributeError): - generic_options.slices # pylint: disable=pointless-statement - - with self.assertRaises(AttributeError): - generic_options.view_as(Eggs).slices # pylint: disable=expression-not-assigned - - def test_defaults(self): - options = Breakfast(['--slices=3']) - self.assertEquals(3, options.slices) - self.assertEquals('scrambled', options.style) - - def test_dir(self): - options = Breakfast() - self.assertEquals( - ['from_dictionary', 'get_all_options', 'slices', 'style', 'view_as'], - [attr for attr in dir(options) if not attr.startswith('_')]) - self.assertEquals( - ['from_dictionary', 'get_all_options', 'style', 'view_as'], - [attr for attr in dir(options.view_as(Eggs)) - if not attr.startswith('_')]) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.DEBUG) - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/pvalue.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/pvalue.py b/sdks/python/google/cloud/dataflow/pvalue.py deleted file mode 100644 index 5e40706..0000000 --- a/sdks/python/google/cloud/dataflow/pvalue.py +++ /dev/null @@ -1,459 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""PValue, PCollection: one node of a dataflow graph. - -A node of a dataflow processing graph is a PValue. 
Currently, there is only -one type: PCollection (a potentially very large set of arbitrary values). -Once created, a PValue belongs to a pipeline and has an associated -transform (of type PTransform), which describes how the value will be -produced when the pipeline gets executed. -""" - -from __future__ import absolute_import - -import collections - - -class PValue(object): - """Base class for PCollection. - - Dataflow users should not construct PValue objects directly in their - pipelines. - - A PValue has the following main characteristics: - (1) Belongs to a pipeline. Added during object initialization. - (2) Has a transform that can compute the value if executed. - (3) Has a value which is meaningful if the transform was executed. - """ - - def __init__(self, pipeline, tag=None, element_type=None): - """Initializes a PValue with all arguments hidden behind keyword arguments. - - Args: - pipeline: Pipeline object for this PValue. - tag: Tag of this PValue. - element_type: The type of this PValue. - """ - self.pipeline = pipeline - self.tag = tag - self.element_type = element_type - # The AppliedPTransform instance for the application of the PTransform - # generating this PValue. The field gets initialized when a transform - # gets applied. - self.producer = None - - def __str__(self): - return '<%s>' % self._str_internal() - - def __repr__(self): - return '<%s at %s>' % (self._str_internal(), hex(id(self))) - - def _str_internal(self): - return '%s transform=%s' % ( - self.__class__.__name__, - self.producer.transform if self.producer else 'n/a') - - def apply(self, *args, **kwargs): - """Applies a transform or callable to a PValue. - - Args: - *args: positional arguments. - **kwargs: keyword arguments. - - The method will insert the pvalue as the next argument following an - optional first label and a transform/callable object. It will call the - pipeline.apply() method with this modified argument list. - """ - if isinstance(args[0], str): - # TODO(robertwb): Make sure labels are properly passed during - # ptransform construction and drop this argument. - args = args[1:] - arglist = list(args) - arglist.insert(1, self) - return self.pipeline.apply(*arglist, **kwargs) - - def __or__(self, ptransform): - return self.pipeline.apply(ptransform, self) - - -class PCollection(PValue): - """A multiple values (potentially huge) container. - - Dataflow users should not construct PCollection objects directly in their - pipelines. - """ - - def __init__(self, pipeline, **kwargs): - """Initializes a PCollection. Do not call directly.""" - super(PCollection, self).__init__(pipeline, **kwargs) - - @property - def windowing(self): - if not hasattr(self, '_windowing'): - self._windowing = self.producer.transform.get_windowing( - self.producer.inputs) - return self._windowing - - def __reduce_ex__(self, unused_version): - # Pickling a PCollection is almost always the wrong thing to do, but we - # can't prohibit it as it often gets implicitly picked up (e.g. as part - # of a closure). - return _InvalidUnpickledPCollection, () - - -class _InvalidUnpickledPCollection(object): - pass - - -class PBegin(PValue): - """A pipeline begin marker used as input to create/read transforms. - - The class is used internally to represent inputs to Create and Read - transforms. This allows us to have transforms that uniformly take PValue(s) - as inputs. - """ - pass - - -class PDone(PValue): - """PDone is the output of a transform that has a trivial result such as Write. 
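A short sketch of how the classes above surface in user code, using the Create and FlatMap transforms exercised by pipeline_test.py earlier in this commit:

  from google.cloud.dataflow.pipeline import Pipeline
  from google.cloud.dataflow.transforms import Create
  from google.cloud.dataflow.transforms import FlatMap

  p = Pipeline('DirectPipelineRunner')
  # Applying a transform to the pipeline yields a PCollection; piping it into
  # further transforms with | yields new PCollections.
  numbers = p | Create('numbers', [1, 2, 3])
  doubled = numbers | FlatMap('double', lambda x: [x * 2])
  p.run()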
- """ - pass - - -class DoOutputsTuple(object): - """An object grouping the multiple outputs of a ParDo or FlatMap transform.""" - - def __init__(self, pipeline, transform, tags, main_tag): - self._pipeline = pipeline - self._tags = tags - self._main_tag = main_tag - self._transform = transform - # The ApplyPTransform instance for the application of the multi FlatMap - # generating this value. The field gets initialized when a transform - # gets applied. - self.producer = None - # Dictionary of PCollections already associated with tags. - self._pcolls = {} - - def __str__(self): - return '<%s>' % self._str_internal() - - def __repr__(self): - return '<%s at %s>' % (self._str_internal(), hex(id(self))) - - def _str_internal(self): - return '%s main_tag=%s tags=%s transform=%s' % ( - self.__class__.__name__, self._main_tag, self._tags, self._transform) - - def __iter__(self): - """Iterates over tags returning for each call a (tag, pvalue) pair.""" - if self._main_tag is not None: - yield self[self._main_tag] - for tag in self._tags: - yield self[tag] - - def __getattr__(self, tag): - # Special methods which may be accessed before the object is - # fully constructed (e.g. in unpickling). - if tag[:2] == tag[-2:] == '__': - return object.__getattr__(self, tag) - return self[tag] - - def __getitem__(self, tag): - # Accept int tags so that we can look at Partition tags with the - # same ints that we used in the partition function. - # TODO(gildea): Consider requiring string-based tags everywhere. - # This will require a partition function that does not return ints. - if isinstance(tag, int): - tag = str(tag) - if tag == self._main_tag: - tag = None - elif self._tags and tag not in self._tags: - raise ValueError( - 'Tag %s is neither the main tag %s nor any of the side tags %s' % ( - tag, self._main_tag, self._tags)) - # Check if we accessed this tag before. - if tag in self._pcolls: - return self._pcolls[tag] - if tag is not None: - self._transform.side_output_tags.add(tag) - pcoll = PCollection(self._pipeline, tag=tag) - # Transfer the producer from the DoOutputsTuple to the resulting - # PCollection. - pcoll.producer = self.producer - self.producer.add_output(pcoll, tag) - self._pcolls[tag] = pcoll - return pcoll - - -class SideOutputValue(object): - """An object representing a tagged value. - - ParDo, Map, and FlatMap transforms can emit values on multiple outputs which - are distinguished by string tags. The DoFn will return plain values - if it wants to emit on the main output and SideOutputValue objects - if it wants to emit a value on a specific tagged output. - """ - - def __init__(self, tag, value): - if not isinstance(tag, basestring): - raise TypeError( - 'Attempting to create a SideOutputValue with non-string tag %s' % tag) - self.tag = tag - self.value = value - - -class PCollectionView(PValue): - """An immutable view of a PCollection that can be used as a side input.""" - - def __init__(self, pipeline): - """Initializes a PCollectionView. Do not call directly.""" - super(PCollectionView, self).__init__(pipeline) - - @property - def windowing(self): - if not hasattr(self, '_windowing'): - self._windowing = self.producer.transform.get_windowing( - self.producer.inputs) - return self._windowing - - def _view_options(self): - """Internal options corresponding to specific view. - - Intended for internal use by runner implementations. - - Returns: - Tuple of options for the given view. 
- """ - return () - - -class SingletonPCollectionView(PCollectionView): - """A PCollectionView that contains a single object.""" - - def __init__(self, pipeline, has_default, default_value): - super(SingletonPCollectionView, self).__init__(pipeline) - self.has_default = has_default - self.default_value = default_value - - def _view_options(self): - return (self.has_default, self.default_value) - - -class IterablePCollectionView(PCollectionView): - """A PCollectionView that can be treated as an iterable.""" - pass - - -class ListPCollectionView(PCollectionView): - """A PCollectionView that can be treated as a list.""" - pass - - -class DictPCollectionView(PCollectionView): - """A PCollectionView that can be treated as a dict.""" - pass - - -def _get_cached_view(pipeline, key): - return pipeline._view_cache.get(key, None) # pylint: disable=protected-access - - -def _cache_view(pipeline, key, view): - pipeline._view_cache[key] = view # pylint: disable=protected-access - - -def can_take_label_as_first_argument(callee): - """Decorator to allow the "label" kwarg to be passed as the first argument. - - For example, since AsSingleton is annotated with this decorator, this allows - the call "AsSingleton(pcoll, label='label1')" to be written more succinctly - as "AsSingleton('label1', pcoll)". - - Args: - callee: The callable to be called with an optional label argument. - - Returns: - Callable that allows (but does not require) a string label as its first - argument. - """ - def _inner(maybe_label, *args, **kwargs): - if isinstance(maybe_label, basestring): - return callee(*args, label=maybe_label, **kwargs) - return callee(*((maybe_label,) + args), **kwargs) - return _inner - - -def _format_view_label(pcoll): - # The monitoring UI doesn't like '/' character in transform labels. - if not pcoll.producer: - return str(pcoll.tag) - return '%s.%s' % (pcoll.producer.full_label.replace('/', '|'), - pcoll.tag) - - -_SINGLETON_NO_DEFAULT = object() - - -@can_take_label_as_first_argument -def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None): # pylint: disable=invalid-name - """Create a SingletonPCollectionView from the contents of input PCollection. - - The input PCollection should contain at most one element (per window) and the - resulting PCollectionView can then be used as a side input to PTransforms. If - the PCollectionView is empty (for a given window), the side input value will - be the default_value, if specified; otherwise, it will be an EmptySideInput - object. - - Args: - pcoll: Input pcollection. - default_value: Default value for the singleton view. - label: Label to be specified if several AsSingleton's with different - defaults for the same PCollection. - - Returns: - A singleton PCollectionView containing the element as above. - """ - label = label or _format_view_label(pcoll) - has_default = default_value is not _SINGLETON_NO_DEFAULT - if not has_default: - default_value = None - - # Don't recreate the view if it was already created. - hashable_default_value = ('val', default_value) - if not isinstance(default_value, collections.Hashable): - # Massage default value to treat as hash key. 
- hashable_default_value = ('id', id(default_value)) - cache_key = (pcoll, AsSingleton, has_default, hashable_default_value) - cached_view = _get_cached_view(pcoll.pipeline, cache_key) - if cached_view: - return cached_view - - # Local import is required due to dependency loop; even though the - # implementation of this function requires concepts defined in modules that - # depend on pvalue, it lives in this module to reduce user workload. - from google.cloud.dataflow.transforms import sideinputs # pylint: disable=g-import-not-at-top - view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value, - label=label)) - _cache_view(pcoll.pipeline, cache_key, view) - return view - - -@can_take_label_as_first_argument -def AsIter(pcoll, label=None): # pylint: disable=invalid-name - """Create an IterablePCollectionView from the elements of input PCollection. - - The contents of the given PCollection will be available as an iterable in - PTransforms that use the returned PCollectionView as a side input. - - Args: - pcoll: Input pcollection. - label: Label to be specified if several AsIter's for the same PCollection. - - Returns: - An iterable PCollectionView containing the elements as above. - """ - label = label or _format_view_label(pcoll) - - # Don't recreate the view if it was already created. - cache_key = (pcoll, AsIter) - cached_view = _get_cached_view(pcoll.pipeline, cache_key) - if cached_view: - return cached_view - - # Local import is required due to dependency loop; even though the - # implementation of this function requires concepts defined in modules that - # depend on pvalue, it lives in this module to reduce user workload. - from google.cloud.dataflow.transforms import sideinputs # pylint: disable=g-import-not-at-top - view = (pcoll | sideinputs.ViewAsIterable(label=label)) - _cache_view(pcoll.pipeline, cache_key, view) - return view - - -@can_take_label_as_first_argument -def AsList(pcoll, label=None): # pylint: disable=invalid-name - """Create a ListPCollectionView from the elements of input PCollection. - - The contents of the given PCollection will be available as a list-like object - in PTransforms that use the returned PCollectionView as a side input. - - Args: - pcoll: Input pcollection. - label: Label to be specified if several AsList's for the same PCollection. - - Returns: - A list PCollectionView containing the elements as above. - """ - label = label or _format_view_label(pcoll) - - # Don't recreate the view if it was already created. - cache_key = (pcoll, AsList) - cached_view = _get_cached_view(pcoll.pipeline, cache_key) - if cached_view: - return cached_view - - # Local import is required due to dependency loop; even though the - # implementation of this function requires concepts defined in modules that - # depend on pvalue, it lives in this module to reduce user workload. - from google.cloud.dataflow.transforms import sideinputs # pylint: disable=g-import-not-at-top - view = (pcoll | sideinputs.ViewAsList(label=label)) - _cache_view(pcoll.pipeline, cache_key, view) - return view - - -@can_take_label_as_first_argument -def AsDict(pcoll, label=None): # pylint: disable=invalid-name - """Create a DictPCollectionView from the elements of input PCollection. - - The contents of the given PCollection whose elements are 2-tuples of key and - value will be available as a dict-like object in PTransforms that use the - returned PCollectionView as a side input. - - Args: - pcoll: Input pcollection containing 2-tuples of key and value. 
- label: Label to be specified if several AsDict's for the same PCollection. - - Returns: - A dict PCollectionView containing the dict as above. - """ - label = label or _format_view_label(pcoll) - - # Don't recreate the view if it was already created. - cache_key = (pcoll, AsDict) - cached_view = _get_cached_view(pcoll.pipeline, cache_key) - if cached_view: - return cached_view - - # Local import is required due to dependency loop; even though the - # implementation of this function requires concepts defined in modules that - # depend on pvalue, it lives in this module to reduce user workload. - from google.cloud.dataflow.transforms import sideinputs # pylint: disable=g-import-not-at-top - view = (pcoll | sideinputs.ViewAsDict(label=label)) - _cache_view(pcoll.pipeline, cache_key, view) - return view - - -class EmptySideInput(object): - """Value indicating when a singleton side input was empty. - - If a PCollection was furnished as a singleton side input to a PTransform, and - that PCollection was empty, then this value is supplied to the DoFn in the - place where a value from a non-empty PCollection would have gone. This alerts - the DoFn that the side input PCollection was empty. Users may want to check - whether side input values are EmptySideInput, but they will very likely never - want to create new instances of this class themselves. - """ - pass http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/pvalue_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/pvalue_test.py b/sdks/python/google/cloud/dataflow/pvalue_test.py deleted file mode 100644 index d3c1c44..0000000 --- a/sdks/python/google/cloud/dataflow/pvalue_test.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Unit tests for the PValue and PCollection classes.""" - -import unittest - -from google.cloud.dataflow.pipeline import Pipeline -from google.cloud.dataflow.pvalue import AsDict -from google.cloud.dataflow.pvalue import AsIter -from google.cloud.dataflow.pvalue import AsList -from google.cloud.dataflow.pvalue import AsSingleton -from google.cloud.dataflow.pvalue import PValue -from google.cloud.dataflow.transforms import Create - - -class FakePipeline(Pipeline): - """Fake pipeline object used to check if apply() receives correct args.""" - - def apply(self, *args, **kwargs): - self.args = args - self.kwargs = kwargs - - -class PValueTest(unittest.TestCase): - - def test_pvalue_expected_arguments(self): - pipeline = Pipeline('DirectPipelineRunner') - value = PValue(pipeline) - self.assertEqual(pipeline, value.pipeline) - - def test_pcollectionview_not_recreated(self): - pipeline = Pipeline('DirectPipelineRunner') - value = pipeline | Create('create1', [1, 2, 3]) - value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)]) - self.assertEqual(AsSingleton(value), AsSingleton(value)) - self.assertEqual(AsSingleton('new', value, default_value=1), - AsSingleton('new', value, default_value=1)) - self.assertNotEqual(AsSingleton(value), - AsSingleton('new', value, default_value=1)) - self.assertEqual(AsIter(value), AsIter(value)) - self.assertEqual(AsList(value), AsList(value)) - self.assertEqual(AsDict(value2), AsDict(value2)) - - self.assertNotEqual(AsSingleton(value), AsSingleton(value2)) - self.assertNotEqual(AsIter(value), AsIter(value2)) - self.assertNotEqual(AsList(value), AsList(value2)) - self.assertNotEqual(AsDict(value), AsDict(value2)) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/python_sdk_releases.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/python_sdk_releases.py b/sdks/python/google/cloud/dataflow/python_sdk_releases.py deleted file mode 100644 index 52e07aa..0000000 --- a/sdks/python/google/cloud/dataflow/python_sdk_releases.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Descriptions of the versions of the SDK. - -This manages the features and tests supported by different versions of the -Dataflow SDK for Python. - -To add feature 'foo' to a particular release, add a 'properties' value with -'feature_foo': True. To remove feature 'foo' from a particular release, add a -'properties' value with 'feature_foo': False. Features are cumulative and can -be added and removed multiple times. - -By default, all tests are enabled. To remove test 'bar' from a particular -release, add a 'properties' value with 'test_bar': False. To add it back to a -subsequent release, add a 'properties' value with 'test_bar': True. Tests are -cumulative and can be removed and added multiple times. 
- -See go/dataflow-testing for more information. -""" - -OLDEST_SUPPORTED_PYTHON_SDK = 'python-0.1.4' - -RELEASES = [ - {'name': 'python-0.2.7',}, - {'name': 'python-0.2.6',}, - {'name': 'python-0.2.5',}, - {'name': 'python-0.2.4',}, - {'name': 'python-0.2.3',}, - {'name': 'python-0.2.2',}, - {'name': 'python-0.2.1',}, - {'name': 'python-0.2.0',}, - {'name': 'python-0.1.5',}, - {'name': 'python-0.1.4',}, - {'name': 'python-0.1.3',}, - {'name': 'python-0.1.2',}, - {'name': 'python-0.1.1', - 'properties': { - 'feature_python': True, - } - }, -] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/runners/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/runners/__init__.py b/sdks/python/google/cloud/dataflow/runners/__init__.py deleted file mode 100644 index 06d1af4..0000000 --- a/sdks/python/google/cloud/dataflow/runners/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Runner objects execute a Pipeline. - -This package defines runners, which are used to execute a pipeline. -""" - -from google.cloud.dataflow.runners.dataflow_runner import DataflowPipelineRunner -from google.cloud.dataflow.runners.direct_runner import DirectPipelineRunner -from google.cloud.dataflow.runners.runner import create_runner -from google.cloud.dataflow.runners.runner import PipelineRunner -from google.cloud.dataflow.runners.runner import PipelineState http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/runners/common.pxd b/sdks/python/google/cloud/dataflow/runners/common.pxd deleted file mode 100644 index fa1e3d6..0000000 --- a/sdks/python/google/cloud/dataflow/runners/common.pxd +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -cdef type SideOutputValue, TimestampedValue, WindowedValue - -cdef class DoFnRunner(object): - - cdef object dofn - cdef object window_fn - cdef object context - cdef object tagged_receivers - cdef object logger - cdef object step_name - - cdef object main_receivers - - cpdef _process_outputs(self, element, results) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/runners/common.py b/sdks/python/google/cloud/dataflow/runners/common.py deleted file mode 100644 index 34e480b..0000000 --- a/sdks/python/google/cloud/dataflow/runners/common.py +++ /dev/null @@ -1,181 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# cython: profile=True - -"""Worker operations executor.""" - -import sys - -from google.cloud.dataflow.internal import util -from google.cloud.dataflow.pvalue import SideOutputValue -from google.cloud.dataflow.transforms import core -from google.cloud.dataflow.transforms.window import TimestampedValue -from google.cloud.dataflow.transforms.window import WindowedValue -from google.cloud.dataflow.transforms.window import WindowFn - - -class FakeLogger(object): - def PerThreadLoggingContext(self, *unused_args, **unused_kwargs): - return self - def __enter__(self): - pass - def __exit__(self, *unused_args): - pass - - -class DoFnRunner(object): - """A helper class for executing ParDo operations. - """ - - def __init__(self, - fn, - args, - kwargs, - side_inputs, - windowing, - context, - tagged_receivers, - logger=None, - step_name=None): - if not args and not kwargs: - self.dofn = fn - else: - args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs) - - class CurriedFn(core.DoFn): - - def start_bundle(self, context): - return fn.start_bundle(context, *args, **kwargs) - - def process(self, context): - return fn.process(context, *args, **kwargs) - - def finish_bundle(self, context): - return fn.finish_bundle(context, *args, **kwargs) - self.dofn = CurriedFn() - self.window_fn = windowing.windowfn - self.context = context - self.tagged_receivers = tagged_receivers - self.logger = logger or FakeLogger() - self.step_name = step_name - - # Optimize for the common case. 
- self.main_receivers = tagged_receivers[None] - - def start(self): - self.context.set_element(None) - try: - self._process_outputs(None, self.dofn.start_bundle(self.context)) - except BaseException as exn: - self.reraise_augmented(exn) - - def finish(self): - self.context.set_element(None) - try: - self._process_outputs(None, self.dofn.finish_bundle(self.context)) - except BaseException as exn: - self.reraise_augmented(exn) - - def process(self, element): - try: - with self.logger.PerThreadLoggingContext(step_name=self.step_name): - self.context.set_element(element) - self._process_outputs(element, self.dofn.process(self.context)) - except BaseException as exn: - self.reraise_augmented(exn) - - def reraise_augmented(self, exn): - if getattr(exn, '_tagged_with_step', False) or not self.step_name: - raise - args = exn.args - if args and isinstance(args[0], str): - args = (args[0] + " [while running '%s']" % self.step_name,) + args[1:] - # Poor man's exception chaining. - raise type(exn), args, sys.exc_info()[2] - else: - raise - - def _process_outputs(self, element, results): - """Dispatch the result of computation to the appropriate receivers. - - A value wrapped in a SideOutputValue object will be unwrapped and - then dispatched to the appropriate indexed output. - """ - if results is None: - return - for result in results: - tag = None - if isinstance(result, SideOutputValue): - tag = result.tag - if not isinstance(tag, basestring): - raise TypeError('In %s, tag %s is not a string' % (self, tag)) - result = result.value - if isinstance(result, WindowedValue): - windowed_value = result - elif element is None: - # Start and finish have no element from which to grab context, - # but may emit elements. - if isinstance(result, TimestampedValue): - value = result.value - timestamp = result.timestamp - assign_context = NoContext(value, timestamp) - else: - value = result - timestamp = -1 - assign_context = NoContext(value) - windowed_value = WindowedValue( - value, timestamp, self.window_fn.assign(assign_context)) - elif isinstance(result, TimestampedValue): - assign_context = WindowFn.AssignContext( - result.timestamp, result.value, element.windows) - windowed_value = WindowedValue( - result.value, result.timestamp, - self.window_fn.assign(assign_context)) - else: - windowed_value = element.with_value(result) - if tag is None: - self.main_receivers.output(windowed_value) - else: - self.tagged_receivers[tag].output(windowed_value) - -class NoContext(WindowFn.AssignContext): - """An uninspectable WindowFn.AssignContext.""" - NO_VALUE = object() - def __init__(self, value, timestamp=NO_VALUE): - self.value = value - self._timestamp = timestamp - @property - def timestamp(self): - if self._timestamp is self.NO_VALUE: - raise ValueError('No timestamp in this context.') - else: - return self._timestamp - @property - def existing_windows(self): - raise ValueError('No existing_windows in this context.') - - -class DoFnState(object): - """Keeps track of state that DoFns want, currently, user counters. - """ - - def __init__(self, counter_factory): - self.step_name = '' - self._counter_factory = counter_factory - - def counter_for(self, aggregator): - """Looks up the counter for this aggregator, creating one if necessary.""" - return self._counter_factory.get_aggregator_counter( - self.step_name, aggregator)
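To connect the dispatch logic in _process_outputs back to user code: a generator DoFn like create_dupes in pipeline_test.py earlier in this commit emits plain values for the main output and SideOutputValue-wrapped values for tagged side outputs:

  from google.cloud.dataflow.pvalue import SideOutputValue

  def create_dupes(o, _):
    yield o                           # dispatched to main_receivers
    yield SideOutputValue('side', o)  # dispatched to tagged_receivers['side']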