From: davor@apache.org
To: commits@beam.incubator.apache.org
Date: Tue, 14 Jun 2016 23:12:40 -0000
Subject: [05/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/timeutil.py b/sdks/python/google/cloud/dataflow/transforms/timeutil.py
deleted file mode 100644
index 7b750f9..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/timeutil.py
+++ /dev/null
@@ -1,310 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Time and timer utilities.""" - -from __future__ import absolute_import - -from abc import ABCMeta -from abc import abstractmethod - -import datetime -import sys - - -class Timestamp(object): - """Represents a Unix second timestamp with microsecond granularity. - - Can be treated in common timestamp arithmetic operations as a numeric type. - - Internally stores a time interval as an int of microseconds. This strategy - is necessary since floating point values lose precision when storing values, - especially after arithmetic operations (for example, 10000000 % 0.1 evaluates - to 0.0999999994448885). - """ - - def __init__(self, seconds=0, micros=0): - self.micros = int(seconds * 1000000) + int(micros) - - @staticmethod - def of(seconds): - """Return the Timestamp for the given number of seconds. - - If the input is already a Timestamp, the input itself will be returned. - - Args: - seconds: Number of seconds as int, float or Timestamp. - - Returns: - Corresponding Timestamp object. - """ - - if isinstance(seconds, Duration): - raise TypeError('Can\'t interpret %s as Timestamp.' % seconds) - if isinstance(seconds, Timestamp): - return seconds - return Timestamp(seconds) - - def __repr__(self): - micros = self.micros - sign = '' - if micros < 0: - sign = '-' - micros = -micros - int_part = micros / 1000000 - frac_part = micros % 1000000 - if frac_part: - return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part) - else: - return 'Timestamp(%s%d)' % (sign, int_part) - - def to_utc_datetime(self): - epoch = datetime.datetime.utcfromtimestamp(0) - # We can't easily construct a datetime object from microseconds, so we - # create one at the epoch and add an appropriate timedelta interval. - return epoch + datetime.timedelta(microseconds=self.micros) - - def isoformat(self): - # Append 'Z' for UTC timezone. - return self.to_utc_datetime().isoformat() + 'Z' - - def __float__(self): - # Note that the returned value may have lost precision. - return float(self.micros) / 1000000 - - def __int__(self): - # Note that the returned value may have lost precision. - return self.micros / 1000000 - - def __cmp__(self, other): - # Allow comparisons between Duration and Timestamp values. - if not isinstance(other, Duration): - other = Timestamp.of(other) - return cmp(self.micros, other.micros) - - def __hash__(self): - return hash(self.micros) - - def __add__(self, other): - other = Duration.of(other) - return Timestamp(micros=self.micros + other.micros) - - def __radd__(self, other): - return self + other - - def __sub__(self, other): - other = Duration.of(other) - return Timestamp(micros=self.micros - other.micros) - - def __mod__(self, other): - other = Duration.of(other) - return Duration(micros=self.micros % other.micros) - - -MIN_TIMESTAMP = Timestamp(micros=-sys.maxint - 1) -MAX_TIMESTAMP = Timestamp(micros=sys.maxint) - - -class Duration(object): - """Represents a second duration with microsecond granularity. 
- - Can be treated in common arithmetic operations as a numeric type. - - Internally stores a time interval as an int of microseconds. This strategy - is necessary since floating point values lose precision when storing values, - especially after arithmetic operations (for example, 10000000 % 0.1 evaluates - to 0.0999999994448885). - """ - - def __init__(self, seconds=0, micros=0): - self.micros = int(seconds * 1000000) + int(micros) - - @staticmethod - def of(seconds): - """Return the Duration for the given number of seconds since Unix epoch. - - If the input is already a Duration, the input itself will be returned. - - Args: - seconds: Number of seconds as int, float or Duration. - - Returns: - Corresponding Duration object. - """ - - if isinstance(seconds, Timestamp): - raise TypeError('Can\'t interpret %s as Duration.' % seconds) - if isinstance(seconds, Duration): - return seconds - return Duration(seconds) - - def __repr__(self): - micros = self.micros - sign = '' - if micros < 0: - sign = '-' - micros = -micros - int_part = micros / 1000000 - frac_part = micros % 1000000 - if frac_part: - return 'Duration(%s%d.%06d)' % (sign, int_part, frac_part) - else: - return 'Duration(%s%d)' % (sign, int_part) - - def __float__(self): - # Note that the returned value may have lost precision. - return float(self.micros) / 1000000 - - def __int__(self): - # Note that the returned value may have lost precision. - return self.micros / 1000000 - - def __cmp__(self, other): - # Allow comparisons between Duration and Timestamp values. - if not isinstance(other, Timestamp): - other = Duration.of(other) - return cmp(self.micros, other.micros) - - def __hash__(self): - return hash(self.micros) - - def __neg__(self): - return Duration(micros=-self.micros) - - def __add__(self, other): - if isinstance(other, Timestamp): - return other + self - other = Duration.of(other) - return Duration(micros=self.micros + other.micros) - - def __radd__(self, other): - return self + other - - def __sub__(self, other): - other = Duration.of(other) - return Duration(micros=self.micros - other.micros) - - def __rsub__(self, other): - return -(self - other) - - def __mul__(self, other): - other = Duration.of(other) - return Duration(micros=self.micros * other.micros / 1000000) - - def __rmul__(self, other): - return self * other - - def __mod__(self, other): - other = Duration.of(other) - return Duration(micros=self.micros % other.micros) - - -class TimeDomain(object): - """Time domain for streaming timers.""" - - WATERMARK = 'WATERMARK' - REAL_TIME = 'REAL_TIME' - DEPENDENT_REAL_TIME = 'DEPENDENT_REAL_TIME' - - @staticmethod - def from_string(domain): - if domain in (TimeDomain.WATERMARK, - TimeDomain.REAL_TIME, - TimeDomain.DEPENDENT_REAL_TIME): - return domain - raise ValueError('Unknown time domain: %s' % domain) - - -class OutputTimeFnImpl(object): - """Implementation of OutputTimeFn.""" - - __metaclass__ = ABCMeta - - @abstractmethod - def assign_output_time(self, window, input_timestamp): - pass - - @abstractmethod - def combine(self, output_timestamp, other_output_timestamp): - pass - - def combine_all(self, merging_timestamps): - """Apply combine to list of timestamps.""" - combined_output_time = None - for output_time in merging_timestamps: - if combined_output_time is None: - combined_output_time = output_time - else: - combined_output_time = self.combine( - combined_output_time, output_time) - return combined_output_time - - def merge(self, unused_result_window, merging_timestamps): - """Default to returning the 
result of combine_all.""" - return self.combine_all(merging_timestamps) - - -class DependsOnlyOnWindow(OutputTimeFnImpl): - """OutputTimeFnImpl that only depends on the window.""" - - __metaclass__ = ABCMeta - - def combine(self, output_timestamp, other_output_timestamp): - return output_timestamp - - def merge(self, result_window, unused_merging_timestamps): - # Since we know that the result only depends on the window, we can ignore - # the given timestamps. - return self.assign_output_time(result_window, None) - - -class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl): - """OutputTimeFnImpl outputting at earliest input timestamp.""" - - def assign_output_time(self, window, input_timestamp): - return input_timestamp - - def combine(self, output_timestamp, other_output_timestamp): - """Default to returning the earlier of two timestamps.""" - return min(output_timestamp, other_output_timestamp) - - -class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl): - """OutputTimeFnImpl outputting at earliest input timestamp.""" - - def __init__(self, window_fn): - self.window_fn = window_fn - - def assign_output_time(self, window, input_timestamp): - return self.window_fn.get_transformed_output_time(window, input_timestamp) - - def combine(self, output_timestamp, other_output_timestamp): - return min(output_timestamp, other_output_timestamp) - - -class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl): - """OutputTimeFnImpl outputting at latest input timestamp.""" - - def assign_output_time(self, window, input_timestamp): - return input_timestamp - - def combine(self, output_timestamp, other_output_timestamp): - return max(output_timestamp, other_output_timestamp) - - -class OutputAtEndOfWindowImpl(DependsOnlyOnWindow): - """OutputTimeFnImpl outputting at end of window.""" - - def assign_output_time(self, window, unused_input_timestamp): - return window.end http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/timeutil_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/transforms/timeutil_test.py b/sdks/python/google/cloud/dataflow/transforms/timeutil_test.py deleted file mode 100644 index 26ff3ae..0000000 --- a/sdks/python/google/cloud/dataflow/transforms/timeutil_test.py +++ /dev/null @@ -1,165 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
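# Illustrative note (a sketch added for clarity, not a line of the
# original diff): the Timestamp/Duration classes above store integer
# microseconds because float seconds drift under arithmetic. In Python 2:
#
#   >>> 10000000 % 0.1                       # raw floats lose precision
#   0.0999999994448885
#   >>> Timestamp(10000000) % Duration(0.1)  # integer micros stay exact
#   Duration(0)
#
# test_precision in the test file below verifies exactly this behavior.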
- -"""Unit tests for time utilities.""" - -from __future__ import absolute_import - -import unittest - -from google.cloud.dataflow.transforms.timeutil import Duration -from google.cloud.dataflow.transforms.timeutil import Timestamp - - -class TimestampTest(unittest.TestCase): - - def test_of(self): - interval = Timestamp(123) - self.assertEqual(id(interval), id(Timestamp.of(interval))) - self.assertEqual(interval, Timestamp.of(123.0)) - with self.assertRaises(TypeError): - Timestamp.of(Duration(10)) - - def test_precision(self): - self.assertEqual(Timestamp(10000000) % 0.1, 0) - self.assertEqual(Timestamp(10000000) % 0.05, 0) - self.assertEqual(Timestamp(10000000) % 0.000005, 0) - self.assertEqual(Timestamp(10000000) % Duration(0.1), 0) - self.assertEqual(Timestamp(10000000) % Duration(0.05), 0) - self.assertEqual(Timestamp(10000000) % Duration(0.000005), 0) - - def test_utc_timestamp(self): - self.assertEqual(Timestamp(10000000).isoformat(), - '1970-04-26T17:46:40Z') - self.assertEqual(Timestamp(10000000.000001).isoformat(), - '1970-04-26T17:46:40.000001Z') - self.assertEqual(Timestamp(1458343379.123456).isoformat(), - '2016-03-18T23:22:59.123456Z') - - def test_arithmetic(self): - # Supported operations. - self.assertEqual(Timestamp(123) + 456, 579) - self.assertEqual(Timestamp(123) + Duration(456), 579) - self.assertEqual(456 + Timestamp(123), 579) - self.assertEqual(Duration(456) + Timestamp(123), 579) - self.assertEqual(Timestamp(123) - 456, -333) - self.assertEqual(Timestamp(123) - Duration(456), -333) - self.assertEqual(Timestamp(1230) % 456, 318) - self.assertEqual(Timestamp(1230) % Duration(456), 318) - - # Check that direct comparison of Timestamp and Duration is allowed. - self.assertTrue(Duration(123) == Timestamp(123)) - self.assertTrue(Timestamp(123) == Duration(123)) - self.assertFalse(Duration(123) == Timestamp(1230)) - self.assertFalse(Timestamp(123) == Duration(1230)) - - # Check return types. - self.assertEqual((Timestamp(123) + 456).__class__, Timestamp) - self.assertEqual((Timestamp(123) + Duration(456)).__class__, Timestamp) - self.assertEqual((456 + Timestamp(123)).__class__, Timestamp) - self.assertEqual((Duration(456) + Timestamp(123)).__class__, Timestamp) - self.assertEqual((Timestamp(123) - 456).__class__, Timestamp) - self.assertEqual((Timestamp(123) - Duration(456)).__class__, Timestamp) - self.assertEqual((Timestamp(1230) % 456).__class__, Duration) - self.assertEqual((Timestamp(1230) % Duration(456)).__class__, Duration) - - # Unsupported operations. 
- with self.assertRaises(TypeError): - self.assertEqual(Timestamp(123) * 456, 56088) - with self.assertRaises(TypeError): - self.assertEqual(Timestamp(123) * Duration(456), 56088) - with self.assertRaises(TypeError): - self.assertEqual(456 * Timestamp(123), 56088) - with self.assertRaises(TypeError): - self.assertEqual(Duration(456) * Timestamp(123), 56088) - with self.assertRaises(TypeError): - self.assertEqual(456 - Timestamp(123), 333) - with self.assertRaises(TypeError): - self.assertEqual(Duration(456) - Timestamp(123), 333) - with self.assertRaises(TypeError): - self.assertEqual(-Timestamp(123), -123) - with self.assertRaises(TypeError): - self.assertEqual(-Timestamp(123), -Duration(123)) - with self.assertRaises(TypeError): - self.assertEqual(1230 % Timestamp(456), 318) - with self.assertRaises(TypeError): - self.assertEqual(Duration(1230) % Timestamp(456), 318) - - def test_sort_order(self): - self.assertEqual( - [-63, Timestamp(-3), 2, 9, Timestamp(292.3), 500], - sorted([9, 2, Timestamp(-3), Timestamp(292.3), -63, 500])) - self.assertEqual( - [4, 5, Timestamp(6), Timestamp(7), 8, 9], - sorted([9, 8, Timestamp(7), Timestamp(6), 5, 4])) - - def test_str(self): - self.assertEqual('Timestamp(1.234567)', - str(Timestamp(1.234567))) - self.assertEqual('Timestamp(-1.234567)', - str(Timestamp(-1.234567))) - self.assertEqual('Timestamp(-999999999.900000)', - str(Timestamp(-999999999.9))) - self.assertEqual('Timestamp(999999999)', - str(Timestamp(999999999))) - self.assertEqual('Timestamp(-999999999)', - str(Timestamp(-999999999))) - - -class DurationTest(unittest.TestCase): - - def test_of(self): - interval = Duration(123) - self.assertEqual(id(interval), id(Duration.of(interval))) - self.assertEqual(interval, Duration.of(123.0)) - with self.assertRaises(TypeError): - Duration.of(Timestamp(10)) - - def test_precision(self): - self.assertEqual(Duration(10000000) % 0.1, 0) - self.assertEqual(Duration(10000000) % 0.05, 0) - self.assertEqual(Duration(10000000) % 0.000005, 0) - - def test_arithmetic(self): - self.assertEqual(Duration(123) + 456, 579) - self.assertEqual(456 + Duration(123), 579) - self.assertEqual(Duration(123) * 456, 56088) - self.assertEqual(456 * Duration(123), 56088) - self.assertEqual(Duration(123) - 456, -333) - self.assertEqual(456 - Duration(123), 333) - self.assertEqual(-Duration(123), -123) - - def test_sort_order(self): - self.assertEqual( - [-63, Duration(-3), 2, 9, Duration(292.3), 500], - sorted([9, 2, Duration(-3), Duration(292.3), -63, 500])) - self.assertEqual( - [4, 5, Duration(6), Duration(7), 8, 9], - sorted([9, 8, Duration(7), Duration(6), 5, 4])) - - def test_str(self): - self.assertEqual('Duration(1.234567)', - str(Duration(1.234567))) - self.assertEqual('Duration(-1.234567)', - str(Duration(-1.234567))) - self.assertEqual('Duration(-999999999.900000)', - str(Duration(-999999999.9))) - self.assertEqual('Duration(999999999)', - str(Duration(999999999))) - self.assertEqual('Duration(-999999999)', - str(Duration(-999999999))) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/trigger.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/transforms/trigger.py b/sdks/python/google/cloud/dataflow/transforms/trigger.py deleted file mode 100644 index 039847a..0000000 --- a/sdks/python/google/cloud/dataflow/transforms/trigger.py +++ /dev/null @@ -1,958 +0,0 @@ -# Copyright 2016 Google 
Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Support for Dataflow triggers. - -Triggers control when in processing time windows get emitted. -""" - -from abc import ABCMeta -from abc import abstractmethod -import collections -import copy - -from google.cloud.dataflow.coders import observable -from google.cloud.dataflow.transforms import combiners -from google.cloud.dataflow.transforms import core -from google.cloud.dataflow.transforms.timeutil import MAX_TIMESTAMP -from google.cloud.dataflow.transforms.timeutil import MIN_TIMESTAMP -from google.cloud.dataflow.transforms.timeutil import TimeDomain -from google.cloud.dataflow.transforms.window import GlobalWindow -from google.cloud.dataflow.transforms.window import OutputTimeFn -from google.cloud.dataflow.transforms.window import WindowedValue -from google.cloud.dataflow.transforms.window import WindowFn - - -class AccumulationMode(object): - """Controls what to do with data when a trigger fires multiple times. - """ - DISCARDING = 1 - ACCUMULATING = 2 - # TODO(robertwb): Provide retractions of previous outputs. - # RETRACTING = 3 - - -class StateTag(object): - """An identifier used to store and retrieve typed, combinable state. - - The given tag must be unique for this stage. If CombineFn is None then - all elements will be returned as a list, otherwise the given CombineFn - will be applied (possibly incrementally and eagerly) when adding elements. 
- """ - __metaclass__ = ABCMeta - - def __init__(self, tag): - self.tag = tag - - -class ValueStateTag(StateTag): - """StateTag pointing to an element.""" - - def __repr__(self): - return 'ValueStateTag(%s, %s)' % (self.tag, self.combine_fn) - - def with_prefix(self, prefix): - return ValueStateTag(prefix + self.tag) - - -class CombiningValueStateTag(StateTag): - """StateTag pointing to an element, accumulated with a combiner.""" - - # TODO(robertwb): Also store the coder (perhaps extracted from the combine_fn) - def __init__(self, tag, combine_fn): - super(CombiningValueStateTag, self).__init__(tag) - if not combine_fn: - raise ValueError('combine_fn must be specified.') - if not isinstance(combine_fn, core.CombineFn): - combine_fn = core.CombineFn.from_callable(combine_fn) - self.combine_fn = combine_fn - - def __repr__(self): - return 'CombiningValueStateTag(%s, %s)' % (self.tag, self.combine_fn) - - def with_prefix(self, prefix): - return CombiningValueStateTag(prefix + self.tag, self.combine_fn) - - -class ListStateTag(StateTag): - """StateTag pointing to a list of elements.""" - - def __init__(self, tag): - super(ListStateTag, self).__init__(tag) - - def __repr__(self): - return 'ListStateTag(%s)' % self.tag - - def with_prefix(self, prefix): - return ListStateTag(prefix + self.tag) - - -class WatermarkHoldStateTag(StateTag): - - def __init__(self, tag, output_time_fn_impl): - super(WatermarkHoldStateTag, self).__init__(tag) - self.output_time_fn_impl = output_time_fn_impl - - def __repr__(self): - return 'WatermarkHoldStateTag(%s, %s)' % (self.tag, - self.output_time_fn_impl) - - def with_prefix(self, prefix): - return WatermarkHoldStateTag(prefix + self.tag, - self.output_time_fn_impl) - - -# pylint: disable=unused-argument -# TODO(robertwb): Provisional API, Java likely to change as well. -class TriggerFn(object): - """A TriggerFn determines when window (panes) are emitted. - - See https://cloud.google.com/dataflow/model/triggers. - """ - __metaclass__ = ABCMeta - - @abstractmethod - def on_element(self, element, window, context): - """Called when a new element arrives in a window. - - Args: - element: the element being added - window: the window to which the element is being added - context: a context (e.g. a TriggerContext instance) for managing state - and setting timers - """ - pass - - @abstractmethod - def on_merge(self, to_be_merged, merge_result, context): - """Called when multiple windows are merged. - - Args: - to_be_merged: the set of windows to be merged - merge_result: the window into which the windows are being merged - context: a context (e.g. a TriggerContext instance) for managing state - and setting timers - """ - pass - - @abstractmethod - def should_fire(self, watermark, window, context): - """Whether this trigger should cause the window to fire. - - Args: - watermark: (a lower bound on) the watermark of the system - window: the window whose trigger is being considered - context: a context (e.g. a TriggerContext instance) for managing state - and setting timers - - Returns: - whether this trigger should cause a firing - """ - pass - - @abstractmethod - def on_fire(self, watermark, window, context): - """Called when a trigger actually fires. - - Args: - watermark: (a lower bound on) the watermark of the system - window: the window whose trigger is being fired - context: a context (e.g. 
a TriggerContext instance) for managing state - and setting timers - - Returns: - whether this trigger is finished - """ - pass - - @abstractmethod - def reset(self, window, context): - """Clear any state and timers used by this TriggerFn.""" - pass -# pylint: enable=unused-argument - - -class DefaultTrigger(TriggerFn): - """Semantically Repeatedly(AfterWatermark()), but more optimized.""" - - def __init__(self): - pass - - def __repr__(self): - return 'DefaultTrigger()' - - def on_element(self, element, window, context): - context.set_timer('', TimeDomain.WATERMARK, window.end) - - def on_merge(self, to_be_merged, merge_result, context): - # Note: Timer clearing solely an optimization. - for window in to_be_merged: - if window.end != merge_result.end: - context.clear_timer('', TimeDomain.WATERMARK) - - def should_fire(self, watermark, window, context): - return watermark >= window.end - - def on_fire(self, watermark, window, context): - return False - - def reset(self, window, context): - context.clear_timer('', TimeDomain.WATERMARK) - - def __eq__(self, other): - return type(self) == type(other) - - -class AfterWatermark(TriggerFn): - """Fire exactly once when the watermark passes the end of the window. - - Args: - early: if not None, a speculative trigger to repeatedly evaluate before - the watermark passes the end of the window - late: if not None, a speculative trigger to repeatedly evaluate after - the watermark passes the end of the window - """ - LATE_TAG = CombiningValueStateTag('is_late', any) - - def __init__(self, early=None, late=None): - self.early = Repeatedly(early) if early else None - self.late = Repeatedly(late) if late else None - - def __repr__(self): - qualifiers = [] - if self.early: - qualifiers.append('early=%s' % self.early) - if self.late: - qualifiers.append('late=%s', self.late) - return 'AfterWatermark(%s)' % ', '.join(qualifiers) - - def is_late(self, context): - return self.late and context.get_state(self.LATE_TAG) - - def on_element(self, element, window, context): - if self.is_late(context): - self.late.on_element(element, window, NestedContext(context, 'late')) - else: - context.set_timer('', TimeDomain.WATERMARK, window.end) - if self.early: - self.early.on_element(element, window, NestedContext(context, 'early')) - - def on_merge(self, to_be_merged, merge_result, context): - # TODO(robertwb): Figure out whether the 'rewind' semantics could be used - # here. - if self.is_late(context): - self.late.on_merge( - to_be_merged, merge_result, NestedContext(context, 'late')) - else: - # Note: Timer clearing solely an optimization. 
- for window in to_be_merged: - if window.end != merge_result.end: - context.clear_timer('', TimeDomain.WATERMARK) - if self.early: - self.early.on_merge( - to_be_merged, merge_result, NestedContext(context, 'early')) - - def should_fire(self, watermark, window, context): - if self.is_late(context): - return self.late.should_fire( - watermark, window, NestedContext(context, 'late')) - elif watermark >= window.end: - return True - elif self.early: - return self.early.should_fire( - watermark, window, NestedContext(context, 'early')) - else: - return False - - def on_fire(self, watermark, window, context): - if self.is_late(context): - return self.late.on_fire( - watermark, window, NestedContext(context, 'late')) - elif watermark >= window.end: - context.add_state(self.LATE_TAG, True) - return not self.late - elif self.early: - self.early.on_fire(watermark, window, NestedContext(context, 'early')) - return False - - def reset(self, window, context): - if self.late: - context.clear_state(self.LATE_TAG) - if self.early: - self.early.reset(window, NestedContext(context, 'early')) - if self.late: - self.late.reset(window, NestedContext(context, 'late')) - - def __eq__(self, other): - return (type(self) == type(other) - and self.early == other.early - and self.late == other.late) - - def __hash__(self): - return hash((type(self), self.early, self.late)) - - -class AfterCount(TriggerFn): - """Fire when there are at least count elements in this window pane.""" - - COUNT_TAG = CombiningValueStateTag('count', combiners.CountCombineFn()) - - def __init__(self, count): - self.count = count - - def __repr__(self): - return 'AfterCount(%s)' % self.count - - def on_element(self, element, window, context): - context.add_state(self.COUNT_TAG, 1) - - def on_merge(self, to_be_merged, merge_result, context): - # states automatically merged - pass - - def should_fire(self, watermark, window, context): - return context.get_state(self.COUNT_TAG) >= self.count - - def on_fire(self, watermark, window, context): - return True - - def reset(self, window, context): - context.clear_state(self.COUNT_TAG) - - -class Repeatedly(TriggerFn): - """Repeatedly invoke the given trigger, never finishing.""" - - def __init__(self, underlying): - self.underlying = underlying - - def __repr__(self): - return 'Repeatedly(%s)' % self.underlying - - def on_element(self, element, window, context): # get window from context? 
- self.underlying.on_element(element, window, context) - - def on_merge(self, to_be_merged, merge_result, context): - self.underlying.on_merge(to_be_merged, merge_result, context) - - def should_fire(self, watermark, window, context): - return self.underlying.should_fire(watermark, window, context) - - def on_fire(self, watermark, window, context): - if self.underlying.on_fire(watermark, window, context): - self.underlying.reset(window, context) - return False - - def reset(self, window, context): - self.underlying.reset(window, context) - - -class ParallelTriggerFn(TriggerFn): - - __metaclass__ = ABCMeta - - def __init__(self, *triggers): - self.triggers = triggers - - def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, - ', '.join(str(t) for t in self.triggers)) - - @abstractmethod - def combine_op(self, trigger_results): - pass - - def on_element(self, element, window, context): - for ix, trigger in enumerate(self.triggers): - trigger.on_element(element, window, self._sub_context(context, ix)) - - def on_merge(self, to_be_merged, merge_result, context): - for ix, trigger in enumerate(self.triggers): - trigger.on_merge( - to_be_merged, merge_result, self._sub_context(context, ix)) - - def should_fire(self, watermark, window, context): - return self.combine_op( - trigger.should_fire(watermark, window, self._sub_context(context, ix)) - for ix, trigger in enumerate(self.triggers)) - - def on_fire(self, watermark, window, context): - finished = [] - for ix, trigger in enumerate(self.triggers): - nested_context = self._sub_context(context, ix) - if trigger.should_fire(watermark, window, nested_context): - finished.append(trigger.on_fire(watermark, window, nested_context)) - return self.combine_op(finished) - - def reset(self, window, context): - for ix, trigger in enumerate(self.triggers): - trigger.reset(window, self._sub_context(context, ix)) - - @staticmethod - def _sub_context(context, index): - return NestedContext(context, '%d/' % index) - - -class AfterFirst(ParallelTriggerFn): - """Fires when any subtrigger fires. - - Also finishes when any subtrigger finishes. - """ - combine_op = any - - -class AfterAll(ParallelTriggerFn): - """Fires when all subtriggers have fired. - - Also finishes when all subtriggers have finished. - """ - combine_op = all - - -class AfterEach(TriggerFn): - - INDEX_TAG = CombiningValueStateTag('index', ( - lambda indices: 0 if not indices else max(indices))) - - def __init__(self, *triggers): - self.triggers = triggers - - def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, - ', '.join(str(t) for t in self.triggers)) - - def on_element(self, element, window, context): - ix = context.get_state(self.INDEX_TAG) - if ix < len(self.triggers): - self.triggers[ix].on_element( - element, window, self._sub_context(context, ix)) - - def on_merge(self, to_be_merged, merge_result, context): - # This takes the furthest window on merging. - # TODO(robertwb): Revisit this when merging windows logic is settled for - # all possible merging situations. 
- ix = context.get_state(self.INDEX_TAG) - if ix < len(self.triggers): - self.triggers[ix].on_merge( - to_be_merged, merge_result, self._sub_context(context, ix)) - - def should_fire(self, watermark, window, context): - ix = context.get_state(self.INDEX_TAG) - if ix < len(self.triggers): - return self.triggers[ix].should_fire( - watermark, window, self._sub_context(context, ix)) - - def on_fire(self, watermark, window, context): - ix = context.get_state(self.INDEX_TAG) - if ix < len(self.triggers): - if self.triggers[ix].on_fire( - watermark, window, self._sub_context(context, ix)): - ix += 1 - context.add_state(self.INDEX_TAG, ix) - return ix == len(self.triggers) - - def reset(self, window, context): - context.clear_state(self.INDEX_TAG) - for ix, trigger in enumerate(self.triggers): - trigger.reset(window, self._sub_context(context, ix)) - - @staticmethod - def _sub_context(context, index): - return NestedContext(context, '%d/' % index) - - -class OrFinally(AfterFirst): - - def __init__(self, body_trigger, exit_trigger): - super(OrFinally, self).__init__(body_trigger, exit_trigger) - - -class TriggerContext(object): - - def __init__(self, outer, window): - self._outer = outer - self._window = window - - def set_timer(self, name, time_domain, timestamp): - self._outer.set_timer(self._window, name, time_domain, timestamp) - - def clear_timer(self, name, time_domain): - self._outer.clear_timer(self._window, name, time_domain) - - def add_state(self, tag, value): - self._outer.add_state(self._window, tag, value) - - def get_state(self, tag): - return self._outer.get_state(self._window, tag) - - def clear_state(self, tag): - return self._outer.clear_state(self._window, tag) - - -class NestedContext(object): - """Namespaced context useful for defining composite triggers.""" - - def __init__(self, outer, prefix): - self._outer = outer - self._prefix = prefix - - def set_timer(self, name, time_domain, timestamp): - self._outer.set_timer(self._prefix + name, time_domain, timestamp) - - def clear_timer(self, name, time_domain): - self._outer.clear_timer(self._prefix + name, time_domain) - - def add_state(self, tag, value): - self._outer.add_state(tag.with_prefix(self._prefix), value) - - def get_state(self, tag): - return self._outer.get_state(tag.with_prefix(self._prefix)) - - def clear_state(self, tag): - self._outer.clear_state(tag.with_prefix(self._prefix)) - - -# pylint: disable=unused-argument -class SimpleState(object): - """Basic state storage interface used for triggering. - - Only timers must hold the watermark (by their timestamp). - """ - - __metaclass__ = ABCMeta - - @abstractmethod - def set_timer(self, window, name, time_domain, timestamp): - pass - - @abstractmethod - def get_window(self, window_id): - pass - - @abstractmethod - def clear_timer(self, window, name, time_domain): - pass - - @abstractmethod - def add_state(self, window, tag, value): - pass - - @abstractmethod - def get_state(self, window, tag): - pass - - @abstractmethod - def clear_state(self, window, tag): - pass - - def at(self, window): - return TriggerContext(self, window) - - -class UnmergedState(SimpleState): - """State suitable for use in TriggerDriver. - - This class must be implemented by each backend. 
- """ - - @abstractmethod - def set_global_state(self, tag, value): - pass - - @abstractmethod - def get_global_state(self, tag, default=None): - pass -# pylint: enable=unused-argument - - -class MergeableStateAdapter(SimpleState): - """Wraps an UnmergedState, tracking merged windows.""" - # TODO(robertwb): A similar indirection could be used for sliding windows - # or other window_fns when a single element typically belongs to many windows. - - WINDOW_IDS = ValueStateTag('window_ids') - - def __init__(self, raw_state): - self.raw_state = raw_state - self.window_ids = self.raw_state.get_global_state(self.WINDOW_IDS, {}) - self.counter = None - - def set_timer(self, window, name, time_domain, timestamp): - self.raw_state.set_timer(self._get_id(window), name, time_domain, timestamp) - - def clear_timer(self, window, name, time_domain): - for window_id in self._get_ids(window): - self.raw_state.clear_timer(window_id, name, time_domain) - - def add_state(self, window, tag, value): - if isinstance(tag, ValueStateTag): - raise ValueError( - 'Merging requested for non-mergeable state tag: %r.' % tag) - self.raw_state.add_state(self._get_id(window), tag, value) - - def get_state(self, window, tag): - values = [self.raw_state.get_state(window_id, tag) - for window_id in self._get_ids(window)] - if isinstance(tag, ValueStateTag): - raise ValueError( - 'Merging requested for non-mergeable state tag: %r.' % tag) - elif isinstance(tag, CombiningValueStateTag): - # TODO(robertwb): Strip combine_fn.extract_output from raw_state tag. - if not values: - accumulator = tag.combine_fn.create_accumulator() - elif len(values) == 1: - accumulator = values[0] - else: - accumulator = tag.combine_fn.merge_accumulators(values) - # TODO(robertwb): Store the merged value in the first tag. 
- return tag.combine_fn.extract_output(accumulator) - elif isinstance(tag, ListStateTag): - return [v for vs in values for v in vs] - elif isinstance(tag, WatermarkHoldStateTag): - return tag.output_time_fn_impl.combine_all(values) - else: - raise ValueError('Invalid tag.', tag) - - def clear_state(self, window, tag): - for window_id in self._get_ids(window): - self.raw_state.clear_state(window_id, tag) - if tag is None: - del self.window_ids[window] - self._persist_window_ids() - - def merge(self, to_be_merged, merge_result): - for window in to_be_merged: - if window != merge_result: - if window in self.window_ids: - if merge_result in self.window_ids: - merge_window_ids = self.window_ids[merge_result] - else: - merge_window_ids = self.window_ids[merge_result] = [] - merge_window_ids.extend(self.window_ids.pop(window)) - self._persist_window_ids() - - def known_windows(self): - return self.window_ids.keys() - - def get_window(self, window_id): - for window, ids in self.window_ids.items(): - if window_id in ids: - return window - raise ValueError('No window for %s' % window_id) - - def _get_id(self, window): - if window in self.window_ids: - return self.window_ids[window][0] - else: - window_id = self._get_next_counter() - self.window_ids[window] = [window_id] - self._persist_window_ids() - return window_id - - def _get_ids(self, window): - return self.window_ids.get(window, []) - - def _get_next_counter(self): - if not self.window_ids: - self.counter = 0 - elif self.counter is None: - self.counter = max(k for ids in self.window_ids.values() for k in ids) - self.counter += 1 - return self.counter - - def _persist_window_ids(self): - self.raw_state.set_global_state(self.WINDOW_IDS, self.window_ids) - - def __repr__(self): - return '\n\t'.join([repr(self.window_ids)] + - repr(self.raw_state).split('\n')) - - -def create_trigger_driver(windowing, is_batch=False, phased_combine_fn=None): - """Create the TriggerDriver for the given windowing and options.""" - - # TODO(robertwb): We can do more if we know elements are in timestamp - # sorted order. - if windowing.is_default() and is_batch: - driver = DefaultGlobalBatchTriggerDriver() - else: - driver = GeneralTriggerDriver(windowing) - - if phased_combine_fn: - # TODO(ccy): Refactor GeneralTriggerDriver to combine values eagerly using - # the known phased_combine_fn here. - driver = CombiningTriggerDriver(phased_combine_fn, driver) - return driver - - -class TriggerDriver(object): - """Breaks a series of bundle and timer firings into window (pane)s.""" - - __metaclass__ = ABCMeta - - @abstractmethod - def process_elements(self, state, windowed_values, output_watermark): - pass - - @abstractmethod - def process_timer(self, window_id, name, time_domain, timestamp, state): - pass - - -class DefaultGlobalBatchTriggerDriver(TriggerDriver): - """Breaks a bundles into window (pane)s according to the default triggering. 
- """ - GLOBAL_WINDOW_TUPLE = (GlobalWindow(),) - - def __init__(self): - pass - - def process_elements(self, state, windowed_values, unused_output_watermark): - if isinstance(windowed_values, list): - unwindowed = [wv.value for wv in windowed_values] - else: - class UnwindowedValues(observable.ObservableMixin): - def __iter__(self): - for wv in windowed_values: - unwindowed_value = wv.value - self.notify_observers(unwindowed_value) - yield unwindowed_value - def __repr__(self): - return '' % windowed_values - unwindowed = UnwindowedValues() - yield WindowedValue(unwindowed, MIN_TIMESTAMP, self.GLOBAL_WINDOW_TUPLE) - - def process_timer(self, window_id, name, time_domain, timestamp, state): - raise TypeError('Triggers never set or called for batch default windowing.') - - -class CombiningTriggerDriver(TriggerDriver): - """Uses a phased_combine_fn to process output of wrapped TriggerDriver.""" - - def __init__(self, phased_combine_fn, underlying): - self.phased_combine_fn = phased_combine_fn - self.underlying = underlying - - def process_elements(self, state, windowed_values, output_watermark): - uncombined = self.underlying.process_elements(state, windowed_values, - output_watermark) - for output in uncombined: - yield output.with_value(self.phased_combine_fn.apply(output.value)) - - def process_timer(self, window_id, name, time_domain, timestamp, state): - uncombined = self.underlying.process_timer(window_id, name, time_domain, - timestamp, state) - for output in uncombined: - yield output.with_value(self.phased_combine_fn.apply(output.value)) - - -class GeneralTriggerDriver(TriggerDriver): - """Breaks a series of bundle and timer firings into window (pane)s. - - Suitable for all variants of Windowing. - """ - ELEMENTS = ListStateTag('elements') - TOMBSTONE = CombiningValueStateTag('tombstone', combiners.CountCombineFn()) - - def __init__(self, windowing): - self.window_fn = windowing.windowfn - self.output_time_fn_impl = OutputTimeFn.get_impl(windowing.output_time_fn, - self.window_fn) - # pylint: disable=invalid-name - self.WATERMARK_HOLD = WatermarkHoldStateTag('watermark', - self.output_time_fn_impl) - # pylint: enable=invalid-name - self.trigger_fn = windowing.triggerfn - self.accumulation_mode = windowing.accumulation_mode - self.is_merging = True - - def process_elements(self, state, windowed_values, output_watermark): - if self.is_merging: - state = MergeableStateAdapter(state) - - windows_to_elements = collections.defaultdict(list) - for wv in windowed_values: - for window in wv.windows: - windows_to_elements[window].append((wv.value, wv.timestamp)) - - # First handle merging. 
- if self.is_merging: - old_windows = set(state.known_windows()) - all_windows = old_windows.union(windows_to_elements.keys()) - - if all_windows != old_windows: - merged_away = {} - - class TriggerMergeContext(WindowFn.MergeContext): - - def merge(_, to_be_merged, merge_result): - for window in to_be_merged: - if window != merge_result: - merged_away[window] = merge_result - state.merge(to_be_merged, merge_result) - self.trigger_fn.on_merge( - to_be_merged, merge_result, state.at(merge_result)) - - self.window_fn.merge(TriggerMergeContext(all_windows)) - - merged_windows_to_elements = collections.defaultdict(list) - for window, values in windows_to_elements.items(): - while window in merged_away: - window = merged_away[window] - merged_windows_to_elements[window].extend(values) - windows_to_elements = merged_windows_to_elements - - for window in merged_away: - state.clear_state(window, self.WATERMARK_HOLD) - - # Next handle element adding. - for window, elements in windows_to_elements.items(): - if state.get_state(window, self.TOMBSTONE): - continue - # Add watermark hold. - # TODO(ccy): Add late data and garbage-collection hold support. - output_time = self.output_time_fn_impl.merge( - window, - (element_output_time for element_output_time in - (self.output_time_fn_impl.assign_output_time(window, timestamp) - for unused_value, timestamp in elements) - if element_output_time >= output_watermark)) - if output_time is not None: - state.add_state(window, self.WATERMARK_HOLD, output_time) - - context = state.at(window) - for value, unused_timestamp in elements: - state.add_state(window, self.ELEMENTS, value) - self.trigger_fn.on_element(value, window, context) - - # Maybe fire this window. - watermark = MIN_TIMESTAMP - if self.trigger_fn.should_fire(watermark, window, context): - finished = self.trigger_fn.on_fire(watermark, window, context) - yield self._output(window, finished, state) - - def process_timer(self, window_id, unused_name, time_domain, timestamp, - state): - if self.is_merging: - state = MergeableStateAdapter(state) - window = state.get_window(window_id) - if state.get_state(window, self.TOMBSTONE): - return - if time_domain == TimeDomain.WATERMARK: - if not self.is_merging or window in state.known_windows(): - context = state.at(window) - if self.trigger_fn.should_fire(timestamp, window, context): - finished = self.trigger_fn.on_fire(timestamp, window, context) - yield self._output(window, finished, state) - else: - raise Exception('Unexpected time domain: %s' % time_domain) - - def _output(self, window, finished, state): - """Output window and clean up if appropriate.""" - - values = state.get_state(window, self.ELEMENTS) - if finished: - # TODO(robertwb): allowed lateness - state.clear_state(window, self.ELEMENTS) - state.add_state(window, self.TOMBSTONE, 1) - elif self.accumulation_mode == AccumulationMode.DISCARDING: - state.clear_state(window, self.ELEMENTS) - - timestamp = state.get_state(window, self.WATERMARK_HOLD) - if timestamp is None: - # If no watermark hold was set, output at end of window. - timestamp = window.end - else: - state.clear_state(window, self.WATERMARK_HOLD) - - return WindowedValue(values, timestamp, (window,)) - - -class InMemoryUnmergedState(UnmergedState): - """In-memory implementation of UnmergedState. - - Used for batch and testing. - """ - def __init__(self, defensive_copy=True): - # TODO(robertwb): Skip defensive_copy in production if it's too expensive. 
- self.timers = collections.defaultdict(dict) - self.state = collections.defaultdict(lambda: collections.defaultdict(list)) - self.global_state = {} - self.defensive_copy = defensive_copy - - def set_global_state(self, tag, value): - assert isinstance(tag, ValueStateTag) - if self.defensive_copy: - value = copy.deepcopy(value) - self.global_state[tag.tag] = value - - def get_global_state(self, tag, default=None): - return self.global_state.get(tag.tag, default) - - def set_timer(self, window, name, time_domain, timestamp): - self.timers[window][(name, time_domain)] = timestamp - - def clear_timer(self, window, name, time_domain): - self.timers[window].pop((name, time_domain), None) - - def get_window(self, window_id): - return window_id - - def add_state(self, window, tag, value): - if self.defensive_copy: - value = copy.deepcopy(value) - if isinstance(tag, ValueStateTag): - self.state[window][tag.tag] = value - elif isinstance(tag, CombiningValueStateTag): - self.state[window][tag.tag].append(value) - elif isinstance(tag, ListStateTag): - self.state[window][tag.tag].append(value) - elif isinstance(tag, WatermarkHoldStateTag): - self.state[window][tag.tag].append(value) - else: - raise ValueError('Invalid tag.', tag) - - def get_state(self, window, tag): - values = self.state[window][tag.tag] - if isinstance(tag, ValueStateTag): - return values - elif isinstance(tag, CombiningValueStateTag): - return tag.combine_fn.apply(values) - elif isinstance(tag, ListStateTag): - return values - elif isinstance(tag, WatermarkHoldStateTag): - return tag.output_time_fn_impl.combine_all(values) - else: - raise ValueError('Invalid tag.', tag) - - def clear_state(self, window, tag): - self.state[window].pop(tag.tag, None) - if not self.state[window]: - self.state.pop(window, None) - - def get_and_clear_timers(self, watermark=MAX_TIMESTAMP): - expired = [] - for window, timers in list(self.timers.items()): - for (name, time_domain), timestamp in list(timers.items()): - if timestamp <= watermark: - expired.append((window, (name, time_domain, timestamp))) - del timers[(name, time_domain)] - if not timers: - del self.timers[window] - return expired - - def __repr__(self): - state_str = '\n'.join('%s: %s' % (key, dict(state)) - for key, state in self.state.items()) - return 'timers: %s\nstate: %s' % (dict(self.timers), state_str) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/transforms/trigger_test.py b/sdks/python/google/cloud/dataflow/transforms/trigger_test.py deleted file mode 100644 index 9aca3bb..0000000 --- a/sdks/python/google/cloud/dataflow/transforms/trigger_test.py +++ /dev/null @@ -1,566 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
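# Illustrative note (a sketch added for clarity, not a line of the
# original diff): the tests below exercise triggers by hand-driving a
# GeneralTriggerDriver against in-memory state, roughly:
#
#   driver = GeneralTriggerDriver(
#       Windowing(window_fn, trigger_fn, accumulation_mode))
#   state = InMemoryUnmergedState()
#   # Feed a bundle of WindowedValues; each yielded value is one pane.
#   for wvalue in driver.process_elements(state, bundle, MIN_TIMESTAMP):
#       handle(wvalue)
#   # Then fire any timers that came due, emitting on-time/late panes.
#   while state.timers:
#       for window, (name, domain, ts) in state.get_and_clear_timers():
#           for wvalue in driver.process_timer(window, name, domain, ts,
#                                              state):
#               handle(wvalue)
#
# ('handle' is a placeholder; run_trigger below collects panes per window.)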
- -"""Unit tests for the triggering classes.""" - -import collections -import os.path -import unittest - -import yaml - -import google.cloud.dataflow as df -from google.cloud.dataflow.pipeline import Pipeline -from google.cloud.dataflow.transforms.core import Windowing -from google.cloud.dataflow.transforms.trigger import AccumulationMode -from google.cloud.dataflow.transforms.trigger import AfterAll -from google.cloud.dataflow.transforms.trigger import AfterCount -from google.cloud.dataflow.transforms.trigger import AfterEach -from google.cloud.dataflow.transforms.trigger import AfterFirst -from google.cloud.dataflow.transforms.trigger import AfterWatermark -from google.cloud.dataflow.transforms.trigger import DefaultTrigger -from google.cloud.dataflow.transforms.trigger import GeneralTriggerDriver -from google.cloud.dataflow.transforms.trigger import InMemoryUnmergedState -from google.cloud.dataflow.transforms.trigger import Repeatedly -from google.cloud.dataflow.transforms.util import assert_that, equal_to -from google.cloud.dataflow.transforms.window import FixedWindows -from google.cloud.dataflow.transforms.window import IntervalWindow -from google.cloud.dataflow.transforms.window import MIN_TIMESTAMP -from google.cloud.dataflow.transforms.window import OutputTimeFn -from google.cloud.dataflow.transforms.window import Sessions -from google.cloud.dataflow.transforms.window import TimestampedValue -from google.cloud.dataflow.transforms.window import WindowedValue -from google.cloud.dataflow.transforms.window import WindowFn - - -class CustomTimestampingFixedWindowsWindowFn(FixedWindows): - """WindowFn for testing custom timestamping.""" - - def get_transformed_output_time(self, unused_window, input_timestamp): - return input_timestamp + 100 - - -class TriggerTest(unittest.TestCase): - - def run_trigger_simple(self, window_fn, trigger_fn, accumulation_mode, - timestamped_data, expected_panes, *groupings, - **kwargs): - late_data = kwargs.pop('late_data', []) - assert not kwargs - def bundle_data(data, size): - bundle = [] - for timestamp, elem in data: - windows = window_fn.assign(WindowFn.AssignContext(timestamp, elem)) - bundle.append(WindowedValue(elem, timestamp, windows)) - if len(bundle) == size: - yield bundle - bundle = [] - if bundle: - yield bundle - - if not groupings: - groupings = [1] - for group_by in groupings: - bundles = [] - bundle = [] - for timestamp, elem in timestamped_data: - windows = window_fn.assign(WindowFn.AssignContext(timestamp, elem)) - bundle.append(WindowedValue(elem, timestamp, windows)) - if len(bundle) == group_by: - bundles.append(bundle) - bundle = [] - bundles.append(bundle) - self.run_trigger(window_fn, trigger_fn, accumulation_mode, - bundle_data(timestamped_data, group_by), - bundle_data(late_data, group_by), - expected_panes) - - def run_trigger(self, window_fn, trigger_fn, accumulation_mode, - bundles, late_bundles, - expected_panes): - actual_panes = collections.defaultdict(list) - driver = GeneralTriggerDriver( - Windowing(window_fn, trigger_fn, accumulation_mode)) - state = InMemoryUnmergedState() - - for bundle in bundles: - for wvalue in driver.process_elements(state, bundle, MIN_TIMESTAMP): - window, = wvalue.windows - actual_panes[window].append(set(wvalue.value)) - - while state.timers: - for timer_window, (name, time_domain, timestamp) in ( - state.get_and_clear_timers()): - for wvalue in driver.process_timer( - timer_window, name, time_domain, timestamp, state): - window, = wvalue.windows - 
actual_panes[window].append(set(wvalue.value)) - - for bundle in late_bundles: - for wvalue in driver.process_elements(state, bundle, MIN_TIMESTAMP): - window, = wvalue.windows - actual_panes[window].append(set(wvalue.value)) - - while state.timers: - for timer_window, (name, time_domain, timestamp) in ( - state.get_and_clear_timers()): - for wvalue in driver.process_timer( - timer_window, name, time_domain, timestamp, state): - window, = wvalue.windows - actual_panes[window].append(set(wvalue.value)) - - self.assertEqual(expected_panes, actual_panes) - - def test_fixed_watermark(self): - self.run_trigger_simple( - FixedWindows(10), # pyformat break - AfterWatermark(), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (13, 'c')], - {IntervalWindow(0, 10): [set('ab')], - IntervalWindow(10, 20): [set('c')]}, - 1, - 2, - 3) - - def test_fixed_watermark_with_early(self): - self.run_trigger_simple( - FixedWindows(10), # pyformat break - AfterWatermark(early=AfterCount(2)), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (3, 'c')], - {IntervalWindow(0, 10): [set('ab'), set('abc')]}, - 2) - self.run_trigger_simple( - FixedWindows(10), # pyformat break - AfterWatermark(early=AfterCount(2)), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (3, 'c')], - {IntervalWindow(0, 10): [set('abc'), set('abc')]}, - 3) - - def test_fixed_watermark_with_early_late(self): - self.run_trigger_simple( - FixedWindows(100), # pyformat break - AfterWatermark(early=AfterCount(3), - late=AfterCount(2)), - AccumulationMode.DISCARDING, - zip(range(9), 'abcdefghi'), - {IntervalWindow(0, 100): [ - set('abcd'), set('efgh'), # early - set('i'), # on time - set('vw'), set('xy') # late - ]}, - 2, - late_data=zip(range(5), 'vwxyz')) - - def test_sessions_watermark_with_early_late(self): - self.run_trigger_simple( - Sessions(10), # pyformat break - AfterWatermark(early=AfterCount(2), - late=AfterCount(1)), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (15, 'b'), (7, 'c'), (30, 'd')], - {IntervalWindow(1, 25): [ - set('abc'), # early - set('abc'), # on time - set('abcxy') # late - ], - IntervalWindow(30, 40): [ - set('d'), # on time - ], - IntervalWindow(1, 40): [ - set('abcdxyz') # late - ], - }, - 2, - late_data=[(1, 'x'), (2, 'y'), (21, 'z')]) - - def test_fixed_after_count(self): - self.run_trigger_simple( - FixedWindows(10), # pyformat break - AfterCount(2), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (3, 'c'), (11, 'z')], - {IntervalWindow(0, 10): [set('ab')]}, - 1, - 2) - self.run_trigger_simple( - FixedWindows(10), # pyformat break - AfterCount(2), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (3, 'c'), (11, 'z')], - {IntervalWindow(0, 10): [set('abc')]}, - 3, - 4) - - def test_fixed_after_first(self): - self.run_trigger_simple( - FixedWindows(10), # pyformat break - AfterFirst(AfterCount(2), AfterWatermark()), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (3, 'c')], - {IntervalWindow(0, 10): [set('ab')]}, - 1, - 2) - self.run_trigger_simple( - FixedWindows(10), # pyformat break - AfterFirst(AfterCount(5), AfterWatermark()), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (3, 'c')], - {IntervalWindow(0, 10): [set('abc')]}, - 1, - 2, - late_data=[(1, 'x'), (2, 'y'), (3, 'z')]) - - def test_repeatedly_after_first(self): - self.run_trigger_simple( - FixedWindows(100), # pyformat break - Repeatedly(AfterFirst(AfterCount(3), AfterWatermark())), - AccumulationMode.ACCUMULATING, - zip(range(7), 'abcdefg'), - {IntervalWindow(0, 100): [ - set('abc'), - set('abcdef'), - 
set('abcdefg'), - set('abcdefgx'), - set('abcdefgxy'), - set('abcdefgxyz')]}, - 1, - late_data=zip(range(3), 'xyz')) - - def test_sessions_after_all(self): - self.run_trigger_simple( - Sessions(10), # pyformat break - AfterAll(AfterCount(2), AfterWatermark()), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (3, 'c')], - {IntervalWindow(1, 13): [set('abc')]}, - 1, - 2) - self.run_trigger_simple( - Sessions(10), # pyformat break - AfterAll(AfterCount(5), AfterWatermark()), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (3, 'c')], - {IntervalWindow(1, 13): [set('abcxy')]}, - 1, - 2, - late_data=[(1, 'x'), (2, 'y'), (3, 'z')]) - - def test_sessions_default(self): - self.run_trigger_simple( - Sessions(10), # pyformat break - DefaultTrigger(), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b')], - {IntervalWindow(1, 12): [set('ab')]}, - 1, - 2) - - self.run_trigger_simple( - Sessions(10), # pyformat break - AfterWatermark(), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (15, 'c'), (16, 'd'), (30, 'z'), (9, 'e'), - (10, 'f'), (30, 'y')], - {IntervalWindow(1, 26): [set('abcdef')], - IntervalWindow(30, 40): [set('yz')]}, - 1, - 2, - 3, - 4, - 5, - 6) - - def test_sessions_watermark(self): - self.run_trigger_simple( - Sessions(10), # pyformat break - AfterWatermark(), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b')], - {IntervalWindow(1, 12): [set('ab')]}, - 1, - 2) - - self.run_trigger_simple( - Sessions(10), # pyformat break - AfterWatermark(), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (2, 'b'), (15, 'c'), (16, 'd'), (30, 'z'), (9, 'e'), - (10, 'f'), (30, 'y')], - {IntervalWindow(1, 26): [set('abcdef')], - IntervalWindow(30, 40): [set('yz')]}, - 1, - 2, - 3, - 4, - 5, - 6) - - def test_sessions_after_count(self): - self.run_trigger_simple( - Sessions(10), # pyformat break - AfterCount(2), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (15, 'b'), (6, 'c'), (30, 's'), (31, 't'), (50, 'z'), - (50, 'y')], - {IntervalWindow(1, 25): [set('abc')], - IntervalWindow(30, 41): [set('st')], - IntervalWindow(50, 60): [set('yz')]}, - 1, - 2, - 3) - - def test_sessions_repeatedly_after_count(self): - self.run_trigger_simple( - Sessions(10), # pyformat break - Repeatedly(AfterCount(2)), - AccumulationMode.ACCUMULATING, - [(1, 'a'), (15, 'b'), (6, 'c'), (2, 'd'), (7, 'e')], - {IntervalWindow(1, 25): [set('abc'), set('abcde')]}, - 1, - 3) - self.run_trigger_simple( - Sessions(10), # pyformat break - Repeatedly(AfterCount(2)), - AccumulationMode.DISCARDING, - [(1, 'a'), (15, 'b'), (6, 'c'), (2, 'd'), (7, 'e')], - {IntervalWindow(1, 25): [set('abc'), set('de')]}, - 1, - 3) - - def test_sessions_after_each(self): - self.run_trigger_simple( - Sessions(10), # pyformat break - AfterEach(AfterCount(2), AfterCount(3)), - AccumulationMode.ACCUMULATING, - zip(range(10), 'abcdefghij'), - {IntervalWindow(0, 11): [set('ab')], - IntervalWindow(0, 15): [set('abcdef')]}, - 2) - - self.run_trigger_simple( - Sessions(10), # pyformat break - Repeatedly(AfterEach(AfterCount(2), AfterCount(3))), - AccumulationMode.ACCUMULATING, - zip(range(10), 'abcdefghij'), - {IntervalWindow(0, 11): [set('ab')], - IntervalWindow(0, 15): [set('abcdef')], - IntervalWindow(0, 17): [set('abcdefgh')]}, - 2) - - -class TriggerPipelineTest(unittest.TestCase): - - def test_after_count(self): - p = Pipeline('DirectPipelineRunner') - result = (p - | df.Create([1, 2, 3, 4, 5, 10, 11]) - | df.FlatMap(lambda t: [('A', t), ('B', t + 5)]) - | df.Map(lambda (k, t): TimestampedValue((k, t), t)) - | 
df.WindowInto(FixedWindows(10), trigger=AfterCount(3), - accumulation_mode=AccumulationMode.DISCARDING) - | df.GroupByKey() - | df.Map(lambda (k, v): ('%s-%s' % (k, len(v)), set(v)))) - assert_that(result, equal_to( - { - 'A-5': {1, 2, 3, 4, 5}, - # A-10, A-11 never emitted due to AfterCount(3) never firing. - 'B-4': {6, 7, 8, 9}, - 'B-3': {10, 15, 16}, - }.iteritems())) - - -class TranscriptTest(unittest.TestCase): - - # We must prepend an underscore to this name so that the open-source unittest - # runner does not execute this method directly as a test. - @classmethod - def _create_test(cls, spec): - counter = 0 - name = spec.get('name', 'unnamed') - unique_name = 'test_' + name - while hasattr(cls, unique_name): - counter += 1 - unique_name = 'test_%s_%d' % (name, counter) - setattr(cls, unique_name, lambda self: self._run_log_test(spec)) - - # We must prepend an underscore to this name so that the open-source unittest - # runner does not execute this method directly as a test. - @classmethod - def _create_tests(cls, transcript_filename): - for spec in yaml.load_all(open(transcript_filename)): - cls._create_test(spec) - - def _run_log_test(self, spec): - if 'error' in spec: - self.assertRaisesRegexp( - AssertionError, spec['error'], self._run_log, spec) - else: - self._run_log(spec) - - def _run_log(self, spec): - - def parse_int_list(s): - """Parses strings like '[1, 2, 3]'.""" - s = s.strip() - assert s[0] == '[' and s[-1] == ']', s - if not s[1:-1].strip(): - return [] - else: - return [int(x) for x in s[1:-1].split(',')] - - def split_args(s): - """Splits 'a, b, [c, d]' into ['a', 'b', '[c, d]'].""" - args = [] - start = 0 - depth = 0 - for ix in xrange(len(s)): - c = s[ix] - if c in '({[': - depth += 1 - elif c in ')}]': - depth -= 1 - elif c == ',' and depth == 0: - args.append(s[start:ix].strip()) - start = ix + 1 - assert depth == 0, s - args.append(s[start:].strip()) - return args - - def parse(s, names): - """Parse (recursive) 'Foo(arg, kw=arg)' for Foo in the names dict.""" - s = s.strip() - if s in names: - return names[s] - elif s[0] == '[': - return parse_int_list(s) - elif '(' in s: - assert s[-1] == ')', s - callee = parse(s[:s.index('(')], names) - posargs = [] - kwargs = {} - for arg in split_args(s[s.index('(') + 1:-1]): - if '=' in arg: - kw, value = arg.split('=', 1) - kwargs[kw] = parse(value, names) - else: - posargs.append(parse(arg, names)) - return callee(*posargs, **kwargs) - else: - try: - return int(s) - except ValueError: - raise ValueError('Unknown function: %s' % s) - - def parse_fn(s, names): - """Like parse(), but implicitly calls no-arg constructors.""" - fn = parse(s, names) - if isinstance(fn, type): - return fn() - else: - return fn - - # pylint: disable=g-import-not-at-top - from google.cloud.dataflow.transforms import window as window_module - from google.cloud.dataflow.transforms import trigger as trigger_module - # pylint: enable=g-import-not-at-top - window_fn_names = dict(window_module.__dict__) - window_fn_names.update({'CustomTimestampingFixedWindowsWindowFn': - CustomTimestampingFixedWindowsWindowFn}) - trigger_names = {'Default': DefaultTrigger} - trigger_names.update(trigger_module.__dict__) - - window_fn = parse_fn(spec.get('window_fn', 'GlobalWindows'), - window_fn_names) - trigger_fn = parse_fn(spec.get('trigger_fn', 'Default'), trigger_names) - accumulation_mode = getattr( - AccumulationMode, spec.get('accumulation_mode', 'ACCUMULATING').upper()) - output_time_fn = getattr( - OutputTimeFn, spec.get('output_time_fn', 
'OUTPUT_AT_EOW').upper()) - allowed_lateness = float(spec.get('allowed_lateness', '-inf')) - - driver = GeneralTriggerDriver( - Windowing(window_fn, trigger_fn, accumulation_mode, output_time_fn)) - state = InMemoryUnmergedState() - output = [] - watermark = MIN_TIMESTAMP - - def fire_timers(): - to_fire = state.get_and_clear_timers(watermark) - while to_fire: - for timer_window, (name, time_domain, t_timestamp) in to_fire: - for wvalue in driver.process_timer( - timer_window, name, time_domain, t_timestamp, state): - window, = wvalue.windows - output.append({'window': [window.start, window.end - 1], - 'values': sorted(wvalue.value), - 'timestamp': wvalue.timestamp}) - to_fire = state.get_and_clear_timers(watermark) - - for line in spec['transcript']: - - action, params = line.items()[0] - - if action != 'expect': - # Fail if we have output that was not expected in the transcript. - self.assertEquals( - [], output, msg='Unexpected output: %s before %s' % (output, line)) - - if action == 'input': - bundle = [ - WindowedValue(t, t, window_fn.assign(WindowFn.AssignContext(t, t))) - for t in params] - output = [{'window': [wvalue.windows[0].start, - wvalue.windows[0].end - 1], - 'values': sorted(wvalue.value), - 'timestamp': wvalue.timestamp} - for wvalue - in driver.process_elements(state, bundle, watermark)] - fire_timers() - - elif action == 'watermark': - watermark = params - fire_timers() - - elif action == 'expect': - for expected_output in params: - for candidate in output: - if all(candidate[k] == expected_output[k] - for k in candidate if k in expected_output): - output.remove(candidate) - break - else: - self.fail('Unmatched output %s in %s' % (expected_output, output)) - - elif action == 'state': - # TODO(robertwb): Implement once we support allowed lateness. - pass - - else: - self.fail('Unknown action: ' + action) - - # Fail if we have output that was not expected in the transcript. - self.assertEquals([], output, msg='Unexpected output: %s' % output) - - -TRANSCRIPT_TEST_FILE = os.path.join(os.path.dirname(__file__), - 'trigger_transcripts.yaml') -if os.path.exists(TRANSCRIPT_TEST_FILE): - TranscriptTest._create_tests(TRANSCRIPT_TEST_FILE) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/trigger_transcripts.yaml ---------------------------------------------------------------------- diff --git a/sdks/python/google/cloud/dataflow/transforms/trigger_transcripts.yaml b/sdks/python/google/cloud/dataflow/transforms/trigger_transcripts.yaml deleted file mode 100644 index 91d88bf..0000000 --- a/sdks/python/google/cloud/dataflow/transforms/trigger_transcripts.yaml +++ /dev/null @@ -1,207 +0,0 @@ -name: fixed_default -window_fn: FixedWindows(10) # Python names/syntax, unless otherwise noted. -trigger_fn: Default # Same. Empty () may be omitted. -transcript: # Ordered list of events. - - input: [1, 2, 3, 10, 11] # The elements are the timestamps. - - watermark: 25 - - expect: # Every expected output from the last action. - - {window: [0, 9], values: [1, 2, 3]} - - {window: [10, 19], values: [10, 11]} # Partial match on attributes OK. 
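-# Each document in this file is loaded by TranscriptTest._create_tests and
-# turned into one generated test method. Purely as an illustration (this
-# sketch is not one of the original corpus documents), a discarding-mode
-# variant of the document above would read:
-#
-#   name: fixed_discarding
-#   window_fn: FixedWindows(10)
-#   trigger_fn: Default
-#   accumulation_mode: discarding
-#   transcript:
-#       - input: [1, 2, 3, 10, 11]
-#       - watermark: 25
-#       - expect:
-#           - {window: [0, 9], values: [1, 2, 3]}
-#           - {window: [10, 19], values: [10, 11]}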
- ---- -name: fixed_default_late_data -window_fn: FixedWindows(10) -trigger_fn: Default -output_time_fn: OUTPUT_AT_EOW -transcript: - - input: [1, 2, 3, 10, 11, 25] - - watermark: 100 - - expect: - - {window: [0, 9], values: [1, 2, 3], timestamp: 10, final: false} - - {window: [10, 19], values: [10, 11], timestamp: 20} - - {window: [20, 29], values: [25], timestamp: 30, late: false} - - input: [7] - - expect: - - {window: [0, 9], values: [1, 2, 3, 7], timestamp: 10, late: true} - ---- -name: output_time_fn_earliest -window_fn: FixedWindows(10) -trigger_fn: Default -output_time_fn: OUTPUT_AT_EARLIEST -transcript: - - input: [1, 2, 3, 10, 11, 25] - - watermark: 100 - - expect: - - {window: [0, 9], values: [1, 2, 3], timestamp: 1, final: false} - - {window: [10, 19], values: [10, 11], timestamp: 10} - - {window: [20, 29], values: [25], timestamp: 25, late: false} - ---- -name: output_time_fn_latest -window_fn: FixedWindows(10) -trigger_fn: Default -output_time_fn: OUTPUT_AT_LATEST -transcript: - - input: [1, 2, 3, 10, 11, 25] - - watermark: 100 - - expect: - - {window: [0, 9], values: [1, 2, 3], timestamp: 3, final: false} - - {window: [10, 19], values: [10, 11], timestamp: 11} - - {window: [20, 29], values: [25], timestamp: 25, late: false} - ---- -# Test that custom timestamping is not invoked. -name: output_time_fn_custom_timestamping_eow -window_fn: CustomTimestampingFixedWindowsWindowFn(10) -trigger_fn: Default -output_time_fn: OUTPUT_AT_EOW -transcript: - - input: [1, 2, 3, 10, 11, 25] - - watermark: 100 - - expect: - - {window: [0, 9], values: [1, 2, 3], timestamp: 10, final: false} - - {window: [10, 19], values: [10, 11], timestamp: 20} - - {window: [20, 29], values: [25], timestamp: 30, late: false} - ---- -# Test that custom timestamping is not invoked. -name: output_time_fn_custom_timestamping_earliest -window_fn: CustomTimestampingFixedWindowsWindowFn(10) -trigger_fn: Default -output_time_fn: OUTPUT_AT_EARLIEST -transcript: - - input: [1, 2, 3, 10, 11, 25] - - watermark: 100 - - expect: - - {window: [0, 9], values: [1, 2, 3], timestamp: 1, final: false} - - {window: [10, 19], values: [10, 11], timestamp: 10} - - {window: [20, 29], values: [25], timestamp: 25, late: false} - ---- -# Test that custom timestamping is in fact invoked. 
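-# (OUTPUT_AT_EARLIEST_TRANSFORMED routes output timestamps through the window
-# fn's get_transformed_output_time; CustomTimestampingFixedWindowsWindowFn
-# defines that as input_timestamp + 100, so each expected timestamp below is
-# shifted by 100.)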
-name: output_time_fn_custom_timestamping_earliest_transformed
-window_fn: CustomTimestampingFixedWindowsWindowFn(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST_TRANSFORMED
-transcript:
-    - input: [1, 2, 3, 10, 11, 25]
-    - watermark: 100
-    - expect:
-        - {window: [0, 9], values: [1, 2, 3], timestamp: 101, final: false}
-        - {window: [10, 19], values: [10, 11], timestamp: 110}
-        - {window: [20, 29], values: [25], timestamp: 125, late: false}
-
----
-name: early_late_sessions
-window_fn: Sessions(10)
-trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3))
-output_time_fn: OUTPUT_AT_EOW
-transcript:
-    - input: [1, 2, 3]
-    - expect:
-        - {window: [1, 12], values: [1, 2, 3], timestamp: 13, early: true}
-    - input: [4]   # no output
-    - input: [5]
-    - expect:
-        - {window: [1, 14], values: [1, 2, 3, 4, 5], timestamp: 15, early: true}
-    - input: [6]
-    - watermark: 100
-    - expect:
-        - {window: [1, 15], values: [1, 2, 3, 4, 5, 6], timestamp: 16,
-           final: true}
-    - input: [1]
-    - input: [3, 4]
-    - expect:
-        - {window: [1, 15], values: [1, 1, 2, 3, 3, 4, 4, 5, 6], timestamp: 16}
-
----
-name: garbage_collection
-window_fn: FixedWindows(10)
-trigger_fn: AfterCount(2)
-output_time_fn: OUTPUT_AT_EOW
-allowed_lateness: 10
-accumulation_mode: discarding
-transcript:
-    - input: [1, 2, 3, 10, 11, 25]
-    - expect:
-        - {window: [0, 9], timestamp: 10}
-        - {window: [10, 19], timestamp: 20}
-    - state:
-        present: [[20, 29]]
-        absent: [[0, 9]]
-        tombstone: [[10, 19]]
-
----
-name: known_late_data_watermark
-window_fn: FixedWindows(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
-transcript:
-    - watermark: 5
-    - input: [2, 3, 7, 8]
-    - watermark: 11
-    - expect:
-        - {window: [0, 9], values: [2, 3, 7, 8], timestamp: 7}
-
----
-name: known_late_data_no_watermark_hold_possible
-window_fn: FixedWindows(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
-transcript:
-    - watermark: 8
-    - input: [2, 3, 7]
-    - watermark: 11
-    - expect:
-        - {window: [0, 9], values: [2, 3, 7], timestamp: 10}
-
-# These next examples test that bad/incomplete transcripts are rejected.
----
-name: bad_output
-error: Unmatched output
-window_fn: FixedWindows(10)
-transcript:
-    - input: [1, 2, 3]
-    - expect:
-        - {window: [0, 9], values: [1, 2, 3]}  # bad
-    - watermark: 100
-
----
-name: bad_expected_values
-error: Unmatched output
-window_fn: FixedWindows(10)
-transcript:
-    - input: [1, 2, 3]
-    - watermark: 100
-    - expect:
-        - {window: [0, 9], values: [1, 2]}  # bad values
-
----
-name: bad_expected_window
-error: Unmatched output
-window_fn: FixedWindows(10)
-transcript:
-    - input: [1, 2, 3]
-    - watermark: 100
-    - expect:
-        - {window: [0, 19], values: [1, 2, 3]}  # bad window
-
----
-name: missing_output
-error: Unexpected output
-window_fn: FixedWindows(10)
-transcript:
-    - input: [1, 2, 3]
-    - watermark: 100
-    # missing output
-    - watermark: 200
-
----
-name: missing_output_at_end
-error: Unexpected output
-window_fn: FixedWindows(10)
-transcript:
-    - input: [1, 2, 3]
-    - watermark: 100
-    # missing output
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/util.py b/sdks/python/google/cloud/dataflow/transforms/util.py
deleted file mode 100644
index 2c41dc3..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/util.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Simple utility PTransforms. -""" - -from __future__ import absolute_import - -import collections -import operator - -from google.cloud.dataflow.pvalue import AsIter as AllOf -from google.cloud.dataflow.transforms.core import CombinePerKey, Create, Flatten, GroupByKey, Map -from google.cloud.dataflow.transforms.ptransform import PTransform -from google.cloud.dataflow.transforms.ptransform import ptransform_fn - - -__all__ = [ - 'CoGroupByKey', - 'Keys', - 'KvSwap', - 'RemoveDuplicates', - 'Values', - 'assert_that', - 'equal_to', - 'is_empty', - ] - - -class CoGroupByKey(PTransform): - """Groups results across several PCollections by key. - - Given an input dict mapping serializable keys (called "tags") to 0 or more - PCollections of (key, value) tuples, e.g.:: - - {'pc1': pcoll1, 'pc2': pcoll2, 33333: pcoll3} - - creates a single output PCollection of (key, value) tuples whose keys are the - unique input keys from all inputs, and whose values are dicts mapping each - tag to an iterable of whatever values were under the key in the corresponding - PCollection:: - - ('some key', {'pc1': ['value 1 under "some key" in pcoll1', - 'value 2 under "some key" in pcoll1'], - 'pc2': [], - 33333: ['only value under "some key" in pcoll3']}) - - Note that pcoll2 had no values associated with "some key". - - CoGroupByKey also works for tuples, lists, or other flat iterables of - PCollections, in which case the values of the resulting PCollections - will be tuples whose nth value is the list of values from the nth - PCollection---conceptually, the "tags" are the indices into the input. - Thus, for this input:: - - (pcoll1, pcoll2, pcoll3) - - the output PCollection's value for "some key" is:: - - ('some key', (['value 1 under "some key" in pcoll1', - 'value 2 under "some key" in pcoll1'], - [], - ['only value under "some key" in pcoll3'])) - - Args: - label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. - **kwargs: Accepts a single named argument "pipeline", which specifies the - pipeline that "owns" this PTransform. Ordinarily CoGroupByKey can obtain - this information from one of the input PCollections, but if there are none - (or if there's a chance there may be none), this argument is the only way - to provide pipeline information, and should be considered mandatory. - """ - - def __init__(self, label=None, **kwargs): - super(CoGroupByKey, self).__init__(label) - self.pipeline = kwargs.pop('pipeline', None) - if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) - - def _extract_input_pvalues(self, pvalueish): - try: - # If this works, it's a dict. - return pvalueish, tuple(pvalueish.viewvalues()) - except AttributeError: - pcolls = tuple(pvalueish) - return pcolls, pcolls - - def apply(self, pcolls): - """Performs CoGroupByKey on argument pcolls; see class docstring.""" - # For associating values in K-V pairs with the PCollections they came from. 
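-    # For example, with tag 'pc1' the pair ('some key', 1) becomes
-    # ('some key', ('pc1', 1)).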
- def _pair_tag_with_value((key, value), tag): - return (key, (tag, value)) - - # Creates the key, value pairs for the output PCollection. Values are either - # lists or dicts (per the class docstring), initialized by the result of - # result_ctor(result_ctor_arg). - def _merge_tagged_vals_under_key((key, grouped), result_ctor, - result_ctor_arg): - result_value = result_ctor(result_ctor_arg) - for tag, value in grouped: - result_value[tag].append(value) - return (key, result_value) - - try: - # If pcolls is a dict, we turn it into (tag, pcoll) pairs for use in the - # general-purpose code below. The result value constructor creates dicts - # whose keys are the tags. - result_ctor_arg = pcolls.keys() - result_ctor = lambda tags: dict((tag, []) for tag in tags) - pcolls = pcolls.items() - except AttributeError: - # Otherwise, pcolls is a list/tuple, so we turn it into (index, pcoll) - # pairs. The result value constructor makes tuples with len(pcolls) slots. - pcolls = list(enumerate(pcolls)) - result_ctor_arg = len(pcolls) - result_ctor = lambda size: tuple([] for _ in xrange(size)) - - # Check input PCollections for PCollection-ness, and that they all belong - # to the same pipeline. - for _, pcoll in pcolls: - self._check_pcollection(pcoll) - if self.pipeline: - assert pcoll.pipeline == self.pipeline - - return ([pcoll | Map('pair_with_%s' % tag, _pair_tag_with_value, tag) - for tag, pcoll in pcolls] - | Flatten(pipeline=self.pipeline) - | GroupByKey() - | Map(_merge_tagged_vals_under_key, result_ctor, result_ctor_arg)) - - -def Keys(label='Keys'): # pylint: disable=invalid-name - """Produces a PCollection of first elements of 2-tuples in a PCollection.""" - return Map(label, lambda (k, v): k) - - -def Values(label='Values'): # pylint: disable=invalid-name - """Produces a PCollection of second elements of 2-tuples in a PCollection.""" - return Map(label, lambda (k, v): v) - - -def KvSwap(label='KvSwap'): # pylint: disable=invalid-name - """Produces a PCollection reversing 2-tuples in a PCollection.""" - return Map(label, lambda (k, v): (v, k)) - - -@ptransform_fn -def RemoveDuplicates(label, pcoll): # pylint: disable=invalid-name - """Produces a PCollection containing the unique elements of a PCollection.""" - return (pcoll - | Map('%s:ToPairs' % label, lambda v: (v, None)) - | CombinePerKey('%s:Group' % label, lambda vs: None) - | Keys('%s:RemoveDuplicates' % label)) - - -class DataflowAssertException(Exception): - """Exception raised by matcher classes used by assert_that transform.""" - - pass - - -# Note that equal_to always sorts the expected and actual since what we -# compare are PCollections for which there is no guaranteed order. -# However the sorting does not go beyond top level therefore [1,2] and [2,1] -# are considered equal and [[1,2]] and [[2,1]] are not. -# TODO(silviuc): Add contains_in_any_order-style matchers. -def equal_to(expected): - expected = list(expected) - def _equal(actual): - sorted_expected = sorted(expected) - sorted_actual = sorted(actual) - if sorted_expected != sorted_actual: - raise DataflowAssertException( - 'Failed assert: %r == %r' % (sorted_expected, sorted_actual)) - return _equal - - -def is_empty(): - def _empty(actual): - if actual: - raise DataflowAssertException( - 'Failed assert: [] == %r' % actual) - return _empty - - -def assert_that(actual, matcher, label='assert_that'): - """A PTransform that checks a PCollection has an expected value. 
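-
-  For example: assert_that(pcoll, equal_to([1, 2, 3])).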
- - Note that assert_that should be used only for testing pipelines since the - check relies on materializing the entire PCollection being checked. - - Args: - actual: A PCollection. - matcher: A matcher function taking as argument the actual value of a - materialized PCollection. The matcher validates this actual value against - expectations and raises DataflowAssertException if they are not met. - label: Optional string label. This is needed in case several assert_that - transforms are introduced in the same pipeline. - - Returns: - Ignored. - """ - - def match(_, actual): - matcher(actual) - - class AssertThat(PTransform): - - def apply(self, pipeline): - return pipeline | Create('singleton', [None]) | Map(match, AllOf(actual)) - - def default_label(self): - return label - - actual.pipeline | AssertThat()
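
For reference, the two helpers above are typically combined at the end of a
test pipeline. The following is a minimal sketch against the API visible in
this diff (runner and transform names as used in trigger_test.py above; it is
illustrative only, not part of the deleted module):

import google.cloud.dataflow as df
from google.cloud.dataflow.pipeline import Pipeline
from google.cloud.dataflow.transforms.util import assert_that, equal_to

p = Pipeline('DirectPipelineRunner')
pcoll = p | df.Create([1, 2, 3]) | df.Map(lambda x: x * 2)
# assert_that registers the check; the matcher runs when the pipeline
# executes and raises DataflowAssertException on a mismatch.
assert_that(pcoll, equal_to([2, 4, 6]))
p.run()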