From: davor@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Tue, 14 Jun 2016 23:12:43 -0000
In-Reply-To: <95df9c9428334e3980c0c77c4ddc9382@git.apache.org>
References: <95df9c9428334e3980c0c77c4ddc9382@git.apache.org>
Subject: [08/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/combiners_test.py b/sdks/python/google/cloud/dataflow/transforms/combiners_test.py
deleted file mode 100644
index b8142ea..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/combiners_test.py
+++ /dev/null
@@ -1,225 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for our libraries of combine PTransforms."""
-
-import unittest
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.transforms import combiners
-import google.cloud.dataflow.transforms.combiners as combine
-from google.cloud.dataflow.transforms.core import CombineGlobally
-from google.cloud.dataflow.transforms.core import Create
-from google.cloud.dataflow.transforms.core import Map
-from google.cloud.dataflow.transforms.ptransform import PTransform
-from google.cloud.dataflow.transforms.util import assert_that, equal_to
-
-
-class CombineTest(unittest.TestCase):
-
-  def test_builtin_combines(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-
-    vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
-    mean = sum(vals) / float(len(vals))
-    size = len(vals)
-
-    # First for global combines.
-    pcoll = pipeline | Create('start', vals)
-    result_mean = pcoll | combine.Mean.Globally('mean')
-    result_count = pcoll | combine.Count.Globally('count')
-    assert_that(result_mean, equal_to([mean]), label='assert:mean')
-    assert_that(result_count, equal_to([size]), label='assert:size')
-
-    # Again for per-key combines.
-    pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals])
-    result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey')
-    result_key_count = pcoll | combine.Count.PerKey('count-perkey')
-    assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean')
-    assert_that(result_key_count, equal_to([('a', size)]), label='key:size')
-    pipeline.run()
-
-  def test_top(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-
-    # A parameter we'll be sharing with a custom comparator.
-    names = {0: 'zo',
-             1: 'one',
-             2: 'twoo',
-             3: 'three',
-             5: 'fiiive',
-             6: 'sssssix',
-             9: 'nniiinne'}
-
-    # First for global combines.
-    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-    result_top = pcoll | combine.Top.Largest('top', 5)
-    result_bot = pcoll | combine.Top.Smallest('bot', 4)
-    result_cmp = pcoll | combine.Top.Of(
-        'cmp',
-        6,
-        lambda a, b, names: len(names[a]) < len(names[b]),
-        names)  # Note parameter passed to comparator.
-    assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
-    assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
-    assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp')
-
-    # Again for per-key combines.
-    pcoll = pipeline | Create(
-        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-    result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5)
-    result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4)
-    result_key_cmp = pcoll | combine.Top.PerKey(
-        'cmp-perkey',
-        6,
-        lambda a, b, names: len(names[a]) < len(names[b]),
-        names)  # Note parameter passed to comparator.
-    assert_that(result_key_top, equal_to([('a', [9, 6, 6, 5, 3])]),
-                label='key:top')
-    assert_that(result_key_bot, equal_to([('a', [0, 1, 1, 1])]),
-                label='key:bot')
-    assert_that(result_key_cmp, equal_to([('a', [9, 6, 6, 5, 3, 2])]),
-                label='key:cmp')
-    pipeline.run()
-
-  def test_top_shorthands(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-
-    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-    result_top = pcoll | df.CombineGlobally('top', combiners.Largest(5))
-    result_bot = pcoll | df.CombineGlobally('bot', combiners.Smallest(4))
-    assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
-    assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
-
-    pcoll = pipeline | Create(
-        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-    result_ktop = pcoll | df.CombinePerKey('top-perkey', combiners.Largest(5))
-    result_kbot = pcoll | df.CombinePerKey('bot-perkey', combiners.Smallest(4))
-    assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
-    assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
-    pipeline.run()
-
-  def test_sample(self):
-
-    # First test global samples (lots of them).
-    for ix in xrange(300):
-      pipeline = Pipeline('DirectPipelineRunner')
-      pcoll = pipeline | Create('start', [1, 1, 2, 2])
-      result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
-      def matcher():
-        def match(actual):
-          # There is always exactly one result.
-          equal_to([1])([len(actual)])
-          # There are always exactly three samples in the result.
-          equal_to([3])([len(actual[0])])
-          # Sampling is without replacement.
-          num_ones = sum(1 for x in actual[0] if x == 1)
-          num_twos = sum(1 for x in actual[0] if x == 2)
-          equal_to([1, 2])([num_ones, num_twos])
-        return match
-      assert_that(result, matcher())
-      pipeline.run()
-
-    # Now test per-key samples.
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | Create(
-        'start-perkey',
-        sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
-    result = pcoll | combine.Sample.FixedSizePerKey('sample', 3)
-    def matcher():
-      def match(actual):
-        for _, samples in actual:
-          equal_to([3])([len(samples)])
-          num_ones = sum(1 for x in samples if x == 1)
-          num_twos = sum(1 for x in samples if x == 2)
-          equal_to([1, 2])([num_ones, num_twos])
-      return match
-    assert_that(result, matcher())
-    pipeline.run()
-
-  def test_tuple_combine_fn(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = (
-        p
-        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
-        | df.CombineGlobally(combine.TupleCombineFn(max,
                                                     combine.MeanCombineFn(),
                                                     sum)).without_defaults())
-    assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
-    p.run()
-
-  def test_tuple_combine_fn_without_defaults(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = (
-        p
-        | Create([1, 1, 2, 3])
-        | df.CombineGlobally(
-            combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
-            .with_common_input()).without_defaults())
-    assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
-    p.run()
-
-  def test_to_list_and_to_dict(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
-    pcoll = pipeline | Create('start', the_list)
-    result = pcoll | combine.ToList('to list')
-    def matcher(expected):
-      def match(actual):
-        equal_to(expected[0])(actual[0])
-      return match
-    assert_that(result, matcher([the_list]))
-    pipeline.run()
-
-    pipeline = Pipeline('DirectPipelineRunner')
-    pairs = [(1, 2), (3, 4), (5, 6)]
-    pcoll = pipeline | Create('start-pairs', pairs)
-    result = pcoll | combine.ToDict('to dict')
-    def matcher():
-      def match(actual):
-        equal_to([1])([len(actual)])
-        equal_to(pairs)(actual[0].iteritems())
-      return match
-    assert_that(result, matcher())
-    pipeline.run()
-
-  def test_combine_globally_with_default(self):
-    p = Pipeline('DirectPipelineRunner')
-    assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
-    p.run()
-
-  def test_combine_globally_without_default(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = p | Create([]) | CombineGlobally(sum).without_defaults()
-    assert_that(result, equal_to([]))
-    p.run()
-
-  def test_combine_globally_with_default_side_input(self):
-    class CombineWithSideInput(PTransform):
-      def apply(self, pcoll):
-        side = pcoll | CombineGlobally(sum).as_singleton_view()
-        main = pcoll.pipeline | Create([None])
-        return main | Map(lambda _, s: s, side)
-
-    p = Pipeline('DirectPipelineRunner')
-    result1 = p | Create('label1', []) | CombineWithSideInput('L1')
-    result2 = p | Create('label2', [1, 2, 3, 4]) | CombineWithSideInput('L2')
-    assert_that(result1, equal_to([0]), label='r1')
-    assert_that(result2, equal_to([10]), label='r2')
-    p.run()
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/core.py b/sdks/python/google/cloud/dataflow/transforms/core.py
deleted file mode 100644
index 6db0099..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/core.py
+++ /dev/null
@@ -1,1292 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map.""" - -from __future__ import absolute_import - -import copy -import uuid - -from google.cloud.dataflow import pvalue -from google.cloud.dataflow import typehints -from google.cloud.dataflow.coders import typecoders -from google.cloud.dataflow.internal import util -from google.cloud.dataflow.pvalue import AsIter -from google.cloud.dataflow.pvalue import AsSingleton -from google.cloud.dataflow.transforms import ptransform -from google.cloud.dataflow.transforms import window -from google.cloud.dataflow.transforms.ptransform import PTransform -from google.cloud.dataflow.transforms.ptransform import ptransform_fn -from google.cloud.dataflow.transforms.ptransform import PTransformWithSideInputs -from google.cloud.dataflow.transforms.window import MIN_TIMESTAMP -from google.cloud.dataflow.transforms.window import OutputTimeFn -from google.cloud.dataflow.transforms.window import WindowedValue -from google.cloud.dataflow.transforms.window import WindowFn -from google.cloud.dataflow.typehints import Any -from google.cloud.dataflow.typehints import get_type_hints -from google.cloud.dataflow.typehints import is_consistent_with -from google.cloud.dataflow.typehints import Iterable -from google.cloud.dataflow.typehints import KV -from google.cloud.dataflow.typehints import trivial_inference -from google.cloud.dataflow.typehints import TypeCheckError -from google.cloud.dataflow.typehints import Union -from google.cloud.dataflow.typehints import WithTypeHints -from google.cloud.dataflow.typehints.trivial_inference import element_type -from google.cloud.dataflow.utils.options import TypeOptions - - -class DoFnProcessContext(object): - """A processing context passed to DoFn methods during execution. - - Most importantly, a DoFn.process method will access context.element - to get the element it is supposed to process. - - Attributes: - label: label of the ParDo whose element is being processed. - element: element being processed - (in process method only; always None in start_bundle and finish_bundle) - timestamp: timestamp of the element - (in process method only; always None in start_bundle and finish_bundle) - windows: windows of the element - (in process method only; always None in start_bundle and finish_bundle) - state: a DoFnState object, which holds the runner's internal state - for this element. For example, aggregator state is here. - Not used by the pipeline code. - """ - - def __init__(self, label, element=None, state=None): - """Initialize a processing context object with an element and state. - - The element represents one value from a PCollection that will be accessed - by a DoFn object during pipeline execution, and state is an arbitrary object - where counters and other pipeline state information can be passed in. - - DoFnProcessContext objects are also used as inputs to PartitionFn instances. - - Args: - label: label of the PCollection whose element is being processed. - element: element of a PCollection being processed using this context. 
- state: a DoFnState object with state to be passed in to the DoFn object. - """ - self.label = label - self.state = state - if element is not None: - self.set_element(element) - - def set_element(self, windowed_value): - if windowed_value is None: - # Not currently processing an element. - if hasattr(self, 'element'): - del self.element - del self.timestamp - del self.windows - else: - self.element = windowed_value.value - self.timestamp = windowed_value.timestamp - self.windows = windowed_value.windows - - def aggregate_to(self, aggregator, input_value): - """Provide a new input value for the aggregator. - - Args: - aggregator: the aggregator to update - input_value: the new value to input to the combine_fn of this aggregator. - """ - self.state.counter_for(aggregator).update(input_value) - - -class DoFn(WithTypeHints): - """A function object used by a transform with custom processing. - - The ParDo transform is such a transform. The ParDo.apply - method will take an object of type DoFn and apply it to all elements of a - PCollection object. - - In order to have concrete DoFn objects one has to subclass from DoFn and - define the desired behavior (start_bundle/finish_bundle and process) or wrap a - callable object using the CallableWrapperDoFn class. - """ - - def default_label(self): - return self.__class__.__name__ - - def infer_output_type(self, input_type): - # TODO(robertwb): Side inputs types. - # TODO(robertwb): Assert compatibility with input type hint? - return self._strip_output_annotations( - trivial_inference.infer_return_type(self.process, [input_type])) - - def start_bundle(self, context, *args, **kwargs): - """Called before a bundle of elements is processed on a worker. - - Elements to be processed are split into bundles and distributed - to workers. Before a worker calls process() on the first element - of its bundle, it calls this method. - - Args: - context: a DoFnProcessContext object - *args: side inputs - **kwargs: keyword side inputs - - """ - pass - - def finish_bundle(self, context, *args, **kwargs): - """Called after a bundle of elements is processed on a worker. - - Args: - context: a DoFnProcessContext object - *args: side inputs - **kwargs: keyword side inputs - """ - pass - - def process(self, context, *args, **kwargs): - """Called for each element of a pipeline. - - Args: - context: a DoFnProcessContext object containing, among other - attributes, the element to be processed. - See the DoFnProcessContext documentation for details. - *args: side inputs - **kwargs: keyword side inputs - """ - raise NotImplementedError - - @staticmethod - def from_callable(fn): - return CallableWrapperDoFn(fn) - - def process_argspec_fn(self): - """Returns the Python callable that will eventually be invoked. - - This should ideally be the user-level function that is called with - the main and (if any) side inputs, and is used to relate the type - hint parameters with the input parameters (e.g., by argument name). - """ - return self.process - - def _strip_output_annotations(self, type_hint): - annotations = (window.TimestampedValue, window.WindowedValue, - pvalue.SideOutputValue) - # TODO(robertwb): These should be parameterized types that the - # type inferencer understands. - if (type_hint in annotations - or trivial_inference.element_type(type_hint) in annotations): - return Any - else: - return type_hint - - -class CallableWrapperDoFn(DoFn): - """A DoFn (function) object wrapping a callable object. 
- - The purpose of this class is to conveniently wrap simple functions and use - them in transforms. - """ - - def __init__(self, fn): - """Initializes a CallableWrapperDoFn object wrapping a callable. - - Args: - fn: A callable object. - - Raises: - TypeError: if fn parameter is not a callable type. - """ - if not callable(fn): - raise TypeError('Expected a callable object instead of: %r' % fn) - - self._fn = fn - - super(CallableWrapperDoFn, self).__init__() - - def __repr__(self): - return 'CallableWrapperDoFn(%s)' % self._fn - - def default_type_hints(self): - type_hints = get_type_hints(self._fn) - # If the fn was a DoFn annotated with a type-hint that hinted a return - # type compatible with Iterable[Any], then we strip off the outer - # container type due to the 'flatten' portion of FlatMap. - # TODO(robertwb): Should we require an iterable specification for FlatMap? - if type_hints.output_types: - args, kwargs = type_hints.output_types - if len(args) == 1 and is_consistent_with(args[0], Iterable[Any]): - type_hints = type_hints.copy() - type_hints.set_output_types(element_type(args[0]), **kwargs) - return type_hints - - def infer_output_type(self, input_type): - return self._strip_output_annotations( - trivial_inference.infer_return_type(self._fn, [input_type])) - - def process(self, context, *args, **kwargs): - return self._fn(context.element, *args, **kwargs) - - def process_argspec_fn(self): - return getattr(self._fn, '_argspec_fn', self._fn) - - -class CombineFn(WithTypeHints): - """A function object used by a Combine transform with custom processing. - - A CombineFn specifies how multiple values in all or part of a PCollection can - be merged into a single value---essentially providing the same kind of - information as the arguments to the Python "reduce" builtin (except for the - input argument, which is an instance of CombineFnProcessContext). The - combining process proceeds as follows: - - 1. Input values are partitioned into one or more batches. - 2. For each batch, the create_accumulator method is invoked to create a fresh - initial "accumulator" value representing the combination of zero values. - 3. For each input value in the batch, the add_inputs method is invoked to - combine more values with the accumulator for that batch. - 4. The merge_accumulators method is invoked to combine accumulators from - separate batches into a single combined output accumulator value, once all - of the accumulators have had all the input value in their batches added to - them. This operation is invoked repeatedly, until there is only one - accumulator value left. - 5. The extract_output operation is invoked on the final accumulator to get - the output value. - """ - - def default_label(self): - return self.__class__.__name__ - - def create_accumulator(self, *args, **kwargs): - """Return a fresh, empty accumulator for the combine operation. - - Args: - *args: Additional arguments and side inputs. - **kwargs: Additional arguments and side inputs. - """ - raise NotImplementedError(str(self)) - - def add_input(self, accumulator, element, *args, **kwargs): - """Return result of folding element into accumulator. - - CombineFn implementors must override either add_input or add_inputs. - - Args: - accumulator: the current accumulator - element: the element to add - *args: Additional arguments and side inputs. - **kwargs: Additional arguments and side inputs. 
- """ - raise NotImplementedError(str(self)) - - def add_inputs(self, accumulator, elements, *args, **kwargs): - """Returns the result of folding each element in elements into accumulator. - - This is provided in case the implementation affords more efficient - bulk addition of elements. The default implementation simply loops - over the inputs invoking add_input for each one. - - Args: - accumulator: the current accumulator - elements: the elements to add - *args: Additional arguments and side inputs. - **kwargs: Additional arguments and side inputs. - """ - for element in elements: - accumulator = self.add_input(accumulator, element, *args, **kwargs) - return accumulator - - def merge_accumulators(self, accumulators, *args, **kwargs): - """Returns the result of merging several accumulators - to a single accumulator value. - - Args: - accumulators: the accumulators to merge - *args: Additional arguments and side inputs. - **kwargs: Additional arguments and side inputs. - """ - raise NotImplementedError(str(self)) - - def extract_output(self, accumulator, *args, **kwargs): - """Return result of converting accumulator into the output value. - - Args: - accumulator: the final accumulator value computed by this CombineFn - for the entire input key or PCollection. - *args: Additional arguments and side inputs. - **kwargs: Additional arguments and side inputs. - """ - raise NotImplementedError(str(self)) - - def apply(self, elements, *args, **kwargs): - """Returns result of applying this CombineFn to the input values. - - Args: - elements: the set of values to combine. - *args: Additional arguments and side inputs. - **kwargs: Additional arguments and side inputs. - """ - return self.extract_output( - self.add_inputs( - self.create_accumulator(*args, **kwargs), elements, - *args, **kwargs), - *args, **kwargs) - - def for_input_type(self, input_type): - """Returns a specialized implementation of self, if it exists. - - Otherwise, returns self. - - Args: - input_type: the type of input elements. - """ - return self - - @staticmethod - def from_callable(fn): - return CallableWrapperCombineFn(fn) - - @staticmethod - def maybe_from_callable(fn): - return fn if isinstance(fn, CombineFn) else CallableWrapperCombineFn(fn) - - -class CallableWrapperCombineFn(CombineFn): - """A CombineFn (function) object wrapping a callable object. - - The purpose of this class is to conveniently wrap simple functions and use - them in Combine transforms. - """ - _EMPTY = object() - - def __init__(self, fn): - """Initializes a CallableFn object wrapping a callable. - - Args: - fn: A callable object that reduces elements of an iterable to a single - value (like the builtins sum and max). This callable must be capable of - receiving the kind of values it generates as output in its input, and - for best results, its operation must be commutative and associative. - - Raises: - TypeError: if fn parameter is not a callable type. 
- """ - if not callable(fn): - raise TypeError('Expected a callable object instead of: %r' % fn) - - super(CallableWrapperCombineFn, self).__init__() - self._fn = fn - - def __repr__(self): - return "CallableWrapperCombineFn(%s)" % self._fn - - def create_accumulator(self, *args, **kwargs): - return self._EMPTY - - def add_input(self, accumulator, element, *args, **kwargs): - if accumulator is self._EMPTY: - return element - else: - return self._fn([accumulator, element], *args, **kwargs) - - def add_inputs(self, accumulator, elements, *args, **kwargs): - if accumulator is self._EMPTY: - return self._fn(elements, *args, **kwargs) - elif isinstance(elements, (list, tuple)): - return self._fn([accumulator] + elements, *args, **kwargs) - else: - def union(): - yield accumulator - for e in elements: - yield e - return self._fn(union(), *args, **kwargs) - - def merge_accumulators(self, accumulators, *args, **kwargs): - # It's (weakly) assumed that self._fn is associative. - return self._fn(accumulators, *args, **kwargs) - - def extract_output(self, accumulator, *args, **kwargs): - return self._fn(()) if accumulator is self._EMPTY else accumulator - - def default_type_hints(self): - fn_hints = get_type_hints(self._fn) - if fn_hints.input_types is None: - return fn_hints - else: - # fn(Iterable[V]) -> V becomes CombineFn(V) -> V - input_args, input_kwargs = fn_hints.input_types - if not input_args: - if len(input_kwargs) == 1: - input_args, input_kwargs = tuple(input_kwargs.values()), {} - else: - raise TypeError('Combiner input type must be specified positionally.') - if not is_consistent_with(input_args[0], Iterable[Any]): - raise TypeCheckError( - 'All functions for a Combine PTransform must accept a ' - 'single argument compatible with: Iterable[Any]. ' - 'Instead a function with input type: %s was received.' - % input_args[0]) - input_args = (element_type(input_args[0]),) + input_args[1:] - # TODO(robertwb): Assert output type is consistent with input type? - hints = fn_hints.copy() - hints.set_input_types(*input_args, **input_kwargs) - return hints - - def for_input_type(self, input_type): - # Avoid circular imports. - from google.cloud.dataflow.transforms import cy_combiners - if self._fn is any: - return cy_combiners.AnyCombineFn() - elif self._fn is all: - return cy_combiners.AllCombineFn() - else: - known_types = { - (sum, int): cy_combiners.SumInt64Fn(), - (min, int): cy_combiners.MinInt64Fn(), - (max, int): cy_combiners.MaxInt64Fn(), - (sum, float): cy_combiners.SumFloatFn(), - (min, float): cy_combiners.MinFloatFn(), - (max, float): cy_combiners.MaxFloatFn(), - } - return known_types.get((self._fn, input_type), self) - - -class PartitionFn(WithTypeHints): - """A function object used by a Partition transform. - - A PartitionFn specifies how individual values in a PCollection will be placed - into separate partitions, indexed by an integer. - """ - - def default_label(self): - return self.__class__.__name__ - - def partition_for(self, context, num_partitions, *args, **kwargs): - """Specify which partition will receive this element. - - Args: - context: A DoFnProcessContext containing an element of the - input PCollection. - num_partitions: Number of partitions, i.e., output PCollections. - *args: optional parameters and side inputs. - **kwargs: optional parameters and side inputs. - - Returns: - An integer in [0, num_partitions). - """ - pass - - -class CallableWrapperPartitionFn(PartitionFn): - """A PartitionFn object wrapping a callable object. 
- - Instances of this class wrap simple functions for use in Partition operations. - """ - - def __init__(self, fn): - """Initializes a PartitionFn object wrapping a callable. - - Args: - fn: A callable object, which should accept the following arguments: - element - element to assign to a partition. - num_partitions - number of output partitions. - and may accept additional arguments and side inputs. - - Raises: - TypeError: if fn is not a callable type. - """ - if not callable(fn): - raise TypeError('Expected a callable object instead of: %r' % fn) - self._fn = fn - - def partition_for(self, context, num_partitions, *args, **kwargs): - return self._fn(context.element, num_partitions, *args, **kwargs) - - -class ParDo(PTransformWithSideInputs): - """A ParDo transform. - - Processes an input PCollection by applying a DoFn to each element and - returning the accumulated results into an output PCollection. The type of the - elements is not fixed as long as the DoFn can deal with it. In reality - the type is restrained to some extent because the elements sometimes must be - persisted to external storage. See the apply() method comments for a detailed - description of all possible arguments. - - Note that the DoFn must return an iterable for each element of the input - PCollection. An easy way to do this is to use the yield keyword in the - process method. - - Args: - label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. - pcoll: a PCollection to be processed. - dofn: a DoFn object to be applied to each element of pcoll argument. - *args: positional arguments passed to the dofn object. - **kwargs: keyword arguments passed to the dofn object. - - Note that the positional and keyword arguments will be processed in order - to detect PCollections that will be computed as side inputs to the - transform. During pipeline execution whenever the DoFn object gets executed - (its apply() method gets called) the PCollection arguments will be replaced - by values from the PCollection in the exact positions where they appear in - the argument lists. - """ - - def __init__(self, fn_or_label, *args, **kwargs): - super(ParDo, self).__init__(fn_or_label, *args, **kwargs) - - if not isinstance(self.fn, DoFn): - raise TypeError('ParDo must be called with a DoFn instance.') - - def default_type_hints(self): - return self.fn.get_type_hints() - - def infer_output_type(self, input_type): - return trivial_inference.element_type( - self.fn.infer_output_type(input_type)) - - def make_fn(self, fn): - return fn if isinstance(fn, DoFn) else CallableWrapperDoFn(fn) - - def process_argspec_fn(self): - return self.fn.process_argspec_fn() - - def apply(self, pcoll): - self.side_output_tags = set() - # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. - self.dofn = self.fn - return pvalue.PCollection(pcoll.pipeline) - - def with_outputs(self, *tags, **main_kw): - """Returns a tagged tuple allowing access to the outputs of a ParDo. - - The resulting object supports access to the - PCollection associated with a tag (e.g., o.tag, o[tag]) and iterating over - the available tags (e.g., for tag in o: ...). - - Args: - *tags: if non-empty, list of valid tags. If a list of valid tags is given, - it will be an error to use an undeclared tag later in the pipeline. - **main_kw: dictionary empty or with one key 'main' defining the tag to be - used for the main output (which will not have a tag associated with it). 
- - Returns: - An object of type DoOutputsTuple that bundles together all the outputs - of a ParDo transform and allows accessing the individual - PCollections for each output using an object.tag syntax. - - Raises: - TypeError: if the self object is not a PCollection that is the result of - a ParDo transform. - ValueError: if main_kw contains any key other than 'main'. - """ - main_tag = main_kw.pop('main', None) - if main_kw: - raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys()) - return _MultiParDo(self, tags, main_tag) - - -class _MultiParDo(PTransform): - - def __init__(self, do_transform, tags, main_tag): - super(_MultiParDo, self).__init__(do_transform.label) - self._do_transform = do_transform - self._tags = tags - self._main_tag = main_tag - - def apply(self, pcoll): - _ = pcoll | self._do_transform - return pvalue.DoOutputsTuple( - pcoll.pipeline, self._do_transform, self._tags, self._main_tag) - - -def FlatMap(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name - """FlatMap is like ParDo except it takes a callable to specify the - transformation. - - The callable must return an iterable for each element of the input - PCollection. The elements of these iterables will be flattened into - the output PCollection. - - Args: - fn_or_label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. - *args: positional arguments passed to the transform callable. - **kwargs: keyword arguments passed to the transform callable. - - Returns: - A PCollection containing the Map outputs. - - Raises: - TypeError: If the fn passed as argument is not a callable. Typical error - is to pass a DoFn instance which is supported only for ParDo. - """ - if fn_or_label is None or isinstance(fn_or_label, str): - label, fn, args = fn_or_label, args[0], args[1:] - else: - label, fn = None, fn_or_label - if not callable(fn): - raise TypeError( - 'FlatMap can be used only with callable objects. ' - 'Received %r instead for %s argument.' - % (fn, 'first' if label is None else 'second')) - - if label is None: - label = 'FlatMap(%s)' % ptransform.label_from_callable(fn) - - return ParDo(label, CallableWrapperDoFn(fn), *args, **kwargs) - - -def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name - """Map is like FlatMap except its callable returns only a single element. - - Args: - fn_or_label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. - *args: positional arguments passed to the transform callable. - **kwargs: keyword arguments passed to the transform callable. - - Returns: - A PCollection containing the Map outputs. - - Raises: - TypeError: If the fn passed as argument is not a callable. Typical error - is to pass a DoFn instance which is supported only for ParDo. - """ - if isinstance(fn_or_label, str): - label, fn, args = fn_or_label, args[0], args[1:] - else: - label, fn = None, fn_or_label - if not callable(fn): - raise TypeError( - 'Map can be used only with callable objects. ' - 'Received %r instead for %s argument.' - % (fn, 'first' if label is None else 'second')) - wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] - - # Proxy the type-hint information from the original function to this new - # wrapped function. 
- get_type_hints(wrapper).input_types = get_type_hints(fn).input_types - output_hint = get_type_hints(fn).simple_output_type(label) - if output_hint: - get_type_hints(wrapper).set_output_types(typehints.Iterable[output_hint]) - # pylint: disable=protected-access - wrapper._argspec_fn = fn - # pylint: enable=protected-access - - if label is None: - label = 'Map(%s)' % ptransform.label_from_callable(fn) - - return FlatMap(label, wrapper, *args, **kwargs) - - -def Filter(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name - """Filter is a FlatMap with its callable filtering out elements. - - Args: - fn_or_label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. - *args: positional arguments passed to the transform callable. - **kwargs: keyword arguments passed to the transform callable. - - Returns: - A PCollection containing the Filter outputs. - - Raises: - TypeError: If the fn passed as argument is not a callable. Typical error - is to pass a DoFn instance which is supported only for FlatMap. - """ - if isinstance(fn_or_label, str): - label, fn, args = fn_or_label, args[0], args[1:] - else: - label, fn = None, fn_or_label - if not callable(fn): - raise TypeError( - 'Filter can be used only with callable objects. ' - 'Received %r instead for %s argument.' - % (fn, 'first' if label is None else 'second')) - wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else [] - - # Proxy the type-hint information from the function being wrapped, setting the - # output type to be the same as the input type. - get_type_hints(wrapper).input_types = get_type_hints(fn).input_types - output_hint = get_type_hints(fn).simple_output_type(label) - if (output_hint is None - and get_type_hints(wrapper).input_types - and get_type_hints(wrapper).input_types[0]): - output_hint = get_type_hints(wrapper).input_types[0] - if output_hint: - get_type_hints(wrapper).set_output_types(typehints.Iterable[output_hint]) - # pylint: disable=protected-access - wrapper._argspec_fn = fn - # pylint: enable=protected-access - - if label is None: - label = 'Filter(%s)' % ptransform.label_from_callable(fn) - - return FlatMap(label, wrapper, *args, **kwargs) - - -class CombineGlobally(PTransform): - """A CombineGlobally transform. - - Reduces a PCollection to a single value by progressively applying a CombineFn - to portions of the PCollection (and to intermediate values created thereby). - See documentation in CombineFn for details on the specifics on how CombineFns - are applied. - - Args: - label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. - pcoll: a PCollection to be reduced into a single value. - fn: a CombineFn object that will be called to progressively reduce the - PCollection into single values, or a callable suitable for wrapping - by CallableWrapperCombineFn. - *args: positional arguments passed to the CombineFn object. - **kwargs: keyword arguments passed to the CombineFn object. - - Raises: - TypeError: If the output type of the input PCollection is not compatible - with Iterable[A]. - - Returns: - A single-element PCollection containing the main output of the Combine - transform. - - Note that the positional and keyword arguments will be processed in order - to detect PObjects that will be computed as side inputs to the transform. 
- During pipeline execution whenever the CombineFn object gets executed (i.e., - any of the CombineFn methods get called), the PObject arguments will be - replaced by their actual value in the exact position where they appear in - the argument lists. - """ - has_defaults = True - as_view = False - - def __init__(self, label_or_fn, *args, **kwargs): - if label_or_fn is None or isinstance(label_or_fn, str): - label, fn, args = label_or_fn, args[0], args[1:] - else: - label, fn = None, label_or_fn - - super(CombineGlobally, self).__init__(label) - self.fn = fn - self.args = args - self.kwargs = kwargs - - def default_label(self): - return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn) - - def clone(self, **extra_attributes): - clone = copy.copy(self) - clone.__dict__.update(extra_attributes) - return clone - - def with_defaults(self, has_defaults=True): - return self.clone(has_defaults=has_defaults) - - def without_defaults(self): - return self.with_defaults(False) - - def as_singleton_view(self): - return self.clone(as_view=True) - - def apply(self, pcoll): - def add_input_types(transform): - type_hints = self.get_type_hints() - if type_hints.input_types: - return transform.with_input_types(type_hints.input_types[0][0]) - else: - return transform - - combined = (pcoll - | add_input_types(Map('KeyWithVoid', lambda v: (None, v)) - .with_output_types(KV[None, pcoll.element_type])) - | CombinePerKey('CombinePerKey', self.fn, *self.args, **self.kwargs) - | Map('UnKey', lambda (k, v): v)) - - if not self.has_defaults and not self.as_view: - return combined - - if self.has_defaults: - combine_fn = ( - self.fn if isinstance(self.fn, CombineFn) - else CombineFn.from_callable(self.fn)) - default_value = combine_fn.apply([], *self.args, **self.kwargs) - else: - default_value = pvalue._SINGLETON_NO_DEFAULT # pylint: disable=protected-access - view = pvalue.AsSingleton(combined, default_value=default_value) - if self.as_view: - return view - else: - if pcoll.windowing.windowfn != window.GlobalWindows(): - raise ValueError( - "Default values are not yet supported in CombineGlobally() if the " - "output PCollection is not windowed by GlobalWindows. " - "Instead, use CombineGlobally().without_defaults() to output " - "an empty PCollection if the input PCollection is empty, " - "or CombineGlobally().as_singleton_view() to get the default " - "output of the CombineFn if the input PCollection is empty.") - def typed(transform): - # TODO(robertwb): We should infer this. - if combined.element_type: - return transform.with_output_types(combined.element_type) - else: - return transform - return (pcoll.pipeline - | Create('DoOnce', [None]) - | typed(Map('InjectDefault', lambda _, s: s, view))) - - -@ptransform_fn -def CombinePerKey(label, pcoll, fn, *args, **kwargs): # pylint: disable=invalid-name - """A per-key Combine transform. - - Identifies sets of values associated with the same key in the input - PCollection, then applies a CombineFn to condense those sets to single - values. See documentation in CombineFn for details on the specifics on how - CombineFns are applied. - - Args: - label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. - pcoll: input pcollection. - fn: instance of CombineFn to apply to all values under the same key in - pcoll, or a callable whose signature is f(iterable, *args, **kwargs) - (e.g., sum, max). - *args: arguments and side inputs, passed directly to the CombineFn. 
- **kwargs: arguments and side inputs, passed directly to the CombineFn. - - Returns: - A PObject holding the result of the combine operation. - """ - return pcoll | GroupByKey() | CombineValues('Combine', fn, *args, **kwargs) - - -# TODO(robertwb): Rename to CombineGroupedValues? -class CombineValues(PTransformWithSideInputs): - - def make_fn(self, fn): - return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn) - - def apply(self, pcoll): - args, kwargs = util.insert_values_in_args( - self.args, self.kwargs, self.side_inputs) - - input_type = pcoll.element_type - key_type = None - if input_type is not None: - key_type, _ = input_type.tuple_types - - runtime_type_check = ( - pcoll.pipeline.options is not None and - pcoll.pipeline.options.view_as(TypeOptions).runtime_type_check) - return pcoll | ParDo( - CombineValuesDoFn(key_type, self.fn, runtime_type_check), - *args, **kwargs) - - -class CombineValuesDoFn(DoFn): - """DoFn for performing per-key Combine transforms.""" - - def __init__(self, input_pcoll_type, combinefn, runtime_type_check): - super(CombineValuesDoFn, self).__init__() - self.combinefn = combinefn - self.runtime_type_check = runtime_type_check - - def process(self, p_context, *args, **kwargs): - # Expected elements input to this DoFn are 2-tuples of the form - # (key, iter), with iter an iterable of all the values associated with key - # in the input PCollection. - if self.runtime_type_check: - # Apply the combiner in a single operation rather than artificially - # breaking it up so that output type violations manifest as TypeCheck - # errors rather than type errors. - return [ - (p_context.element[0], - self.combinefn.apply(p_context.element[1], *args, **kwargs))] - else: - # Add the elements into three accumulators (for testing of merge). - elements = p_context.element[1] - accumulators = [] - for k in range(3): - if len(elements) <= k: - break - accumulators.append( - self.combinefn.add_inputs( - self.combinefn.create_accumulator(*args, **kwargs), - elements[k::3], - *args, **kwargs)) - # Merge the accumulators. - accumulator = self.combinefn.merge_accumulators( - accumulators, *args, **kwargs) - # Convert accumulator to the final result. - return [(p_context.element[0], - self.combinefn.extract_output(accumulator, *args, **kwargs))] - - def default_type_hints(self): - hints = self.combinefn.get_type_hints().copy() - if hints.input_types: - K = typehints.TypeVariable('K') - args, kwargs = hints.input_types - args = (typehints.Tuple[K, typehints.Iterable[args[0]]],) + args[1:] - hints.set_input_types(*args, **kwargs) - else: - K = typehints.Any - if hints.output_types: - main_output_type = hints.simple_output_type('') - hints.set_output_types(typehints.Tuple[K, main_output_type]) - return hints - - -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') -@typehints.with_input_types(typehints.KV[K, V]) -@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) -class GroupByKey(PTransform): - """A group by key transform. - - Processes an input PCollection consisting of key/value pairs represented as a - tuple pair. The result is a PCollection where values having a common key are - grouped together. For example (a, 1), (b, 2), (a, 3) will result into - (a, [1, 3]), (b, [2]). - - The implementation here is used only when run on the local direct runner. 
- """ - - class ReifyWindows(DoFn): - - def process(self, context): - try: - k, v = context.element - except TypeError: - raise TypeCheckError('Input to GroupByKey must be a PCollection with ' - 'elements compatible with KV[A, B]') - - return [(k, window.WindowedValue(v, context.timestamp, context.windows))] - - def infer_output_type(self, input_type): - key_type, value_type = trivial_inference.key_value_types(input_type) - return Iterable[KV[key_type, typehints.WindowedValue[value_type]]] - - class GroupAlsoByWindow(DoFn): - # TODO(robertwb): Support combiner lifting. - - def __init__(self, windowing): - super(GroupByKey.GroupAlsoByWindow, self).__init__() - self.windowing = windowing - - def infer_output_type(self, input_type): - key_type, windowed_value_iter_type = trivial_inference.key_value_types( - input_type) - value_type = windowed_value_iter_type.inner_type.inner_type - return Iterable[KV[key_type, Iterable[value_type]]] - - def process(self, context): - k, vs = context.element - # pylint: disable=g-import-not-at-top - from google.cloud.dataflow.transforms.trigger import InMemoryUnmergedState - from google.cloud.dataflow.transforms.trigger import create_trigger_driver - # pylint: enable=g-import-not-at-top - driver = create_trigger_driver(self.windowing, True) - state = InMemoryUnmergedState() - # TODO(robertwb): Conditionally process in smaller chunks. - for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP): - yield wvalue.with_value((k, wvalue.value)) - while state.timers: - fired = state.get_and_clear_timers() - for timer_window, (name, time_domain, fire_time) in fired: - for wvalue in driver.process_timer( - timer_window, name, time_domain, fire_time, state): - yield wvalue.with_value((k, wvalue.value)) - - def apply(self, pcoll): - # This code path is only used in the local direct runner. For Dataflow - # runner execution, the GroupByKey transform is expanded on the service. - input_type = pcoll.element_type - - if input_type is not None: - # Initialize type-hints used below to enforce type-checking and to pass - # downstream to further PTransforms. 
- key_type, value_type = trivial_inference.key_value_types(input_type) - typecoders.registry.verify_deterministic( - typecoders.registry.get_coder(key_type), - 'GroupByKey operation "%s"' % self.label) - - reify_output_type = KV[key_type, typehints.WindowedValue[value_type]] - gbk_input_type = KV[key_type, Iterable[typehints.WindowedValue[value_type]]] - gbk_output_type = KV[key_type, Iterable[value_type]] - - return (pcoll - | (ParDo('reify_windows', self.ReifyWindows()) - .with_output_types(reify_output_type)) - | (GroupByKeyOnly('group_by_key') - .with_input_types(reify_output_type) - .with_output_types(gbk_input_type)) - | (ParDo('group_by_window', - self.GroupAlsoByWindow(pcoll.windowing)) - .with_input_types(gbk_input_type) - .with_output_types(gbk_output_type))) - else: - return (pcoll - | ParDo('reify_windows', self.ReifyWindows()) - | GroupByKeyOnly('group_by_key') - | ParDo('group_by_window', - self.GroupAlsoByWindow(pcoll.windowing))) - - -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') -@typehints.with_input_types(typehints.KV[K, V]) -@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) -class GroupByKeyOnly(PTransform): - """A group by key transform, ignoring windows.""" - - def __init__(self, label=None): - super(GroupByKeyOnly, self).__init__(label) - - def infer_output_type(self, input_type): - key_type, value_type = trivial_inference.key_value_types(input_type) - return KV[key_type, Iterable[value_type]] - - def apply(self, pcoll): - self._check_pcollection(pcoll) - return pvalue.PCollection(pcoll.pipeline) - - -class Partition(PTransformWithSideInputs): - """Split a PCollection into several partitions. - - Uses the specified PartitionFn to separate an input PCollection into the - specified number of sub-PCollections. - - When apply()d, a Partition() PTransform requires the following: - - Args: - partitionfn: a PartitionFn, or a callable with the signature described in - CallableWrapperPartitionFn. - n: number of output partitions. - - The result of this PTransform is a simple list of the output PCollections - representing each of n partitions, in order. - """ - - class ApplyPartitionFnFn(DoFn): - """A DoFn that applies a PartitionFn.""" - - def process(self, context, partitionfn, n, *args, **kwargs): - partition = partitionfn.partition_for(context, n, *args, **kwargs) - if not 0 <= partition < n: - raise ValueError( - 'PartitionFn specified out-of-bounds partition index: ' - '%d not in [0, %d)' % (partition, n)) - # Each input is directed into the side output that corresponds to the - # selected partition. 
- yield pvalue.SideOutputValue(str(partition), context.element) - - def make_fn(self, fn): - return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn) - - def apply(self, pcoll): - n = int(self.args[0]) - return pcoll | ParDo( - self.ApplyPartitionFnFn(), self.fn, *self.args, - **self.kwargs).with_outputs(*[str(t) for t in range(n)]) - - -class Windowing(object): - - def __init__(self, windowfn, triggerfn=None, accumulation_mode=None, - output_time_fn=None): - global AccumulationMode, DefaultTrigger - # pylint: disable=g-import-not-at-top - from google.cloud.dataflow.transforms.trigger import AccumulationMode, DefaultTrigger - # pylint: enable=g-import-not-at-top - if triggerfn is None: - triggerfn = DefaultTrigger() - if accumulation_mode is None: - if triggerfn == DefaultTrigger(): - accumulation_mode = AccumulationMode.DISCARDING - else: - raise ValueError( - 'accumulation_mode must be provided for non-trivial triggers') - self.windowfn = windowfn - self.triggerfn = triggerfn - self.accumulation_mode = accumulation_mode - self.output_time_fn = output_time_fn or OutputTimeFn.OUTPUT_AT_EOW - self._is_default = ( - self.windowfn == window.GlobalWindows() and - self.triggerfn == DefaultTrigger() and - self.accumulation_mode == AccumulationMode.DISCARDING and - self.output_time_fn == OutputTimeFn.OUTPUT_AT_EOW) - - def __repr__(self): - return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn, - self.accumulation_mode, - self.output_time_fn) - - def is_default(self): - return self._is_default - - -T = typehints.TypeVariable('T') -@typehints.with_input_types(T) -@typehints.with_output_types(T) -class WindowInto(ParDo): # pylint: disable=g-wrong-blank-lines - """A window transform assigning windows to each element of a PCollection. - - Transforms an input PCollection by applying a windowing function to each - element. Each transformed element in the result will be a WindowedValue - element with the same input value and timestamp, with its new set of windows - determined by the windowing function. - """ - - class WindowIntoFn(DoFn): - """A DoFn that applies a WindowInto operation.""" - - def __init__(self, windowing): - self.windowing = windowing - - def process(self, context): - context = WindowFn.AssignContext(context.timestamp, - element=context.element, - existing_windows=context.windows) - new_windows = self.windowing.windowfn.assign(context) - yield WindowedValue(context.element, context.timestamp, new_windows) - - def __init__(self, *args, **kwargs): - """Initializes a WindowInto transform. - - Args: - *args: A tuple of position arguments. - **kwargs: A dictionary of keyword arguments. - - The *args, **kwargs are expected to be (label, windowfn) or (windowfn). - The optional trigger and accumulation_mode kwargs may also be provided. 
- """ - triggerfn = kwargs.pop('trigger', None) - accumulation_mode = kwargs.pop('accumulation_mode', None) - output_time_fn = kwargs.pop('output_time_fn', None) - label, windowfn = self.parse_label_and_arg(args, kwargs, 'windowfn') - self.windowing = Windowing(windowfn, triggerfn, accumulation_mode, - output_time_fn) - dofn = self.WindowIntoFn(self.windowing) - super(WindowInto, self).__init__(label, dofn) - - def get_windowing(self, unused_inputs): - return self.windowing - - def infer_output_type(self, input_type): - return input_type - - def apply(self, pcoll): - input_type = pcoll.element_type - - if input_type is not None: - output_type = input_type - self.with_input_types(input_type) - self.with_output_types(output_type) - return super(WindowInto, self).apply(pcoll) - - -# Python's pickling is broken for nested classes. -WindowIntoFn = WindowInto.WindowIntoFn - - -class Flatten(PTransform): - """Merges several PCollections into a single PCollection. - - Copies all elements in 0 or more PCollections into a single output - PCollection. If there are no input PCollections, the resulting PCollection - will be empty (but see also kwargs below). - - Args: - label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. - **kwargs: Accepts a single named argument "pipeline", which specifies the - pipeline that "owns" this PTransform. Ordinarily Flatten can obtain this - information from one of the input PCollections, but if there are none (or - if there's a chance there may be none), this argument is the only way to - provide pipeline information and should be considered mandatory. - """ - - def __init__(self, label=None, **kwargs): - super(Flatten, self).__init__(label) - self.pipeline = kwargs.pop('pipeline', None) - if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) - - def _extract_input_pvalues(self, pvalueish): - try: - pvalueish = tuple(pvalueish) - except TypeError: - raise ValueError('Input to Flatten must be an iterable.') - return pvalueish, pvalueish - - def apply(self, pcolls): - for pcoll in pcolls: - self._check_pcollection(pcoll) - return pvalue.PCollection(self.pipeline) - - def get_windowing(self, inputs): - if not inputs: - # TODO(robertwb): Return something compatible with every windowing? - return Windowing(window.GlobalWindows()) - else: - return super(Flatten, self).get_windowing(inputs) - - -class Create(PTransform): - """A transform that creates a PCollection from an iterable.""" - - def __init__(self, *args, **kwargs): - """Initializes a Create transform. - - Args: - *args: A tuple of position arguments. - **kwargs: A dictionary of keyword arguments. - - The *args, **kwargs are expected to be (label, value) or (value). - """ - label, value = self.parse_label_and_arg(args, kwargs, 'value') - super(Create, self).__init__(label) - if isinstance(value, basestring): - raise TypeError('PTransform Create: Refusing to treat string as ' - 'an iterable. 
(string=%r)' % value) - elif isinstance(value, dict): - value = value.items() - self.value = tuple(value) - - def infer_output_type(self, unused_input_type): - if not self.value: - return Any - else: - return Union[[trivial_inference.instance_to_type(v) for v in self.value]] - - def apply(self, pbegin): - assert isinstance(pbegin, pvalue.PBegin) - self.pipeline = pbegin.pipeline - return pvalue.PCollection(self.pipeline) - - def get_windowing(self, unused_inputs): - return Windowing(window.GlobalWindows()) - - -def Read(*args, **kwargs): - from google.cloud.dataflow import io - return io.Read(*args, **kwargs) - - -def Write(*args, **kwargs): - from google.cloud.dataflow import io - return io.Write(*args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/cy_combiners.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/transforms/cy_combiners.pxd b/sdks/python/google/cloud/dataflow/transforms/cy_combiners.pxd deleted file mode 100644 index d0ab833..0000000 --- a/sdks/python/google/cloud/dataflow/transforms/cy_combiners.pxd +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -cimport cython -from libc.stdint cimport int64_t, INT64_MIN, INT64_MAX - -cdef double _NEG_INF, _POS_INF, _NAN - - -cdef class CountAccumulator(object): - cdef readonly int64_t value - cpdef add_input(self, unused_element) - @cython.locals(accumulator=CountAccumulator) - cpdef merge(self, accumulators) - -cdef class SumInt64Accumulator(object): - cdef readonly int64_t value - cpdef add_input(self, int64_t element) - @cython.locals(accumulator=SumInt64Accumulator) - cpdef merge(self, accumulators) - -cdef class MinInt64Accumulator(object): - cdef readonly int64_t value - cpdef add_input(self, int64_t element) - @cython.locals(accumulator=MinInt64Accumulator) - cpdef merge(self, accumulators) - -cdef class MaxInt64Accumulator(object): - cdef readonly int64_t value - cpdef add_input(self, int64_t element) - @cython.locals(accumulator=MaxInt64Accumulator) - cpdef merge(self, accumulators) - -cdef class MeanInt64Accumulator(object): - cdef readonly int64_t sum - cdef readonly int64_t count - cpdef add_input(self, int64_t element) - @cython.locals(accumulator=MeanInt64Accumulator) - cpdef merge(self, accumulators) - - -cdef class SumDoubleAccumulator(object): - cdef readonly double value - cpdef add_input(self, double element) - @cython.locals(accumulator=SumDoubleAccumulator) - cpdef merge(self, accumulators) - -cdef class MinDoubleAccumulator(object): - cdef readonly double value - cpdef add_input(self, double element) - @cython.locals(accumulator=MinDoubleAccumulator) - cpdef merge(self, accumulators) - -cdef class MaxDoubleAccumulator(object): - cdef readonly double value - cpdef add_input(self, double element) - @cython.locals(accumulator=MaxDoubleAccumulator) - cpdef merge(self, accumulators) - -cdef class MeanDoubleAccumulator(object): - cdef readonly double sum - cdef readonly int64_t count - cpdef add_input(self, double element) - @cython.locals(accumulator=MeanDoubleAccumulator) - cpdef merge(self, accumulators) - - -cdef class AllAccumulator(object): - cdef readonly bint value - cpdef add_input(self, bint element) - @cython.locals(accumulator=AllAccumulator) - cpdef merge(self, accumulators) - -cdef class AnyAccumulator(object): - cdef readonly bint value - cpdef add_input(self, bint element) - @cython.locals(accumulator=AnyAccumulator) - cpdef merge(self, accumulators) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/cy_combiners.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/transforms/cy_combiners.py b/sdks/python/google/cloud/dataflow/transforms/cy_combiners.py deleted file mode 100644 index 4cc4233..0000000 --- a/sdks/python/google/cloud/dataflow/transforms/cy_combiners.py +++ /dev/null @@ -1,250 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
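The .pxd declarations above give Cython typed signatures for the accumulator classes that cy_combiners.py (below) defines in pure Python; the .pxd only adds typing for compilation, and the module also runs uncompiled. As a minimal sketch of the accumulator protocol those declarations describe -- not part of this commit, and using the pre-move import path shown in this diff -- the compiled and uncompiled paths behave the same:

from google.cloud.dataflow.transforms.cy_combiners import CountAccumulator

# Each accumulator supports add_input, merge and extract_output.
acc = CountAccumulator()
acc.add_input('a')
acc.add_input('b')

other = CountAccumulator()
other.add_input('c')

# merge() folds the partial results of other accumulators into this one.
acc.merge([other])
print(acc.extract_output())  # 3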
- -"""A library of basic cythonized CombineFn subclasses.""" - -from __future__ import absolute_import - -from google.cloud.dataflow.transforms import core - - -class AccumulatorCombineFn(core.CombineFn): - # singleton? - def create_accumulator(self): - return self._accumulator_type() - @staticmethod - def add_input(accumulator, element): - accumulator.add_input(element) - return accumulator - def merge_accumulators(self, accumulators): - accumulator = self._accumulator_type() - accumulator.merge(accumulators) - return accumulator - @staticmethod - def extract_output(accumulator): - return accumulator.extract_output() - def __eq__(self, other): - return (isinstance(other, AccumulatorCombineFn) - and self._accumulator_type is other._accumulator_type) - def __hash__(self): - return hash(self._accumulator_type) - - -_63 = 63 # Avoid large literals in C source code. -globals()['INT64_MAX'] = 2**_63 - 1 -globals()['INT64_MIN'] = -2**_63 - - -class CountAccumulator(object): - def __init__(self): - self.value = 0 - def add_input(self, unused_element): - self.value += 1 - def merge(self, accumulators): - for accumulator in accumulators: - self.value += accumulator.value - def extract_output(self): - return self.value - - -class SumInt64Accumulator(object): - def __init__(self): - self.value = 0 - def add_input(self, element): - element = int(element) - if not INT64_MIN <= element <= INT64_MAX: - raise OverflowError(element) - self.value += element - def merge(self, accumulators): - for accumulator in accumulators: - self.value += accumulator.value - def extract_output(self): - if not INT64_MIN <= self.value <= INT64_MAX: - self.value %= 2**64 - if self.value >= INT64_MAX: - self.value -= 2**64 - return self.value - - -class MinInt64Accumulator(object): - def __init__(self): - self.value = INT64_MAX - def add_input(self, element): - element = int(element) - if not INT64_MIN <= element <= INT64_MAX: - raise OverflowError(element) - if element < self.value: - self.value = element - def merge(self, accumulators): - for accumulator in accumulators: - if accumulator.value < self.value: - self.value = accumulator.value - def extract_output(self): - return self.value - - -class MaxInt64Accumulator(object): - def __init__(self): - self.value = INT64_MIN - def add_input(self, element): - element = int(element) - if not INT64_MIN <= element <= INT64_MAX: - raise OverflowError(element) - if element > self.value: - self.value = element - def merge(self, accumulators): - for accumulator in accumulators: - if accumulator.value > self.value: - self.value = accumulator.value - def extract_output(self): - return self.value - - -class MeanInt64Accumulator(object): - def __init__(self): - self.sum = 0 - self.count = 0 - def add_input(self, element): - element = int(element) - if not INT64_MIN <= element <= INT64_MAX: - raise OverflowError(element) - self.sum += element - self.count += 1 - def merge(self, accumulators): - for accumulator in accumulators: - self.sum += accumulator.sum - self.count += accumulator.count - def extract_output(self): - if not INT64_MIN <= self.sum <= INT64_MAX: - self.sum %= 2**64 - if self.sum >= INT64_MAX: - self.sum -= 2**64 - return self.sum / self.count if self.count else _NAN - - -class CountCombineFn(AccumulatorCombineFn): - _accumulator_type = CountAccumulator -class SumInt64Fn(AccumulatorCombineFn): - _accumulator_type = SumInt64Accumulator -class MinInt64Fn(AccumulatorCombineFn): - _accumulator_type = MinInt64Accumulator -class MaxInt64Fn(AccumulatorCombineFn): - _accumulator_type 
= MaxInt64Accumulator -class MeanInt64Fn(AccumulatorCombineFn): - _accumulator_type = MeanInt64Accumulator - - -_POS_INF = float('inf') -_NEG_INF = float('-inf') -_NAN = float('nan') - - -class SumDoubleAccumulator(object): - def __init__(self): - self.value = 0 - def add_input(self, element): - element = float(element) - self.value += element - def merge(self, accumulators): - for accumulator in accumulators: - self.value += accumulator.value - def extract_output(self): - return self.value - - -class MinDoubleAccumulator(object): - def __init__(self): - self.value = _POS_INF - def add_input(self, element): - element = float(element) - if element < self.value: - self.value = element - def merge(self, accumulators): - for accumulator in accumulators: - if accumulator.value < self.value: - self.value = accumulator.value - def extract_output(self): - return self.value - - -class MaxDoubleAccumulator(object): - def __init__(self): - self.value = _NEG_INF - def add_input(self, element): - element = float(element) - if element > self.value: - self.value = element - def merge(self, accumulators): - for accumulator in accumulators: - if accumulator.value > self.value: - self.value = accumulator.value - def extract_output(self): - return self.value - - -class MeanDoubleAccumulator(object): - def __init__(self): - self.sum = 0 - self.count = 0 - def add_input(self, element): - element = float(element) - self.sum += element - self.count += 1 - def merge(self, accumulators): - for accumulator in accumulators: - self.sum += accumulator.sum - self.count += accumulator.count - def extract_output(self): - return self.sum / self.count if self.count else _NAN - - -class SumFloatFn(AccumulatorCombineFn): - _accumulator_type = SumDoubleAccumulator -class MinFloatFn(AccumulatorCombineFn): - _accumulator_type = MinDoubleAccumulator -class MaxFloatFn(AccumulatorCombineFn): - _accumulator_type = MaxDoubleAccumulator -class MeanFloatFn(AccumulatorCombineFn): - _accumulator_type = MeanDoubleAccumulator - - -class AllAccumulator(object): - def __init__(self): - self.value = True - def add_input(self, element): - self.value &= not not element - def merge(self, accumulators): - for accumulator in accumulators: - self.value &= accumulator.value - def extract_output(self): - return self.value - - -class AnyAccumulator(object): - def __init__(self): - self.value = False - def add_input(self, element): - self.value |= not not element - def merge(self, accumulators): - for accumulator in accumulators: - self.value |= accumulator.value - def extract_output(self): - return self.value - - -class AnyCombineFn(AccumulatorCombineFn): - _accumulator_type = AnyAccumulator - -class AllCombineFn(AccumulatorCombineFn): - _accumulator_type = AllAccumulator
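These accumulators are wired into CombineFn subclasses (CountCombineFn, MeanInt64Fn, SumFloatFn, AnyCombineFn, and so on) through AccumulatorCombineFn, which delegates the whole CombineFn lifecycle to the accumulator type. A minimal sketch of that lifecycle -- again not part of the commit, exercising MeanInt64Fn directly the way a runner might across two bundles, with the pre-move import path shown in this diff (after the move it would live under apache_beam):

from google.cloud.dataflow.transforms.cy_combiners import MeanInt64Fn

fn = MeanInt64Fn()

# A runner may build one accumulator per bundle...
acc1 = fn.create_accumulator()
for x in [1, 2, 3]:
    acc1 = fn.add_input(acc1, x)

acc2 = fn.create_accumulator()
for x in [4, 5]:
    acc2 = fn.add_input(acc2, x)

# ...then merge the partial results and extract a single output value.
merged = fn.merge_accumulators([acc1, acc2])
print(fn.extract_output(merged))  # mean of [1, 2, 3, 4, 5] -> 3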