beam-commits mailing list archives

From da...@apache.org
Subject [05/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder
Date Tue, 14 Jun 2016 23:12:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/timeutil.py b/sdks/python/google/cloud/dataflow/transforms/timeutil.py
deleted file mode 100644
index 7b750f9..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/timeutil.py
+++ /dev/null
@@ -1,310 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Time and timer utilities."""
-
-from __future__ import absolute_import
-
-from abc import ABCMeta
-from abc import abstractmethod
-
-import datetime
-import sys
-
-
-class Timestamp(object):
-  """Represents a Unix second timestamp with microsecond granularity.
-
-  Can be treated in common timestamp arithmetic operations as a numeric type.
-
-  Internally stores the timestamp as an int of microseconds. This strategy
-  is necessary since floating point values lose precision when storing values,
-  especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
-  to 0.0999999994448885).
-  """
-
-  def __init__(self, seconds=0, micros=0):
-    self.micros = int(seconds * 1000000) + int(micros)
-
-  @staticmethod
-  def of(seconds):
-    """Return the Timestamp for the given number of seconds.
-
-    If the input is already a Timestamp, the input itself will be returned.
-
-    Args:
-      seconds: Number of seconds as int, float or Timestamp.
-
-    Returns:
-      Corresponding Timestamp object.
-    """
-
-    if isinstance(seconds, Duration):
-      raise TypeError('Can\'t interpret %s as Timestamp.' % seconds)
-    if isinstance(seconds, Timestamp):
-      return seconds
-    return Timestamp(seconds)
-
-  def __repr__(self):
-    micros = self.micros
-    sign = ''
-    if micros < 0:
-      sign = '-'
-      micros = -micros
-    int_part = micros / 1000000
-    frac_part = micros % 1000000
-    if frac_part:
-      return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part)
-    else:
-      return 'Timestamp(%s%d)' % (sign, int_part)
-
-  def to_utc_datetime(self):
-    epoch = datetime.datetime.utcfromtimestamp(0)
-    # We can't easily construct a datetime object from microseconds, so we
-    # create one at the epoch and add an appropriate timedelta interval.
-    return epoch + datetime.timedelta(microseconds=self.micros)
-
-  def isoformat(self):
-    # Append 'Z' for UTC timezone.
-    return self.to_utc_datetime().isoformat() + 'Z'
-
-  def __float__(self):
-    # Note that the returned value may have lost precision.
-    return float(self.micros) / 1000000
-
-  def __int__(self):
-    # Note that the returned value may have lost precision.
-    return self.micros / 1000000
-
-  def __cmp__(self, other):
-    # Allow comparisons between Duration and Timestamp values.
-    if not isinstance(other, Duration):
-      other = Timestamp.of(other)
-    return cmp(self.micros, other.micros)
-
-  def __hash__(self):
-    return hash(self.micros)
-
-  def __add__(self, other):
-    other = Duration.of(other)
-    return Timestamp(micros=self.micros + other.micros)
-
-  def __radd__(self, other):
-    return self + other
-
-  def __sub__(self, other):
-    other = Duration.of(other)
-    return Timestamp(micros=self.micros - other.micros)
-
-  def __mod__(self, other):
-    other = Duration.of(other)
-    return Duration(micros=self.micros % other.micros)
-
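-# Example (informal; mirrors test_precision and test_arithmetic in
-# timeutil_test.py below). Integer-microsecond storage keeps arithmetic
-# exact where raw floats would not be:
-#
-#   >>> Timestamp(10000000) % Duration(0.1)
-#   Duration(0)
-#   >>> Timestamp(123) + Duration(456)
-#   Timestamp(579)
-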
-
-MIN_TIMESTAMP = Timestamp(micros=-sys.maxint - 1)
-MAX_TIMESTAMP = Timestamp(micros=sys.maxint)
-
-
-class Duration(object):
-  """Represents a second duration with microsecond granularity.
-
-  Can be treated in common arithmetic operations as a numeric type.
-
-  Internally stores a time interval as an int of microseconds. This strategy
-  is necessary since floating point values lose precision when storing values,
-  especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
-  to 0.0999999994448885).
-  """
-
-  def __init__(self, seconds=0, micros=0):
-    self.micros = int(seconds * 1000000) + int(micros)
-
-  @staticmethod
-  def of(seconds):
-    """Return the Duration for the given number of seconds since Unix epoch.
-
-    If the input is already a Duration, the input itself will be returned.
-
-    Args:
-      seconds: Number of seconds as int, float or Duration.
-
-    Returns:
-      Corresponding Duration object.
-    """
-
-    if isinstance(seconds, Timestamp):
-      raise TypeError('Can\'t interpret %s as Duration.' % seconds)
-    if isinstance(seconds, Duration):
-      return seconds
-    return Duration(seconds)
-
-  def __repr__(self):
-    micros = self.micros
-    sign = ''
-    if micros < 0:
-      sign = '-'
-      micros = -micros
-    int_part = micros / 1000000
-    frac_part = micros % 1000000
-    if frac_part:
-      return 'Duration(%s%d.%06d)' % (sign, int_part, frac_part)
-    else:
-      return 'Duration(%s%d)' % (sign, int_part)
-
-  def __float__(self):
-    # Note that the returned value may have lost precision.
-    return float(self.micros) / 1000000
-
-  def __int__(self):
-    # Note that the returned value may have lost precision.
-    return self.micros / 1000000
-
-  def __cmp__(self, other):
-    # Allow comparisons between Duration and Timestamp values.
-    if not isinstance(other, Timestamp):
-      other = Duration.of(other)
-    return cmp(self.micros, other.micros)
-
-  def __hash__(self):
-    return hash(self.micros)
-
-  def __neg__(self):
-    return Duration(micros=-self.micros)
-
-  def __add__(self, other):
-    if isinstance(other, Timestamp):
-      return other + self
-    other = Duration.of(other)
-    return Duration(micros=self.micros + other.micros)
-
-  def __radd__(self, other):
-    return self + other
-
-  def __sub__(self, other):
-    other = Duration.of(other)
-    return Duration(micros=self.micros - other.micros)
-
-  def __rsub__(self, other):
-    return -(self - other)
-
-  def __mul__(self, other):
-    other = Duration.of(other)
-    return Duration(micros=self.micros * other.micros / 1000000)
-
-  def __rmul__(self, other):
-    return self * other
-
-  def __mod__(self, other):
-    other = Duration.of(other)
-    return Duration(micros=self.micros % other.micros)
-
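-# Example (informal; mirrors test_arithmetic in timeutil_test.py below):
-#
-#   >>> -Duration(123)
-#   Duration(-123)
-#   >>> Duration(123) * 456
-#   Duration(56088)
-#   >>> Duration(456) + Timestamp(123)  # Duration + Timestamp -> Timestamp
-#   Timestamp(579)
-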
-
-class TimeDomain(object):
-  """Time domain for streaming timers."""
-
-  WATERMARK = 'WATERMARK'
-  REAL_TIME = 'REAL_TIME'
-  DEPENDENT_REAL_TIME = 'DEPENDENT_REAL_TIME'
-
-  @staticmethod
-  def from_string(domain):
-    if domain in (TimeDomain.WATERMARK,
-                  TimeDomain.REAL_TIME,
-                  TimeDomain.DEPENDENT_REAL_TIME):
-      return domain
-    raise ValueError('Unknown time domain: %s' % domain)
-
-
-class OutputTimeFnImpl(object):
-  """Implementation of OutputTimeFn."""
-
-  __metaclass__ = ABCMeta
-
-  @abstractmethod
-  def assign_output_time(self, window, input_timestamp):
-    pass
-
-  @abstractmethod
-  def combine(self, output_timestamp, other_output_timestamp):
-    pass
-
-  def combine_all(self, merging_timestamps):
-    """Apply combine to list of timestamps."""
-    combined_output_time = None
-    for output_time in merging_timestamps:
-      if combined_output_time is None:
-        combined_output_time = output_time
-      else:
-        combined_output_time = self.combine(
-            combined_output_time, output_time)
-    return combined_output_time
-
-  def merge(self, unused_result_window, merging_timestamps):
-    """Default to returning the result of combine_all."""
-    return self.combine_all(merging_timestamps)
-
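-# Example (informal): combine_all folds combine() over the candidate output
-# times; with the "earliest" policy defined below, this is simply min():
-#
-#   >>> OutputAtEarliestInputTimestampImpl().combine_all(
-#   ...     [Timestamp(30), Timestamp(10), Timestamp(20)])
-#   Timestamp(10)
-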
-
-class DependsOnlyOnWindow(OutputTimeFnImpl):
-  """OutputTimeFnImpl that only depends on the window."""
-
-  __metaclass__ = ABCMeta
-
-  def combine(self, output_timestamp, other_output_timestamp):
-    return output_timestamp
-
-  def merge(self, result_window, unused_merging_timestamps):
-    # Since we know that the result only depends on the window, we can ignore
-    # the given timestamps.
-    return self.assign_output_time(result_window, None)
-
-
-class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl):
-  """OutputTimeFnImpl outputting at earliest input timestamp."""
-
-  def assign_output_time(self, window, input_timestamp):
-    return input_timestamp
-
-  def combine(self, output_timestamp, other_output_timestamp):
-    """Default to returning the earlier of two timestamps."""
-    return min(output_timestamp, other_output_timestamp)
-
-
-class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl):
-  """OutputTimeFnImpl outputting at earliest input timestamp."""
-
-  def __init__(self, window_fn):
-    self.window_fn = window_fn
-
-  def assign_output_time(self, window, input_timestamp):
-    return self.window_fn.get_transformed_output_time(window, input_timestamp)
-
-  def combine(self, output_timestamp, other_output_timestamp):
-    return min(output_timestamp, other_output_timestamp)
-
-
-class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl):
-  """OutputTimeFnImpl outputting at latest input timestamp."""
-
-  def assign_output_time(self, window, input_timestamp):
-    return input_timestamp
-
-  def combine(self, output_timestamp, other_output_timestamp):
-    return max(output_timestamp, other_output_timestamp)
-
-
-class OutputAtEndOfWindowImpl(DependsOnlyOnWindow):
-  """OutputTimeFnImpl outputting at end of window."""
-
-  def assign_output_time(self, window, unused_input_timestamp):
-    return window.end

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/timeutil_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/timeutil_test.py b/sdks/python/google/cloud/dataflow/transforms/timeutil_test.py
deleted file mode 100644
index 26ff3ae..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/timeutil_test.py
+++ /dev/null
@@ -1,165 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for time utilities."""
-
-from __future__ import absolute_import
-
-import unittest
-
-from google.cloud.dataflow.transforms.timeutil import Duration
-from google.cloud.dataflow.transforms.timeutil import Timestamp
-
-
-class TimestampTest(unittest.TestCase):
-
-  def test_of(self):
-    interval = Timestamp(123)
-    self.assertEqual(id(interval), id(Timestamp.of(interval)))
-    self.assertEqual(interval, Timestamp.of(123.0))
-    with self.assertRaises(TypeError):
-      Timestamp.of(Duration(10))
-
-  def test_precision(self):
-    self.assertEqual(Timestamp(10000000) % 0.1, 0)
-    self.assertEqual(Timestamp(10000000) % 0.05, 0)
-    self.assertEqual(Timestamp(10000000) % 0.000005, 0)
-    self.assertEqual(Timestamp(10000000) % Duration(0.1), 0)
-    self.assertEqual(Timestamp(10000000) % Duration(0.05), 0)
-    self.assertEqual(Timestamp(10000000) % Duration(0.000005), 0)
-
-  def test_utc_timestamp(self):
-    self.assertEqual(Timestamp(10000000).isoformat(),
-                     '1970-04-26T17:46:40Z')
-    self.assertEqual(Timestamp(10000000.000001).isoformat(),
-                     '1970-04-26T17:46:40.000001Z')
-    self.assertEqual(Timestamp(1458343379.123456).isoformat(),
-                     '2016-03-18T23:22:59.123456Z')
-
-  def test_arithmetic(self):
-    # Supported operations.
-    self.assertEqual(Timestamp(123) + 456, 579)
-    self.assertEqual(Timestamp(123) + Duration(456), 579)
-    self.assertEqual(456 + Timestamp(123), 579)
-    self.assertEqual(Duration(456) + Timestamp(123), 579)
-    self.assertEqual(Timestamp(123) - 456, -333)
-    self.assertEqual(Timestamp(123) - Duration(456), -333)
-    self.assertEqual(Timestamp(1230) % 456, 318)
-    self.assertEqual(Timestamp(1230) % Duration(456), 318)
-
-    # Check that direct comparison of Timestamp and Duration is allowed.
-    self.assertTrue(Duration(123) == Timestamp(123))
-    self.assertTrue(Timestamp(123) == Duration(123))
-    self.assertFalse(Duration(123) == Timestamp(1230))
-    self.assertFalse(Timestamp(123) == Duration(1230))
-
-    # Check return types.
-    self.assertEqual((Timestamp(123) + 456).__class__, Timestamp)
-    self.assertEqual((Timestamp(123) + Duration(456)).__class__, Timestamp)
-    self.assertEqual((456 + Timestamp(123)).__class__, Timestamp)
-    self.assertEqual((Duration(456) + Timestamp(123)).__class__, Timestamp)
-    self.assertEqual((Timestamp(123) - 456).__class__, Timestamp)
-    self.assertEqual((Timestamp(123) - Duration(456)).__class__, Timestamp)
-    self.assertEqual((Timestamp(1230) % 456).__class__, Duration)
-    self.assertEqual((Timestamp(1230) % Duration(456)).__class__, Duration)
-
-    # Unsupported operations.
-    with self.assertRaises(TypeError):
-      self.assertEqual(Timestamp(123) * 456, 56088)
-    with self.assertRaises(TypeError):
-      self.assertEqual(Timestamp(123) * Duration(456), 56088)
-    with self.assertRaises(TypeError):
-      self.assertEqual(456 * Timestamp(123), 56088)
-    with self.assertRaises(TypeError):
-      self.assertEqual(Duration(456) * Timestamp(123), 56088)
-    with self.assertRaises(TypeError):
-      self.assertEqual(456 - Timestamp(123), 333)
-    with self.assertRaises(TypeError):
-      self.assertEqual(Duration(456) - Timestamp(123), 333)
-    with self.assertRaises(TypeError):
-      self.assertEqual(-Timestamp(123), -123)
-    with self.assertRaises(TypeError):
-      self.assertEqual(-Timestamp(123), -Duration(123))
-    with self.assertRaises(TypeError):
-      self.assertEqual(1230 % Timestamp(456), 318)
-    with self.assertRaises(TypeError):
-      self.assertEqual(Duration(1230) % Timestamp(456), 318)
-
-  def test_sort_order(self):
-    self.assertEqual(
-        [-63, Timestamp(-3), 2, 9, Timestamp(292.3), 500],
-        sorted([9, 2, Timestamp(-3), Timestamp(292.3), -63, 500]))
-    self.assertEqual(
-        [4, 5, Timestamp(6), Timestamp(7), 8, 9],
-        sorted([9, 8, Timestamp(7), Timestamp(6), 5, 4]))
-
-  def test_str(self):
-    self.assertEqual('Timestamp(1.234567)',
-                     str(Timestamp(1.234567)))
-    self.assertEqual('Timestamp(-1.234567)',
-                     str(Timestamp(-1.234567)))
-    self.assertEqual('Timestamp(-999999999.900000)',
-                     str(Timestamp(-999999999.9)))
-    self.assertEqual('Timestamp(999999999)',
-                     str(Timestamp(999999999)))
-    self.assertEqual('Timestamp(-999999999)',
-                     str(Timestamp(-999999999)))
-
-
-class DurationTest(unittest.TestCase):
-
-  def test_of(self):
-    interval = Duration(123)
-    self.assertEqual(id(interval), id(Duration.of(interval)))
-    self.assertEqual(interval, Duration.of(123.0))
-    with self.assertRaises(TypeError):
-      Duration.of(Timestamp(10))
-
-  def test_precision(self):
-    self.assertEqual(Duration(10000000) % 0.1, 0)
-    self.assertEqual(Duration(10000000) % 0.05, 0)
-    self.assertEqual(Duration(10000000) % 0.000005, 0)
-
-  def test_arithmetic(self):
-    self.assertEqual(Duration(123) + 456, 579)
-    self.assertEqual(456 + Duration(123), 579)
-    self.assertEqual(Duration(123) * 456, 56088)
-    self.assertEqual(456 * Duration(123), 56088)
-    self.assertEqual(Duration(123) - 456, -333)
-    self.assertEqual(456 - Duration(123), 333)
-    self.assertEqual(-Duration(123), -123)
-
-  def test_sort_order(self):
-    self.assertEqual(
-        [-63, Duration(-3), 2, 9, Duration(292.3), 500],
-        sorted([9, 2, Duration(-3), Duration(292.3), -63, 500]))
-    self.assertEqual(
-        [4, 5, Duration(6), Duration(7), 8, 9],
-        sorted([9, 8, Duration(7), Duration(6), 5, 4]))
-
-  def test_str(self):
-    self.assertEqual('Duration(1.234567)',
-                     str(Duration(1.234567)))
-    self.assertEqual('Duration(-1.234567)',
-                     str(Duration(-1.234567)))
-    self.assertEqual('Duration(-999999999.900000)',
-                     str(Duration(-999999999.9)))
-    self.assertEqual('Duration(999999999)',
-                     str(Duration(999999999)))
-    self.assertEqual('Duration(-999999999)',
-                     str(Duration(-999999999)))
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/trigger.py b/sdks/python/google/cloud/dataflow/transforms/trigger.py
deleted file mode 100644
index 039847a..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/trigger.py
+++ /dev/null
@@ -1,958 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Support for Dataflow triggers.
-
-Triggers control when, in processing time, windows get emitted.
-"""
-
-from abc import ABCMeta
-from abc import abstractmethod
-import collections
-import copy
-
-from google.cloud.dataflow.coders import observable
-from google.cloud.dataflow.transforms import combiners
-from google.cloud.dataflow.transforms import core
-from google.cloud.dataflow.transforms.timeutil import MAX_TIMESTAMP
-from google.cloud.dataflow.transforms.timeutil import MIN_TIMESTAMP
-from google.cloud.dataflow.transforms.timeutil import TimeDomain
-from google.cloud.dataflow.transforms.window import GlobalWindow
-from google.cloud.dataflow.transforms.window import OutputTimeFn
-from google.cloud.dataflow.transforms.window import WindowedValue
-from google.cloud.dataflow.transforms.window import WindowFn
-
-
-class AccumulationMode(object):
-  """Controls what to do with data when a trigger fires multiple times.
-  """
-  DISCARDING = 1
-  ACCUMULATING = 2
-  # TODO(robertwb): Provide retractions of previous outputs.
-  # RETRACTING = 3
-
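-# Example: with an early/late AfterWatermark trigger over FixedWindows(100),
-# DISCARDING emits each element in at most one pane (e.g. set('abcd'),
-# set('efgh'), set('i') in test_fixed_watermark_with_early_late below),
-# whereas ACCUMULATING repeats previously emitted elements in later panes.
-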
-
-class StateTag(object):
-  """An identifier used to store and retrieve typed, combinable state.
-
-  The given tag must be unique for this stage.  If CombineFn is None then
-  all elements will be returned as a list, otherwise the given CombineFn
-  will be applied (possibly incrementally and eagerly) when adding elements.
-  """
-  __metaclass__ = ABCMeta
-
-  def __init__(self, tag):
-    self.tag = tag
-
-
-class ValueStateTag(StateTag):
-  """StateTag pointing to an element."""
-
-  def __repr__(self):
-    return 'ValueStateTag(%s)' % self.tag
-
-  def with_prefix(self, prefix):
-    return ValueStateTag(prefix + self.tag)
-
-
-class CombiningValueStateTag(StateTag):
-  """StateTag pointing to an element, accumulated with a combiner."""
-
-  # TODO(robertwb): Also store the coder (perhaps extracted from the combine_fn)
-  def __init__(self, tag, combine_fn):
-    super(CombiningValueStateTag, self).__init__(tag)
-    if not combine_fn:
-      raise ValueError('combine_fn must be specified.')
-    if not isinstance(combine_fn, core.CombineFn):
-      combine_fn = core.CombineFn.from_callable(combine_fn)
-    self.combine_fn = combine_fn
-
-  def __repr__(self):
-    return 'CombiningValueStateTag(%s, %s)' % (self.tag, self.combine_fn)
-
-  def with_prefix(self, prefix):
-    return CombiningValueStateTag(prefix + self.tag, self.combine_fn)
-
-
-class ListStateTag(StateTag):
-  """StateTag pointing to a list of elements."""
-
-  def __init__(self, tag):
-    super(ListStateTag, self).__init__(tag)
-
-  def __repr__(self):
-    return 'ListStateTag(%s)' % self.tag
-
-  def with_prefix(self, prefix):
-    return ListStateTag(prefix + self.tag)
-
-
-class WatermarkHoldStateTag(StateTag):
-  """StateTag for watermark holds, merged via an OutputTimeFn implementation."""
-
-  def __init__(self, tag, output_time_fn_impl):
-    super(WatermarkHoldStateTag, self).__init__(tag)
-    self.output_time_fn_impl = output_time_fn_impl
-
-  def __repr__(self):
-    return 'WatermarkHoldStateTag(%s, %s)' % (self.tag,
-                                              self.output_time_fn_impl)
-
-  def with_prefix(self, prefix):
-    return WatermarkHoldStateTag(prefix + self.tag,
-                                 self.output_time_fn_impl)
-
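-# Example: the concrete tags above are used by the triggers and drivers in
-# this module, e.g.
-#
-#   CombiningValueStateTag('count', combiners.CountCombineFn())  # AfterCount
-#   ListStateTag('elements')      # buffered pane contents
-#   ValueStateTag('window_ids')   # non-mergeable bookkeeping state
-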
-
-# pylint: disable=unused-argument
-# TODO(robertwb): Provisional API, Java likely to change as well.
-class TriggerFn(object):
-  """A TriggerFn determines when window (panes) are emitted.
-
-  See https://cloud.google.com/dataflow/model/triggers.
-  """
-  __metaclass__ = ABCMeta
-
-  @abstractmethod
-  def on_element(self, element, window, context):
-    """Called when a new element arrives in a window.
-
-    Args:
-      element: the element being added
-      window: the window to which the element is being added
-      context: a context (e.g. a TriggerContext instance) for managing state
-          and setting timers
-    """
-    pass
-
-  @abstractmethod
-  def on_merge(self, to_be_merged, merge_result, context):
-    """Called when multiple windows are merged.
-
-    Args:
-      to_be_merged: the set of windows to be merged
-      merge_result: the window into which the windows are being merged
-      context: a context (e.g. a TriggerContext instance) for managing state
-          and setting timers
-    """
-    pass
-
-  @abstractmethod
-  def should_fire(self, watermark, window, context):
-    """Whether this trigger should cause the window to fire.
-
-    Args:
-      watermark: (a lower bound on) the watermark of the system
-      window: the window whose trigger is being considered
-      context: a context (e.g. a TriggerContext instance) for managing state
-          and setting timers
-
-    Returns:
-      whether this trigger should cause a firing
-    """
-    pass
-
-  @abstractmethod
-  def on_fire(self, watermark, window, context):
-    """Called when a trigger actually fires.
-
-    Args:
-      watermark: (a lower bound on) the watermark of the system
-      window: the window whose trigger is being fired
-      context: a context (e.g. a TriggerContext instance) for managing state
-          and setting timers
-
-    Returns:
-      whether this trigger is finished
-    """
-    pass
-
-  @abstractmethod
-  def reset(self, window, context):
-    """Clear any state and timers used by this TriggerFn."""
-    pass
-# pylint: enable=unused-argument
-
-
-class DefaultTrigger(TriggerFn):
-  """Semantically Repeatedly(AfterWatermark()), but more optimized."""
-
-  def __init__(self):
-    pass
-
-  def __repr__(self):
-    return 'DefaultTrigger()'
-
-  def on_element(self, element, window, context):
-    context.set_timer('', TimeDomain.WATERMARK, window.end)
-
-  def on_merge(self, to_be_merged, merge_result, context):
-    # Note: timer clearing is solely an optimization.
-    for window in to_be_merged:
-      if window.end != merge_result.end:
-        context.clear_timer('', TimeDomain.WATERMARK)
-
-  def should_fire(self, watermark, window, context):
-    return watermark >= window.end
-
-  def on_fire(self, watermark, window, context):
-    return False
-
-  def reset(self, window, context):
-    context.clear_timer('', TimeDomain.WATERMARK)
-
-  def __eq__(self, other):
-    return type(self) == type(other)
-
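-# Example (informal; IntervalWindow comes from the window module):
-#
-#   >>> DefaultTrigger().should_fire(11, IntervalWindow(0, 10), context=None)
-#   True
-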
-
-class AfterWatermark(TriggerFn):
-  """Fire exactly once when the watermark passes the end of the window.
-
-  Args:
-      early: if not None, a speculative trigger to repeatedly evaluate before
-        the watermark passes the end of the window
-      late: if not None, a trigger to repeatedly evaluate after the
-        watermark passes the end of the window
-  """
-  LATE_TAG = CombiningValueStateTag('is_late', any)
-
-  def __init__(self, early=None, late=None):
-    self.early = Repeatedly(early) if early else None
-    self.late = Repeatedly(late) if late else None
-
-  def __repr__(self):
-    qualifiers = []
-    if self.early:
-      qualifiers.append('early=%s' % self.early)
-    if self.late:
-      qualifiers.append('late=%s' % self.late)
-    return 'AfterWatermark(%s)' % ', '.join(qualifiers)
-
-  def is_late(self, context):
-    return self.late and context.get_state(self.LATE_TAG)
-
-  def on_element(self, element, window, context):
-    if self.is_late(context):
-      self.late.on_element(element, window, NestedContext(context, 'late'))
-    else:
-      context.set_timer('', TimeDomain.WATERMARK, window.end)
-      if self.early:
-        self.early.on_element(element, window, NestedContext(context, 'early'))
-
-  def on_merge(self, to_be_merged, merge_result, context):
-    # TODO(robertwb): Figure out whether the 'rewind' semantics could be used
-    # here.
-    if self.is_late(context):
-      self.late.on_merge(
-          to_be_merged, merge_result, NestedContext(context, 'late'))
-    else:
-      # Note: timer clearing is solely an optimization.
-      for window in to_be_merged:
-        if window.end != merge_result.end:
-          context.clear_timer('', TimeDomain.WATERMARK)
-      if self.early:
-        self.early.on_merge(
-            to_be_merged, merge_result, NestedContext(context, 'early'))
-
-  def should_fire(self, watermark, window, context):
-    if self.is_late(context):
-      return self.late.should_fire(
-          watermark, window, NestedContext(context, 'late'))
-    elif watermark >= window.end:
-      return True
-    elif self.early:
-      return self.early.should_fire(
-          watermark, window, NestedContext(context, 'early'))
-    else:
-      return False
-
-  def on_fire(self, watermark, window, context):
-    if self.is_late(context):
-      return self.late.on_fire(
-          watermark, window, NestedContext(context, 'late'))
-    elif watermark >= window.end:
-      context.add_state(self.LATE_TAG, True)
-      return not self.late
-    elif self.early:
-      self.early.on_fire(watermark, window, NestedContext(context, 'early'))
-      return False
-
-  def reset(self, window, context):
-    if self.late:
-      context.clear_state(self.LATE_TAG)
-    if self.early:
-      self.early.reset(window, NestedContext(context, 'early'))
-    if self.late:
-      self.late.reset(window, NestedContext(context, 'late'))
-
-  def __eq__(self, other):
-    return (type(self) == type(other)
-            and self.early == other.early
-            and self.late == other.late)
-
-  def __hash__(self):
-    return hash((type(self), self.early, self.late))
-
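-# Example (from trigger_test.py below): fire speculatively after every 3
-# elements before the watermark, once on time, then after every 2 late
-# elements:
-#
-#   AfterWatermark(early=AfterCount(3), late=AfterCount(2))
-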
-
-class AfterCount(TriggerFn):
-  """Fire when there are at least count elements in this window pane."""
-
-  COUNT_TAG = CombiningValueStateTag('count', combiners.CountCombineFn())
-
-  def __init__(self, count):
-    self.count = count
-
-  def __repr__(self):
-    return 'AfterCount(%s)' % self.count
-
-  def on_element(self, element, window, context):
-    context.add_state(self.COUNT_TAG, 1)
-
-  def on_merge(self, to_be_merged, merge_result, context):
-    # States are automatically merged by the underlying state implementation.
-    pass
-
-  def should_fire(self, watermark, window, context):
-    return context.get_state(self.COUNT_TAG) >= self.count
-
-  def on_fire(self, watermark, window, context):
-    return True
-
-  def reset(self, window, context):
-    context.clear_state(self.COUNT_TAG)
-
-
-class Repeatedly(TriggerFn):
-  """Repeatedly invoke the given trigger, never finishing."""
-
-  def __init__(self, underlying):
-    self.underlying = underlying
-
-  def __repr__(self):
-    return 'Repeatedly(%s)' % self.underlying
-
-  def on_element(self, element, window, context):  # get window from context?
-    self.underlying.on_element(element, window, context)
-
-  def on_merge(self, to_be_merged, merge_result, context):
-    self.underlying.on_merge(to_be_merged, merge_result, context)
-
-  def should_fire(self, watermark, window, context):
-    return self.underlying.should_fire(watermark, window, context)
-
-  def on_fire(self, watermark, window, context):
-    if self.underlying.on_fire(watermark, window, context):
-      self.underlying.reset(window, context)
-    return False
-
-  def reset(self, window, context):
-    self.underlying.reset(window, context)
-
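-# Example (test_repeatedly_after_first below): fire whenever 3 new elements
-# arrive or the watermark passes the end of the window, without ever
-# finishing:
-#
-#   Repeatedly(AfterFirst(AfterCount(3), AfterWatermark()))
-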
-
-class ParallelTriggerFn(TriggerFn):
-  """Base class for triggers that evaluate several subtriggers in parallel."""
-
-  __metaclass__ = ABCMeta
-
-  def __init__(self, *triggers):
-    self.triggers = triggers
-
-  def __repr__(self):
-    return '%s(%s)' % (self.__class__.__name__,
-                       ', '.join(str(t) for t in self.triggers))
-
-  @abstractmethod
-  def combine_op(self, trigger_results):
-    pass
-
-  def on_element(self, element, window, context):
-    for ix, trigger in enumerate(self.triggers):
-      trigger.on_element(element, window, self._sub_context(context, ix))
-
-  def on_merge(self, to_be_merged, merge_result, context):
-    for ix, trigger in enumerate(self.triggers):
-      trigger.on_merge(
-          to_be_merged, merge_result, self._sub_context(context, ix))
-
-  def should_fire(self, watermark, window, context):
-    return self.combine_op(
-        trigger.should_fire(watermark, window, self._sub_context(context, ix))
-        for ix, trigger in enumerate(self.triggers))
-
-  def on_fire(self, watermark, window, context):
-    finished = []
-    for ix, trigger in enumerate(self.triggers):
-      nested_context = self._sub_context(context, ix)
-      if trigger.should_fire(watermark, window, nested_context):
-        finished.append(trigger.on_fire(watermark, window, nested_context))
-    return self.combine_op(finished)
-
-  def reset(self, window, context):
-    for ix, trigger in enumerate(self.triggers):
-      trigger.reset(window, self._sub_context(context, ix))
-
-  @staticmethod
-  def _sub_context(context, index):
-    return NestedContext(context, '%d/' % index)
-
-
-class AfterFirst(ParallelTriggerFn):
-  """Fires when any subtrigger fires.
-
-  Also finishes when any subtrigger finishes.
-  """
-  combine_op = any
-
-
-class AfterAll(ParallelTriggerFn):
-  """Fires when all subtriggers have fired.
-
-  Also finishes when all subtriggers have finished.
-  """
-  combine_op = all
-
-
-class AfterEach(TriggerFn):
-  """Fires the given subtriggers in sequence, advancing as each one finishes."""
-
-  INDEX_TAG = CombiningValueStateTag('index', (
-      lambda indices: 0 if not indices else max(indices)))
-
-  def __init__(self, *triggers):
-    self.triggers = triggers
-
-  def __repr__(self):
-    return '%s(%s)' % (self.__class__.__name__,
-                       ', '.join(str(t) for t in self.triggers))
-
-  def on_element(self, element, window, context):
-    ix = context.get_state(self.INDEX_TAG)
-    if ix < len(self.triggers):
-      self.triggers[ix].on_element(
-          element, window, self._sub_context(context, ix))
-
-  def on_merge(self, to_be_merged, merge_result, context):
-    # This takes the furthest window on merging.
-    # TODO(robertwb): Revisit this when merging windows logic is settled for
-    # all possible merging situations.
-    ix = context.get_state(self.INDEX_TAG)
-    if ix < len(self.triggers):
-      self.triggers[ix].on_merge(
-          to_be_merged, merge_result, self._sub_context(context, ix))
-
-  def should_fire(self, watermark, window, context):
-    ix = context.get_state(self.INDEX_TAG)
-    if ix < len(self.triggers):
-      return self.triggers[ix].should_fire(
-          watermark, window, self._sub_context(context, ix))
-
-  def on_fire(self, watermark, window, context):
-    ix = context.get_state(self.INDEX_TAG)
-    if ix < len(self.triggers):
-      if self.triggers[ix].on_fire(
-          watermark, window, self._sub_context(context, ix)):
-        ix += 1
-        context.add_state(self.INDEX_TAG, ix)
-      return ix == len(self.triggers)
-
-  def reset(self, window, context):
-    context.clear_state(self.INDEX_TAG)
-    for ix, trigger in enumerate(self.triggers):
-      trigger.reset(window, self._sub_context(context, ix))
-
-  @staticmethod
-  def _sub_context(context, index):
-    return NestedContext(context, '%d/' % index)
-
-
-class OrFinally(AfterFirst):
-  """Fires according to body_trigger, until exit_trigger fires (finishing)."""
-
-  def __init__(self, body_trigger, exit_trigger):
-    super(OrFinally, self).__init__(body_trigger, exit_trigger)
-
-
-class TriggerContext(object):
-  """View of a SimpleState scoped to a single window."""
-
-  def __init__(self, outer, window):
-    self._outer = outer
-    self._window = window
-
-  def set_timer(self, name, time_domain, timestamp):
-    self._outer.set_timer(self._window, name, time_domain, timestamp)
-
-  def clear_timer(self, name, time_domain):
-    self._outer.clear_timer(self._window, name, time_domain)
-
-  def add_state(self, tag, value):
-    self._outer.add_state(self._window, tag, value)
-
-  def get_state(self, tag):
-    return self._outer.get_state(self._window, tag)
-
-  def clear_state(self, tag):
-    return self._outer.clear_state(self._window, tag)
-
-
-class NestedContext(object):
-  """Namespaced context useful for defining composite triggers."""
-
-  def __init__(self, outer, prefix):
-    self._outer = outer
-    self._prefix = prefix
-
-  def set_timer(self, name, time_domain, timestamp):
-    self._outer.set_timer(self._prefix + name, time_domain, timestamp)
-
-  def clear_timer(self, name, time_domain):
-    self._outer.clear_timer(self._prefix + name, time_domain)
-
-  def add_state(self, tag, value):
-    self._outer.add_state(tag.with_prefix(self._prefix), value)
-
-  def get_state(self, tag):
-    return self._outer.get_state(tag.with_prefix(self._prefix))
-
-  def clear_state(self, tag):
-    self._outer.clear_state(tag.with_prefix(self._prefix))
-
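-# Example: composite triggers use NestedContext to namespace subtrigger
-# state; ParallelTriggerFn._sub_context(context, 0) stores the first
-# subtrigger's CombiningValueStateTag('count', ...) under the key '0/count'.
-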
-
-# pylint: disable=unused-argument
-class SimpleState(object):
-  """Basic state storage interface used for triggering.
-
-  Only timers hold the watermark: the watermark may not advance past the
-  timestamp of any unfired timer.
-  """
-
-  __metaclass__ = ABCMeta
-
-  @abstractmethod
-  def set_timer(self, window, name, time_domain, timestamp):
-    pass
-
-  @abstractmethod
-  def get_window(self, window_id):
-    pass
-
-  @abstractmethod
-  def clear_timer(self, window, name, time_domain):
-    pass
-
-  @abstractmethod
-  def add_state(self, window, tag, value):
-    pass
-
-  @abstractmethod
-  def get_state(self, window, tag):
-    pass
-
-  @abstractmethod
-  def clear_state(self, window, tag):
-    pass
-
-  def at(self, window):
-    return TriggerContext(self, window)
-
-
-class UnmergedState(SimpleState):
-  """State suitable for use in TriggerDriver.
-
-  This class must be implemented by each backend.
-  """
-
-  @abstractmethod
-  def set_global_state(self, tag, value):
-    pass
-
-  @abstractmethod
-  def get_global_state(self, tag, default=None):
-    pass
-# pylint: enable=unused-argument
-
-
-class MergeableStateAdapter(SimpleState):
-  """Wraps an UnmergedState, tracking merged windows."""
-  # TODO(robertwb): A similar indirection could be used for sliding windows
-  # or other window_fns when a single element typically belongs to many windows.
-
-  WINDOW_IDS = ValueStateTag('window_ids')
-
-  def __init__(self, raw_state):
-    self.raw_state = raw_state
-    self.window_ids = self.raw_state.get_global_state(self.WINDOW_IDS, {})
-    self.counter = None
-
-  def set_timer(self, window, name, time_domain, timestamp):
-    self.raw_state.set_timer(self._get_id(window), name, time_domain, timestamp)
-
-  def clear_timer(self, window, name, time_domain):
-    for window_id in self._get_ids(window):
-      self.raw_state.clear_timer(window_id, name, time_domain)
-
-  def add_state(self, window, tag, value):
-    if isinstance(tag, ValueStateTag):
-      raise ValueError(
-          'Merging requested for non-mergeable state tag: %r.' % tag)
-    self.raw_state.add_state(self._get_id(window), tag, value)
-
-  def get_state(self, window, tag):
-    values = [self.raw_state.get_state(window_id, tag)
-              for window_id in self._get_ids(window)]
-    if isinstance(tag, ValueStateTag):
-      raise ValueError(
-          'Merging requested for non-mergeable state tag: %r.' % tag)
-    elif isinstance(tag, CombiningValueStateTag):
-      # TODO(robertwb): Strip combine_fn.extract_output from raw_state tag.
-      if not values:
-        accumulator = tag.combine_fn.create_accumulator()
-      elif len(values) == 1:
-        accumulator = values[0]
-      else:
-        accumulator = tag.combine_fn.merge_accumulators(values)
-        # TODO(robertwb): Store the merged value in the first tag.
-      return tag.combine_fn.extract_output(accumulator)
-    elif isinstance(tag, ListStateTag):
-      return [v for vs in values for v in vs]
-    elif isinstance(tag, WatermarkHoldStateTag):
-      return tag.output_time_fn_impl.combine_all(values)
-    else:
-      raise ValueError('Invalid tag.', tag)
-
-  def clear_state(self, window, tag):
-    for window_id in self._get_ids(window):
-      self.raw_state.clear_state(window_id, tag)
-    if tag is None:
-      del self.window_ids[window]
-      self._persist_window_ids()
-
-  def merge(self, to_be_merged, merge_result):
-    for window in to_be_merged:
-      if window != merge_result:
-        if window in self.window_ids:
-          if merge_result in self.window_ids:
-            merge_window_ids = self.window_ids[merge_result]
-          else:
-            merge_window_ids = self.window_ids[merge_result] = []
-          merge_window_ids.extend(self.window_ids.pop(window))
-          self._persist_window_ids()
-
-  def known_windows(self):
-    return self.window_ids.keys()
-
-  def get_window(self, window_id):
-    for window, ids in self.window_ids.items():
-      if window_id in ids:
-        return window
-    raise ValueError('No window for %s' % window_id)
-
-  def _get_id(self, window):
-    if window in self.window_ids:
-      return self.window_ids[window][0]
-    else:
-      window_id = self._get_next_counter()
-      self.window_ids[window] = [window_id]
-      self._persist_window_ids()
-      return window_id
-
-  def _get_ids(self, window):
-    return self.window_ids.get(window, [])
-
-  def _get_next_counter(self):
-    if not self.window_ids:
-      self.counter = 0
-    elif self.counter is None:
-      self.counter = max(k for ids in self.window_ids.values() for k in ids)
-    self.counter += 1
-    return self.counter
-
-  def _persist_window_ids(self):
-    self.raw_state.set_global_state(self.WINDOW_IDS, self.window_ids)
-
-  def __repr__(self):
-    return '\n\t'.join([repr(self.window_ids)] +
-                       repr(self.raw_state).split('\n'))
-
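-# Example: after Sessions-style merging, one logical window may be backed by
-# several underlying window ids; get_state() then reads every id and merges
-# the values (e.g. via combine_fn.merge_accumulators for a
-# CombiningValueStateTag).
-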
-
-def create_trigger_driver(windowing, is_batch=False, phased_combine_fn=None):
-  """Create the TriggerDriver for the given windowing and options."""
-
-  # TODO(robertwb): We can do more if we know elements are in timestamp
-  # sorted order.
-  if windowing.is_default() and is_batch:
-    driver = DefaultGlobalBatchTriggerDriver()
-  else:
-    driver = GeneralTriggerDriver(windowing)
-
-  if phased_combine_fn:
-    # TODO(ccy): Refactor GeneralTriggerDriver to combine values eagerly using
-    # the known phased_combine_fn here.
-    driver = CombiningTriggerDriver(phased_combine_fn, driver)
-  return driver
-
-
-class TriggerDriver(object):
-  """Breaks a series of bundle and timer firings into window (pane)s."""
-
-  __metaclass__ = ABCMeta
-
-  @abstractmethod
-  def process_elements(self, state, windowed_values, output_watermark):
-    pass
-
-  @abstractmethod
-  def process_timer(self, window_id, name, time_domain, timestamp, state):
-    pass
-
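-# Example (informal; mirrors run_trigger in trigger_test.py below): a driver
-# is fed bundles and expired timers by hand and yields one WindowedValue per
-# fired pane:
-#
-#   driver = GeneralTriggerDriver(
-#       Windowing(window_fn, trigger_fn, accumulation_mode))
-#   state = InMemoryUnmergedState()
-#   for wvalue in driver.process_elements(state, bundle, MIN_TIMESTAMP):
-#     ...  # one fired pane per WindowedValue
-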
-
-class DefaultGlobalBatchTriggerDriver(TriggerDriver):
-  """Breaks a bundles into window (pane)s according to the default triggering.
-  """
-  GLOBAL_WINDOW_TUPLE = (GlobalWindow(),)
-
-  def __init__(self):
-    pass
-
-  def process_elements(self, state, windowed_values, unused_output_watermark):
-    if isinstance(windowed_values, list):
-      unwindowed = [wv.value for wv in windowed_values]
-    else:
-      class UnwindowedValues(observable.ObservableMixin):
-        def __iter__(self):
-          for wv in windowed_values:
-            unwindowed_value = wv.value
-            self.notify_observers(unwindowed_value)
-            yield unwindowed_value
-        def __repr__(self):
-          return '<UnwindowedValues of %s>' % windowed_values
-      unwindowed = UnwindowedValues()
-    yield WindowedValue(unwindowed, MIN_TIMESTAMP, self.GLOBAL_WINDOW_TUPLE)
-
-  def process_timer(self, window_id, name, time_domain, timestamp, state):
-    raise TypeError('Triggers never set or called for batch default windowing.')
-
-
-class CombiningTriggerDriver(TriggerDriver):
-  """Uses a phased_combine_fn to process output of wrapped TriggerDriver."""
-
-  def __init__(self, phased_combine_fn, underlying):
-    self.phased_combine_fn = phased_combine_fn
-    self.underlying = underlying
-
-  def process_elements(self, state, windowed_values, output_watermark):
-    uncombined = self.underlying.process_elements(state, windowed_values,
-                                                  output_watermark)
-    for output in uncombined:
-      yield output.with_value(self.phased_combine_fn.apply(output.value))
-
-  def process_timer(self, window_id, name, time_domain, timestamp, state):
-    uncombined = self.underlying.process_timer(window_id, name, time_domain,
-                                               timestamp, state)
-    for output in uncombined:
-      yield output.with_value(self.phased_combine_fn.apply(output.value))
-
-
-class GeneralTriggerDriver(TriggerDriver):
-  """Breaks a series of bundle and timer firings into window (pane)s.
-
-  Suitable for all variants of Windowing.
-  """
-  ELEMENTS = ListStateTag('elements')
-  TOMBSTONE = CombiningValueStateTag('tombstone', combiners.CountCombineFn())
-
-  def __init__(self, windowing):
-    self.window_fn = windowing.windowfn
-    self.output_time_fn_impl = OutputTimeFn.get_impl(windowing.output_time_fn,
-                                                     self.window_fn)
-    # pylint: disable=invalid-name
-    self.WATERMARK_HOLD = WatermarkHoldStateTag('watermark',
-                                                self.output_time_fn_impl)
-    # pylint: enable=invalid-name
-    self.trigger_fn = windowing.triggerfn
-    self.accumulation_mode = windowing.accumulation_mode
-    self.is_merging = True
-
-  def process_elements(self, state, windowed_values, output_watermark):
-    if self.is_merging:
-      state = MergeableStateAdapter(state)
-
-    windows_to_elements = collections.defaultdict(list)
-    for wv in windowed_values:
-      for window in wv.windows:
-        windows_to_elements[window].append((wv.value, wv.timestamp))
-
-    # First handle merging.
-    if self.is_merging:
-      old_windows = set(state.known_windows())
-      all_windows = old_windows.union(windows_to_elements.keys())
-
-      if all_windows != old_windows:
-        merged_away = {}
-
-        class TriggerMergeContext(WindowFn.MergeContext):
-
-          def merge(_, to_be_merged, merge_result):
-            # The instance argument is named '_' so that 'self' still refers
-            # to the enclosing GeneralTriggerDriver.
-            for window in to_be_merged:
-              if window != merge_result:
-                merged_away[window] = merge_result
-            state.merge(to_be_merged, merge_result)
-            self.trigger_fn.on_merge(
-                to_be_merged, merge_result, state.at(merge_result))
-
-        self.window_fn.merge(TriggerMergeContext(all_windows))
-
-        merged_windows_to_elements = collections.defaultdict(list)
-        for window, values in windows_to_elements.items():
-          while window in merged_away:
-            window = merged_away[window]
-          merged_windows_to_elements[window].extend(values)
-        windows_to_elements = merged_windows_to_elements
-
-        for window in merged_away:
-          state.clear_state(window, self.WATERMARK_HOLD)
-
-    # Next handle element adding.
-    for window, elements in windows_to_elements.items():
-      if state.get_state(window, self.TOMBSTONE):
-        continue
-      # Add watermark hold.
-      # TODO(ccy): Add late data and garbage-collection hold support.
-      output_time = self.output_time_fn_impl.merge(
-          window,
-          (element_output_time for element_output_time in
-           (self.output_time_fn_impl.assign_output_time(window, timestamp)
-            for unused_value, timestamp in elements)
-           if element_output_time >= output_watermark))
-      if output_time is not None:
-        state.add_state(window, self.WATERMARK_HOLD, output_time)
-
-      context = state.at(window)
-      for value, unused_timestamp in elements:
-        state.add_state(window, self.ELEMENTS, value)
-        self.trigger_fn.on_element(value, window, context)
-
-      # Maybe fire this window. The true watermark is not known here, so use
-      # MIN_TIMESTAMP; watermark-driven firings happen in process_timer.
-      watermark = MIN_TIMESTAMP
-      if self.trigger_fn.should_fire(watermark, window, context):
-        finished = self.trigger_fn.on_fire(watermark, window, context)
-        yield self._output(window, finished, state)
-
-  def process_timer(self, window_id, unused_name, time_domain, timestamp,
-                    state):
-    if self.is_merging:
-      state = MergeableStateAdapter(state)
-    window = state.get_window(window_id)
-    if state.get_state(window, self.TOMBSTONE):
-      return
-    if time_domain == TimeDomain.WATERMARK:
-      if not self.is_merging or window in state.known_windows():
-        context = state.at(window)
-        if self.trigger_fn.should_fire(timestamp, window, context):
-          finished = self.trigger_fn.on_fire(timestamp, window, context)
-          yield self._output(window, finished, state)
-    else:
-      raise Exception('Unexpected time domain: %s' % time_domain)
-
-  def _output(self, window, finished, state):
-    """Output window and clean up if appropriate."""
-
-    values = state.get_state(window, self.ELEMENTS)
-    if finished:
-      # TODO(robertwb): allowed lateness
-      state.clear_state(window, self.ELEMENTS)
-      state.add_state(window, self.TOMBSTONE, 1)
-    elif self.accumulation_mode == AccumulationMode.DISCARDING:
-      state.clear_state(window, self.ELEMENTS)
-
-    timestamp = state.get_state(window, self.WATERMARK_HOLD)
-    if timestamp is None:
-      # If no watermark hold was set, output at end of window.
-      timestamp = window.end
-    else:
-      state.clear_state(window, self.WATERMARK_HOLD)
-
-    return WindowedValue(values, timestamp, (window,))
-
-
-class InMemoryUnmergedState(UnmergedState):
-  """In-memory implementation of UnmergedState.
-
-  Used for batch and testing.
-  """
-  def __init__(self, defensive_copy=True):
-    # TODO(robertwb): Skip defensive_copy in production if it's too expensive.
-    self.timers = collections.defaultdict(dict)
-    self.state = collections.defaultdict(lambda: collections.defaultdict(list))
-    self.global_state = {}
-    self.defensive_copy = defensive_copy
-
-  def set_global_state(self, tag, value):
-    assert isinstance(tag, ValueStateTag)
-    if self.defensive_copy:
-      value = copy.deepcopy(value)
-    self.global_state[tag.tag] = value
-
-  def get_global_state(self, tag, default=None):
-    return self.global_state.get(tag.tag, default)
-
-  def set_timer(self, window, name, time_domain, timestamp):
-    self.timers[window][(name, time_domain)] = timestamp
-
-  def clear_timer(self, window, name, time_domain):
-    self.timers[window].pop((name, time_domain), None)
-
-  def get_window(self, window_id):
-    return window_id
-
-  def add_state(self, window, tag, value):
-    if self.defensive_copy:
-      value = copy.deepcopy(value)
-    if isinstance(tag, ValueStateTag):
-      self.state[window][tag.tag] = value
-    elif isinstance(tag, CombiningValueStateTag):
-      self.state[window][tag.tag].append(value)
-    elif isinstance(tag, ListStateTag):
-      self.state[window][tag.tag].append(value)
-    elif isinstance(tag, WatermarkHoldStateTag):
-      self.state[window][tag.tag].append(value)
-    else:
-      raise ValueError('Invalid tag.', tag)
-
-  def get_state(self, window, tag):
-    values = self.state[window][tag.tag]
-    if isinstance(tag, ValueStateTag):
-      return values
-    elif isinstance(tag, CombiningValueStateTag):
-      return tag.combine_fn.apply(values)
-    elif isinstance(tag, ListStateTag):
-      return values
-    elif isinstance(tag, WatermarkHoldStateTag):
-      return tag.output_time_fn_impl.combine_all(values)
-    else:
-      raise ValueError('Invalid tag.', tag)
-
-  def clear_state(self, window, tag):
-    self.state[window].pop(tag.tag, None)
-    if not self.state[window]:
-      self.state.pop(window, None)
-
-  def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
-    expired = []
-    for window, timers in list(self.timers.items()):
-      for (name, time_domain), timestamp in list(timers.items()):
-        if timestamp <= watermark:
-          expired.append((window, (name, time_domain, timestamp)))
-          del timers[(name, time_domain)]
-      if not timers:
-        del self.timers[window]
-    return expired
-
-  def __repr__(self):
-    state_str = '\n'.join('%s: %s' % (key, dict(state))
-                          for key, state in self.state.items())
-    return 'timers: %s\nstate: %s' % (dict(self.timers), state_str)
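-
-
-# Example (informal): any hashable value can serve as the window key here,
-# and a timer becomes eligible once the watermark passes its timestamp:
-#
-#   state = InMemoryUnmergedState()
-#   state.set_timer('w', '', TimeDomain.WATERMARK, 10)
-#   state.get_and_clear_timers(watermark=5)    # -> []
-#   state.get_and_clear_timers(watermark=15)   # -> [('w', ('', ..., 10))]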

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/trigger_test.py b/sdks/python/google/cloud/dataflow/transforms/trigger_test.py
deleted file mode 100644
index 9aca3bb..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/trigger_test.py
+++ /dev/null
@@ -1,566 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the triggering classes."""
-
-import collections
-import os.path
-import unittest
-
-import yaml
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.transforms.core import Windowing
-from google.cloud.dataflow.transforms.trigger import AccumulationMode
-from google.cloud.dataflow.transforms.trigger import AfterAll
-from google.cloud.dataflow.transforms.trigger import AfterCount
-from google.cloud.dataflow.transforms.trigger import AfterEach
-from google.cloud.dataflow.transforms.trigger import AfterFirst
-from google.cloud.dataflow.transforms.trigger import AfterWatermark
-from google.cloud.dataflow.transforms.trigger import DefaultTrigger
-from google.cloud.dataflow.transforms.trigger import GeneralTriggerDriver
-from google.cloud.dataflow.transforms.trigger import InMemoryUnmergedState
-from google.cloud.dataflow.transforms.trigger import Repeatedly
-from google.cloud.dataflow.transforms.util import assert_that, equal_to
-from google.cloud.dataflow.transforms.window import FixedWindows
-from google.cloud.dataflow.transforms.window import IntervalWindow
-from google.cloud.dataflow.transforms.window import MIN_TIMESTAMP
-from google.cloud.dataflow.transforms.window import OutputTimeFn
-from google.cloud.dataflow.transforms.window import Sessions
-from google.cloud.dataflow.transforms.window import TimestampedValue
-from google.cloud.dataflow.transforms.window import WindowedValue
-from google.cloud.dataflow.transforms.window import WindowFn
-
-
-class CustomTimestampingFixedWindowsWindowFn(FixedWindows):
-  """WindowFn for testing custom timestamping."""
-
-  def get_transformed_output_time(self, unused_window, input_timestamp):
-    return input_timestamp + 100
-
-
-class TriggerTest(unittest.TestCase):
-
-  def run_trigger_simple(self, window_fn, trigger_fn, accumulation_mode,
-                         timestamped_data, expected_panes, *groupings,
-                         **kwargs):
-    late_data = kwargs.pop('late_data', [])
-    assert not kwargs
-    def bundle_data(data, size):
-      bundle = []
-      for timestamp, elem in data:
-        windows = window_fn.assign(WindowFn.AssignContext(timestamp, elem))
-        bundle.append(WindowedValue(elem, timestamp, windows))
-        if len(bundle) == size:
-          yield bundle
-          bundle = []
-      if bundle:
-        yield bundle
-
-    if not groupings:
-      groupings = [1]
-    for group_by in groupings:
-      self.run_trigger(window_fn, trigger_fn, accumulation_mode,
-                       bundle_data(timestamped_data, group_by),
-                       bundle_data(late_data, group_by),
-                       expected_panes)
-
-  def run_trigger(self, window_fn, trigger_fn, accumulation_mode,
-                  bundles, late_bundles,
-                  expected_panes):
-    actual_panes = collections.defaultdict(list)
-    driver = GeneralTriggerDriver(
-        Windowing(window_fn, trigger_fn, accumulation_mode))
-    state = InMemoryUnmergedState()
-
-    for bundle in bundles:
-      for wvalue in driver.process_elements(state, bundle, MIN_TIMESTAMP):
-        window, = wvalue.windows
-        actual_panes[window].append(set(wvalue.value))
-
-    while state.timers:
-      for timer_window, (name, time_domain, timestamp) in (
-          state.get_and_clear_timers()):
-        for wvalue in driver.process_timer(
-            timer_window, name, time_domain, timestamp, state):
-          window, = wvalue.windows
-          actual_panes[window].append(set(wvalue.value))
-
-    for bundle in late_bundles:
-      for wvalue in driver.process_elements(state, bundle, MIN_TIMESTAMP):
-        window, = wvalue.windows
-        actual_panes[window].append(set(wvalue.value))
-
-      while state.timers:
-        for timer_window, (name, time_domain, timestamp) in (
-            state.get_and_clear_timers()):
-          for wvalue in driver.process_timer(
-              timer_window, name, time_domain, timestamp, state):
-            window, = wvalue.windows
-            actual_panes[window].append(set(wvalue.value))
-
-    self.assertEqual(expected_panes, actual_panes)
-
-  def test_fixed_watermark(self):
-    self.run_trigger_simple(
-        FixedWindows(10),  # pyformat break
-        AfterWatermark(),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (13, 'c')],
-        {IntervalWindow(0, 10): [set('ab')],
-         IntervalWindow(10, 20): [set('c')]},
-        1,
-        2,
-        3)
-
-  def test_fixed_watermark_with_early(self):
-    self.run_trigger_simple(
-        FixedWindows(10),  # pyformat break
-        AfterWatermark(early=AfterCount(2)),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (3, 'c')],
-        {IntervalWindow(0, 10): [set('ab'), set('abc')]},
-        2)
-    self.run_trigger_simple(
-        FixedWindows(10),  # pyformat break
-        AfterWatermark(early=AfterCount(2)),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (3, 'c')],
-        {IntervalWindow(0, 10): [set('abc'), set('abc')]},
-        3)
-
-  def test_fixed_watermark_with_early_late(self):
-    self.run_trigger_simple(
-        FixedWindows(100),  # pyformat break
-        AfterWatermark(early=AfterCount(3),
-                       late=AfterCount(2)),
-        AccumulationMode.DISCARDING,
-        zip(range(9), 'abcdefghi'),
-        {IntervalWindow(0, 100): [
-            set('abcd'), set('efgh'),  # early
-            set('i'),                  # on time
-            set('vw'), set('xy')       # late
-            ]},
-        2,
-        late_data=zip(range(5), 'vwxyz'))
-
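An editorial sketch, not part of the diff, of why the early panes above hold
four elements even though early=AfterCount(3): with a grouping of 2, the
trigger is evaluated at bundle boundaries, and DISCARDING mode resets the pane
after each firing. The helper below is hypothetical:

    def panes(elems, n, bundle_size=2):
        fired, pane = [], []
        for i in range(0, len(elems), bundle_size):
            pane.extend(elems[i:i + bundle_size])
            if len(pane) >= n:          # AfterCount(n) checked per bundle
                fired.append(''.join(pane))
                pane = []               # DISCARDING resets the pane
        return fired, pane

    print(panes('abcdefghi', 3))        # (['abcd', 'efgh'], ['i'])

The same boundary effect yields the late panes 'vw' and 'xy' under
late=AfterCount(2), with 'z' left unfired.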
-  def test_sessions_watermark_with_early_late(self):
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        AfterWatermark(early=AfterCount(2),
-                       late=AfterCount(1)),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (15, 'b'), (7, 'c'), (30, 'd')],
-        {IntervalWindow(1, 25): [
-            set('abc'),                # early
-            set('abc'),                # on time
-            set('abcxy')               # late
-         ],
-         IntervalWindow(30, 40): [
-             set('d'),                  # on time
-         ],
-         IntervalWindow(1, 40): [
-             set('abcdxyz')             # late
-         ],
-        },
-        2,
-        late_data=[(1, 'x'), (2, 'y'), (21, 'z')])
-
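Editorial sketch, not part of the diff: the IntervalWindow(1, 25) above comes
from session merging, where each element opens a window [t, t + gap) and
overlapping windows fuse. A minimal standalone version, with gap assumed to be
the Sessions(10) gap:

    def merge_sessions(timestamps, gap=10):
        spans = sorted([t, t + gap] for t in timestamps)
        merged = [spans[0]]
        for start, end in spans[1:]:
            if start < merged[-1][1]:       # overlaps the open session
                merged[-1][1] = max(merged[-1][1], end)
            else:
                merged.append([start, end])
        return merged

    print(merge_sessions([1, 15, 7, 30]))   # [[1, 25], [30, 40]]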
-  def test_fixed_after_count(self):
-    self.run_trigger_simple(
-        FixedWindows(10),  # pyformat break
-        AfterCount(2),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (3, 'c'), (11, 'z')],
-        {IntervalWindow(0, 10): [set('ab')]},
-        1,
-        2)
-    self.run_trigger_simple(
-        FixedWindows(10),  # pyformat break
-        AfterCount(2),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (3, 'c'), (11, 'z')],
-        {IntervalWindow(0, 10): [set('abc')]},
-        3,
-        4)
-
-  def test_fixed_after_first(self):
-    self.run_trigger_simple(
-        FixedWindows(10),  # pyformat break
-        AfterFirst(AfterCount(2), AfterWatermark()),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (3, 'c')],
-        {IntervalWindow(0, 10): [set('ab')]},
-        1,
-        2)
-    self.run_trigger_simple(
-        FixedWindows(10),  # pyformat break
-        AfterFirst(AfterCount(5), AfterWatermark()),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (3, 'c')],
-        {IntervalWindow(0, 10): [set('abc')]},
-        1,
-        2,
-        late_data=[(1, 'x'), (2, 'y'), (3, 'z')])
-
-  def test_repeatedly_after_first(self):
-    self.run_trigger_simple(
-        FixedWindows(100),  # pyformat break
-        Repeatedly(AfterFirst(AfterCount(3), AfterWatermark())),
-        AccumulationMode.ACCUMULATING,
-        zip(range(7), 'abcdefg'),
-        {IntervalWindow(0, 100): [
-            set('abc'),
-            set('abcdef'),
-            set('abcdefg'),
-            set('abcdefgx'),
-            set('abcdefgxy'),
-            set('abcdefgxyz')]},
-        1,
-        late_data=zip(range(3), 'xyz'))
-
-  def test_sessions_after_all(self):
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        AfterAll(AfterCount(2), AfterWatermark()),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (3, 'c')],
-        {IntervalWindow(1, 13): [set('abc')]},
-        1,
-        2)
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        AfterAll(AfterCount(5), AfterWatermark()),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (3, 'c')],
-        {IntervalWindow(1, 13): [set('abcxy')]},
-        1,
-        2,
-        late_data=[(1, 'x'), (2, 'y'), (3, 'z')])
-
-  def test_sessions_default(self):
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        DefaultTrigger(),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b')],
-        {IntervalWindow(1, 12): [set('ab')]},
-        1,
-        2)
-
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        AfterWatermark(),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (15, 'c'), (16, 'd'), (30, 'z'), (9, 'e'),
-         (10, 'f'), (30, 'y')],
-        {IntervalWindow(1, 26): [set('abcdef')],
-         IntervalWindow(30, 40): [set('yz')]},
-        1,
-        2,
-        3,
-        4,
-        5,
-        6)
-
-  def test_sessions_watermark(self):
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        AfterWatermark(),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b')],
-        {IntervalWindow(1, 12): [set('ab')]},
-        1,
-        2)
-
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        AfterWatermark(),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (2, 'b'), (15, 'c'), (16, 'd'), (30, 'z'), (9, 'e'),
-         (10, 'f'), (30, 'y')],
-        {IntervalWindow(1, 26): [set('abcdef')],
-         IntervalWindow(30, 40): [set('yz')]},
-        1,
-        2,
-        3,
-        4,
-        5,
-        6)
-
-  def test_sessions_after_count(self):
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        AfterCount(2),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (15, 'b'), (6, 'c'), (30, 's'), (31, 't'), (50, 'z'),
-         (50, 'y')],
-        {IntervalWindow(1, 25): [set('abc')],
-         IntervalWindow(30, 41): [set('st')],
-         IntervalWindow(50, 60): [set('yz')]},
-        1,
-        2,
-        3)
-
-  def test_sessions_repeatedly_after_count(self):
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        Repeatedly(AfterCount(2)),
-        AccumulationMode.ACCUMULATING,
-        [(1, 'a'), (15, 'b'), (6, 'c'), (2, 'd'), (7, 'e')],
-        {IntervalWindow(1, 25): [set('abc'), set('abcde')]},
-        1,
-        3)
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        Repeatedly(AfterCount(2)),
-        AccumulationMode.DISCARDING,
-        [(1, 'a'), (15, 'b'), (6, 'c'), (2, 'd'), (7, 'e')],
-        {IntervalWindow(1, 25): [set('abc'), set('de')]},
-        1,
-        3)
-
-  def test_sessions_after_each(self):
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        AfterEach(AfterCount(2), AfterCount(3)),
-        AccumulationMode.ACCUMULATING,
-        zip(range(10), 'abcdefghij'),
-        {IntervalWindow(0, 11): [set('ab')],
-         IntervalWindow(0, 15): [set('abcdef')]},
-        2)
-
-    self.run_trigger_simple(
-        Sessions(10),  # pyformat break
-        Repeatedly(AfterEach(AfterCount(2), AfterCount(3))),
-        AccumulationMode.ACCUMULATING,
-        zip(range(10), 'abcdefghij'),
-        {IntervalWindow(0, 11): [set('ab')],
-         IntervalWindow(0, 15): [set('abcdef')],
-         IntervalWindow(0, 17): [set('abcdefgh')]},
-        2)
-
-
-class TriggerPipelineTest(unittest.TestCase):
-
-  def test_after_count(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = (p
-              | df.Create([1, 2, 3, 4, 5, 10, 11])
-              | df.FlatMap(lambda t: [('A', t), ('B', t + 5)])
-              | df.Map(lambda (k, t): TimestampedValue((k, t), t))
-              | df.WindowInto(FixedWindows(10), trigger=AfterCount(3),
-                              accumulation_mode=AccumulationMode.DISCARDING)
-              | df.GroupByKey()
-              | df.Map(lambda (k, v): ('%s-%s' % (k, len(v)), set(v))))
-    assert_that(result, equal_to(
-        {
-            'A-5': {1, 2, 3, 4, 5},
-            # ('A', 10) and ('A', 11) are never emitted: only two elements
-            # reach that window, so AfterCount(3) never fires.
-            'B-4': {6, 7, 8, 9},
-            'B-3': {10, 15, 16},
-        }.iteritems()))
-    # Run the pipeline so the assert_that check above actually executes.
-    p.run()
-
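Editorial sketch, not part of the diff: the expected keys above can be derived
by hand. Each input t yields ('A', t) and ('B', t + 5), and FixedWindows(10)
buckets by the window start t // 10 * 10:

    elems = [1, 2, 3, 4, 5, 10, 11]
    pairs = [('A', t) for t in elems] + [('B', t + 5) for t in elems]
    buckets = {}
    for k, t in pairs:
        buckets.setdefault((k, t // 10 * 10), []).append(t)
    for (k, start), ts in sorted(buckets.items()):
        print('%s %s %s' % (k, start, ts))
    # A 0  [1, 2, 3, 4, 5]   -> 'A-5'
    # A 10 [10, 11]          -> dropped: AfterCount(3) never fires
    # B 0  [6, 7, 8, 9]      -> 'B-4'
    # B 10 [10, 15, 16]      -> 'B-3'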
-
-class TranscriptTest(unittest.TestCase):
-
-  # We must prepend an underscore to this name so that the open-source unittest
-  # runner does not execute this method directly as a test.
-  @classmethod
-  def _create_test(cls, spec):
-    counter = 0
-    name = spec.get('name', 'unnamed')
-    unique_name = 'test_' + name
-    while hasattr(cls, unique_name):
-      counter += 1
-      unique_name = 'test_%s_%d' % (name, counter)
-    setattr(cls, unique_name, lambda self: self._run_log_test(spec))
-
-  # We must prepend an underscore to this name so that the open-source unittest
-  # runner does not execute this method directly as a test.
-  @classmethod
-  def _create_tests(cls, transcript_filename):
-    # Use a with statement so the transcript file is closed promptly.
-    with open(transcript_filename) as transcript_file:
-      for spec in yaml.load_all(transcript_file):
-        cls._create_test(spec)
-
-  def _run_log_test(self, spec):
-    if 'error' in spec:
-      self.assertRaisesRegexp(
-          AssertionError, spec['error'], self._run_log, spec)
-    else:
-      self._run_log(spec)
-
-  def _run_log(self, spec):
-
-    def parse_int_list(s):
-      """Parses strings like '[1, 2, 3]'."""
-      s = s.strip()
-      assert s[0] == '[' and s[-1] == ']', s
-      if not s[1:-1].strip():
-        return []
-      else:
-        return [int(x) for x in s[1:-1].split(',')]
-
-    def split_args(s):
-      """Splits 'a, b, [c, d]' into ['a', 'b', '[c, d]']."""
-      args = []
-      start = 0
-      depth = 0
-      for ix in xrange(len(s)):
-        c = s[ix]
-        if c in '({[':
-          depth += 1
-        elif c in ')}]':
-          depth -= 1
-        elif c == ',' and depth == 0:
-          args.append(s[start:ix].strip())
-          start = ix + 1
-      assert depth == 0, s
-      args.append(s[start:].strip())
-      return args
-
-    def parse(s, names):
-      """Recursively parse 'Foo(arg, kw=arg)' using the names dict."""
-      s = s.strip()
-      if s in names:
-        return names[s]
-      elif s[0] == '[':
-        return parse_int_list(s)
-      elif '(' in s:
-        assert s[-1] == ')', s
-        callee = parse(s[:s.index('(')], names)
-        posargs = []
-        kwargs = {}
-        for arg in split_args(s[s.index('(') + 1:-1]):
-          if '=' in arg:
-            kw, value = arg.split('=', 1)
-            kwargs[kw] = parse(value, names)
-          else:
-            posargs.append(parse(arg, names))
-        return callee(*posargs, **kwargs)
-      else:
-        try:
-          return int(s)
-        except ValueError:
-          raise ValueError('Unknown function: %s' % s)
-
-    def parse_fn(s, names):
-      """Like parse(), but implicitly calls no-arg constructors."""
-      fn = parse(s, names)
-      if isinstance(fn, type):
-        return fn()
-      else:
-        return fn
-
-    # pylint: disable=g-import-not-at-top
-    from google.cloud.dataflow.transforms import window as window_module
-    from google.cloud.dataflow.transforms import trigger as trigger_module
-    # pylint: enable=g-import-not-at-top
-    window_fn_names = dict(window_module.__dict__)
-    window_fn_names.update({'CustomTimestampingFixedWindowsWindowFn':
-                            CustomTimestampingFixedWindowsWindowFn})
-    trigger_names = {'Default': DefaultTrigger}
-    trigger_names.update(trigger_module.__dict__)
-
-    window_fn = parse_fn(spec.get('window_fn', 'GlobalWindows'),
-                         window_fn_names)
-    trigger_fn = parse_fn(spec.get('trigger_fn', 'Default'), trigger_names)
-    accumulation_mode = getattr(
-        AccumulationMode, spec.get('accumulation_mode', 'ACCUMULATING').upper())
-    output_time_fn = getattr(
-        OutputTimeFn, spec.get('output_time_fn', 'OUTPUT_AT_EOW').upper())
-    allowed_lateness = float(spec.get('allowed_lateness', '-inf'))
-
-    driver = GeneralTriggerDriver(
-        Windowing(window_fn, trigger_fn, accumulation_mode, output_time_fn))
-    state = InMemoryUnmergedState()
-    output = []
-    watermark = MIN_TIMESTAMP
-
-    def fire_timers():
-      to_fire = state.get_and_clear_timers(watermark)
-      while to_fire:
-        for timer_window, (name, time_domain, t_timestamp) in to_fire:
-          for wvalue in driver.process_timer(
-              timer_window, name, time_domain, t_timestamp, state):
-            window, = wvalue.windows
-            output.append({'window': [window.start, window.end - 1],
-                           'values': sorted(wvalue.value),
-                           'timestamp': wvalue.timestamp})
-        to_fire = state.get_and_clear_timers(watermark)
-
-    for line in spec['transcript']:
-
-      action, params = line.items()[0]
-
-      if action != 'expect':
-        # Fail if we have output that was not expected in the transcript.
-        self.assertEqual(
-            [], output, msg='Unexpected output: %s before %s' % (output, line))
-
-      if action == 'input':
-        bundle = [
-            WindowedValue(t, t, window_fn.assign(WindowFn.AssignContext(t, t)))
-            for t in params]
-        output = [{'window': [wvalue.windows[0].start,
-                              wvalue.windows[0].end - 1],
-                   'values': sorted(wvalue.value),
-                   'timestamp': wvalue.timestamp}
-                  for wvalue
-                  in driver.process_elements(state, bundle, watermark)]
-        fire_timers()
-
-      elif action == 'watermark':
-        watermark = params
-        fire_timers()
-
-      elif action == 'expect':
-        for expected_output in params:
-          for candidate in output:
-            if all(candidate[k] == expected_output[k]
-                   for k in candidate if k in expected_output):
-              output.remove(candidate)
-              break
-          else:
-            self.fail('Unmatched output %s in %s' % (expected_output, output))
-
-      elif action == 'state':
-        # TODO(robertwb): Implement once we support allowed lateness.
-        pass
-
-      else:
-        self.fail('Unknown action: ' + action)
-
-    # Fail if we have output that was not expected in the transcript.
-    self.assertEqual([], output, msg='Unexpected output: %s' % output)
-
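Editorial, standalone restatement, not part of the diff, of the depth-tracked
argument splitting that split_args performs inside _run_log, so it can be
tried outside the test:

    def split_top_level(s):
        args, start, depth = [], 0, 0
        for ix, c in enumerate(s):
            if c in '({[':
                depth += 1
            elif c in ')}]':
                depth -= 1
            elif c == ',' and depth == 0:
                args.append(s[start:ix].strip())
                start = ix + 1
        args.append(s[start:].strip())
        return args

    print(split_top_level('AfterCount(2), AfterWatermark()'))
    # ['AfterCount(2)', 'AfterWatermark()']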
-
-TRANSCRIPT_TEST_FILE = os.path.join(os.path.dirname(__file__),
-                                    'trigger_transcripts.yaml')
-if os.path.exists(TRANSCRIPT_TEST_FILE):
-  TranscriptTest._create_tests(TRANSCRIPT_TEST_FILE)
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/trigger_transcripts.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/trigger_transcripts.yaml b/sdks/python/google/cloud/dataflow/transforms/trigger_transcripts.yaml
deleted file mode 100644
index 91d88bf..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/trigger_transcripts.yaml
+++ /dev/null
@@ -1,207 +0,0 @@
-name: fixed_default
-window_fn: FixedWindows(10)       # Python names/syntax, unless otherwise noted.
-trigger_fn: Default               # Same. Empty () may be omitted.
-transcript:                       # Ordered list of events.
-  - input: [1, 2, 3, 10, 11]      # The elements are the timestamps.
-  - watermark: 25
-  - expect:                       # Every expected output from the last action.
-      - {window: [0, 9], values: [1, 2, 3]}
-      - {window: [10, 19], values: [10, 11]}   # Partial match on attributes OK.
-
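An editorial gloss, not part of the YAML: each transcript action corresponds
to a step in TranscriptTest._run_log from the Python file above, roughly:

    # input:     wrap each element as a WindowedValue and feed
    #            driver.process_elements(state, bundle, watermark)
    # watermark: advance the watermark, then fire any eligible timers
    # expect:    partial-match each dict against the pending output list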
----
-name: fixed_default_late_data
-window_fn: FixedWindows(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EOW
-transcript:
-  - input: [1, 2, 3, 10, 11, 25]
-  - watermark: 100
-  - expect:
-      - {window: [0, 9], values: [1, 2, 3], timestamp: 10, final: false}
-      - {window: [10, 19], values: [10, 11], timestamp: 20}
-      - {window: [20, 29], values: [25], timestamp: 30, late: false}
-  - input: [7]
-  - expect:
-      - {window: [0, 9], values: [1, 2, 3, 7], timestamp: 10, late: true}
-
----
-name: output_time_fn_earliest
-window_fn: FixedWindows(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
-transcript:
-  - input: [1, 2, 3, 10, 11, 25]
-  - watermark: 100
-  - expect:
-      - {window: [0, 9], values: [1, 2, 3], timestamp: 1, final: false}
-      - {window: [10, 19], values: [10, 11], timestamp: 10}
-      - {window: [20, 29], values: [25], timestamp: 25, late: false}
-
----
-name: output_time_fn_latest
-window_fn: FixedWindows(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_LATEST
-transcript:
-  - input: [1, 2, 3, 10, 11, 25]
-  - watermark: 100
-  - expect:
-      - {window: [0, 9], values: [1, 2, 3], timestamp: 3, final: false}
-      - {window: [10, 19], values: [10, 11], timestamp: 11}
-      - {window: [20, 29], values: [25], timestamp: 25, late: false}
-
----
-# Test that custom timestamping is not invoked.
-name: output_time_fn_custom_timestamping_eow
-window_fn: CustomTimestampingFixedWindowsWindowFn(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EOW
-transcript:
-  - input: [1, 2, 3, 10, 11, 25]
-  - watermark: 100
-  - expect:
-      - {window: [0, 9], values: [1, 2, 3], timestamp: 10, final: false}
-      - {window: [10, 19], values: [10, 11], timestamp: 20}
-      - {window: [20, 29], values: [25], timestamp: 30, late: false}
-
----
-# Test that custom timestamping is not invoked.
-name: output_time_fn_custom_timestamping_earliest
-window_fn: CustomTimestampingFixedWindowsWindowFn(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
-transcript:
-  - input: [1, 2, 3, 10, 11, 25]
-  - watermark: 100
-  - expect:
-      - {window: [0, 9], values: [1, 2, 3], timestamp: 1, final: false}
-      - {window: [10, 19], values: [10, 11], timestamp: 10}
-      - {window: [20, 29], values: [25], timestamp: 25, late: false}
-
----
-# Test that custom timestamping is in fact invoked.
-name: output_time_fn_custom_timestamping_earliest_transformed
-window_fn: CustomTimestampingFixedWindowsWindowFn(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST_TRANSFORMED
-transcript:
-  - input: [1, 2, 3, 10, 11, 25]
-  - watermark: 100
-  - expect:
-      - {window: [0, 9], values: [1, 2, 3], timestamp: 101, final: false}
-      - {window: [10, 19], values: [10, 11], timestamp: 110}
-      - {window: [20, 29], values: [25], timestamp: 125, late: false}
-
----
-name: early_late_sessions
-window_fn: Sessions(10)
-trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3))
-output_time_fn: OUTPUT_AT_EOW
-transcript:
-    - input: [1, 2, 3]
-    - expect:
-        - {window: [1, 12], values: [1, 2, 3], timestamp: 13, early: true}
-    - input: [4]    # no output
-    - input: [5]
-    - expect:
-        - {window: [1, 14], values: [1, 2, 3, 4, 5], timestamp: 14, early: true}
-    - input: [6]
-    - watermark: 100
-    - expect:
-        - {window: [1, 15], values: [1, 2, 3, 4, 5, 6], timestamp: 16,
-           final: true}
-    - input: [1]
-    - input: [3, 4]
-    - expect:
-        - {window: [1, 15], values: [1, 1, 2, 3, 3, 4, 4, 5, 6], timestamp: 16}
-
----
-name: garbage_collection
-window_fn: FixedWindows(10)
-trigger_fn: AfterCount(2)
-output_time_fn: OUTPUT_AT_EOW
-allowed_lateness: 10
-accumulation_mode: discarding
-transcript:
-  - input: [1, 2, 3, 10, 11, 25]
-  - expect:
-      - {window: [0, 9], timestamp: 10}
-      - {window: [10, 19], timestamp: 20}
-  - state:
-      present: [[20, 29]]
-      absent: [[0, 9]]
-      tombstone: [[10, 19]]
-
----
-name: known_late_data_watermark
-window_fn: FixedWindows(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
-transcript:
-  - watermark: 5
-  - input: [2, 3, 7, 8]
-  - watermark: 11
-  - expect:
-      - {window: [0, 9], values: [2, 3, 7, 8], timestamp: 7}
-
----
-name: known_late_data_no_watermark_hold_possible
-window_fn: FixedWindows(10)
-trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
-transcript:
-  - watermark: 8
-  - input: [2, 3, 7]
-  - watermark: 11
-  - expect:
-      - {window: [0, 9], values: [2, 3, 7], timestamp: 10}
-
-# These next examples test that bad/incomplete transcripts are rejected.
----
-name: bad_output
-error: Unmatched output
-window_fn: FixedWindows(10)
-transcript:
-  - input: [1, 2, 3]
-  - expect:
-      - {window: [0, 9], values: [1, 2, 3]}  # bad
-  - watermark: 100
-
----
-name: bad_expected_values
-error: Unmatched output
-window_fn: FixedWindows(10)
-transcript:
-  - input: [1, 2, 3]
-  - watermark: 100
-  - expect:
-      - {window: [0, 9], values: [1, 2]}  # bad values
-
----
-name: bad_expected_window
-error: Unmatched output
-window_fn: FixedWindows(10)
-transcript:
-  - input: [1, 2, 3]
-  - watermark: 100
-  - expect:
-      - {window: [0, 19], values: [1, 2, 3]}  # bad window
-
----
-name: missing_output
-error: Unexpected output
-window_fn: FixedWindows(10)
-transcript:
-   - input: [1, 2, 3]
-   - watermark: 100
-   # missing output
-   - watermark: 200
-
----
-name: missing_output_at_end
-error: Unexpected output
-window_fn: FixedWindows(10)
-transcript:
-   - input: [1, 2, 3]
-   - watermark: 100
-   # missing output

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/util.py b/sdks/python/google/cloud/dataflow/transforms/util.py
deleted file mode 100644
index 2c41dc3..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/util.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Simple utility PTransforms."""
-
-from __future__ import absolute_import
-
-import collections
-import operator
-
-from google.cloud.dataflow.pvalue import AsIter as AllOf
-from google.cloud.dataflow.transforms.core import CombinePerKey, Create, Flatten, GroupByKey, Map
-from google.cloud.dataflow.transforms.ptransform import PTransform
-from google.cloud.dataflow.transforms.ptransform import ptransform_fn
-
-
-__all__ = [
-    'CoGroupByKey',
-    'Keys',
-    'KvSwap',
-    'RemoveDuplicates',
-    'Values',
-    'assert_that',
-    'equal_to',
-    'is_empty',
-    ]
-
-
-class CoGroupByKey(PTransform):
-  """Groups results across several PCollections by key.
-
-  Given an input dict mapping serializable keys (called "tags") to 0 or more
-  PCollections of (key, value) tuples, e.g.::
-
-     {'pc1': pcoll1, 'pc2': pcoll2, 33333: pcoll3}
-
-  creates a single output PCollection of (key, value) tuples whose keys are the
-  unique input keys from all inputs, and whose values are dicts mapping each
-  tag to an iterable of whatever values were under the key in the corresponding
-  PCollection::
-
-    ('some key', {'pc1': ['value 1 under "some key" in pcoll1',
-                          'value 2 under "some key" in pcoll1'],
-                  'pc2': [],
-                  33333: ['only value under "some key" in pcoll3']})
-
-  Note that pcoll2 had no values associated with "some key".
-
-  CoGroupByKey also works for tuples, lists, or other flat iterables of
-  PCollections, in which case the values of the resulting PCollections
-  will be tuples whose nth value is the list of values from the nth
-  PCollection---conceptually, the "tags" are the indices into the input.
-  Thus, for this input::
-
-     (pcoll1, pcoll2, pcoll3)
-
-  the output PCollection's value for "some key" is::
-
-    ('some key', (['value 1 under "some key" in pcoll1',
-                   'value 2 under "some key" in pcoll1'],
-                  [],
-                  ['only value under "some key" in pcoll3']))
-
-  Args:
-    label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
-    **kwargs: Accepts a single named argument "pipeline", which specifies the
-      pipeline that "owns" this PTransform. Ordinarily CoGroupByKey can obtain
-      this information from one of the input PCollections, but if there are none
-      (or if there's a chance there may be none), this argument is the only way
-      to provide pipeline information, and should be considered mandatory.
-  """
-
-  def __init__(self, label=None, **kwargs):
-    super(CoGroupByKey, self).__init__(label)
-    self.pipeline = kwargs.pop('pipeline', None)
-    if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
-
-  def _extract_input_pvalues(self, pvalueish):
-    try:
-      # If this works, it's a dict.
-      return pvalueish, tuple(pvalueish.viewvalues())
-    except AttributeError:
-      pcolls = tuple(pvalueish)
-      return pcolls, pcolls
-
-  def apply(self, pcolls):
-    """Performs CoGroupByKey on argument pcolls; see class docstring."""
-    # For associating values in K-V pairs with the PCollections they came from.
-    def _pair_tag_with_value((key, value), tag):
-      return (key, (tag, value))
-
-    # Creates the key, value pairs for the output PCollection. Values are either
-    # lists or dicts (per the class docstring), initialized by the result of
-    # result_ctor(result_ctor_arg).
-    def _merge_tagged_vals_under_key((key, grouped), result_ctor,
-                                     result_ctor_arg):
-      result_value = result_ctor(result_ctor_arg)
-      for tag, value in grouped:
-        result_value[tag].append(value)
-      return (key, result_value)
-
-    try:
-      # If pcolls is a dict, we turn it into (tag, pcoll) pairs for use in the
-      # general-purpose code below. The result value constructor creates dicts
-      # whose keys are the tags.
-      result_ctor_arg = pcolls.keys()
-      result_ctor = lambda tags: dict((tag, []) for tag in tags)
-      pcolls = pcolls.items()
-    except AttributeError:
-      # Otherwise, pcolls is a list/tuple, so we turn it into (index, pcoll)
-      # pairs. The result value constructor makes tuples with len(pcolls) slots.
-      pcolls = list(enumerate(pcolls))
-      result_ctor_arg = len(pcolls)
-      result_ctor = lambda size: tuple([] for _ in xrange(size))
-
-    # Check input PCollections for PCollection-ness, and that they all belong
-    # to the same pipeline.
-    for _, pcoll in pcolls:
-      self._check_pcollection(pcoll)
-      if self.pipeline:
-        assert pcoll.pipeline == self.pipeline
-
-    return ([pcoll | Map('pair_with_%s' % tag, _pair_tag_with_value, tag)
-             for tag, pcoll in pcolls]
-            | Flatten(pipeline=self.pipeline)
-            | GroupByKey()
-            | Map(_merge_tagged_vals_under_key, result_ctor, result_ctor_arg))
-
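Editorial sketch, not part of the diff: the tagging and merging that apply()
performs, restated in plain Python for the dict form of the input. The sample
data is hypothetical:

    pcolls = {'emails': [('amy', 'amy@example.com')],
              'phones': [('amy', '555-1212'), ('bob', '555-0000')]}
    merged = {}
    for tag, pairs in pcolls.items():
        for key, value in pairs:
            row = merged.setdefault(key, dict((t, []) for t in pcolls))
            row[tag].append(value)
    for key in sorted(merged):
        print('%s %s' % (key, merged[key]))
    # amy {'emails': ['amy@example.com'], 'phones': ['555-1212']}
    # bob {'emails': [], 'phones': ['555-0000']}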
-
-def Keys(label='Keys'):  # pylint: disable=invalid-name
-  """Produces a PCollection of first elements of 2-tuples in a PCollection."""
-  return Map(label, lambda (k, v): k)
-
-
-def Values(label='Values'):  # pylint: disable=invalid-name
-  """Produces a PCollection of second elements of 2-tuples in a PCollection."""
-  return Map(label, lambda (k, v): v)
-
-
-def KvSwap(label='KvSwap'):  # pylint: disable=invalid-name
-  """Produces a PCollection reversing 2-tuples in a PCollection."""
-  return Map(label, lambda (k, v): (v, k))
-
-
-@ptransform_fn
-def RemoveDuplicates(label, pcoll):  # pylint: disable=invalid-name
-  """Produces a PCollection containing the unique elements of a PCollection."""
-  return (pcoll
-          | Map('%s:ToPairs' % label, lambda v: (v, None))
-          | CombinePerKey('%s:Group' % label, lambda vs: None)
-          | Keys('%s:RemoveDuplicates' % label))
-
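Editorial sketch, not part of the diff: the three chained transforms above
amount to keeping one copy per distinct value, as in this plain-Python
equivalent (order is preserved here for readability; the real transform makes
no ordering guarantee):

    def remove_duplicates(values):
        seen = set()
        out = []
        for v in values:
            if v not in seen:       # the CombinePerKey step collapses repeats
                seen.add(v)
                out.append(v)
        return out

    print(remove_duplicates([3, 1, 3, 2, 1]))   # [3, 1, 2]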
-
-class DataflowAssertException(Exception):
-  """Exception raised by matcher classes used by assert_that transform."""
-
-  pass
-
-
-# Note that equal_to always sorts both the expected and the actual values,
-# since what we compare are PCollections, for which there is no guaranteed
-# order. However, the sorting does not go beyond the top level; therefore
-# [1, 2] and [2, 1] are considered equal while [[1, 2]] and [[2, 1]] are not.
-# TODO(silviuc): Add contains_in_any_order-style matchers.
-def equal_to(expected):
-  expected = list(expected)
-  def _equal(actual):
-    sorted_expected = sorted(expected)
-    sorted_actual = sorted(actual)
-    if sorted_expected != sorted_actual:
-      raise DataflowAssertException(
-          'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
-  return _equal
-
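Editorial usage sketch, not part of the diff, showing the top-level-only
sorting noted above; it assumes the equal_to and DataflowAssertException
defined in this file:

    check = equal_to([2, 1])
    check([1, 2])          # passes: top-level sort makes these equal
    try:
        equal_to([[1, 2]])([[2, 1]])
    except DataflowAssertException:
        print('nested lists are compared as-is')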
-
-def is_empty():
-  def _empty(actual):
-    if actual:
-      raise DataflowAssertException(
-          'Failed assert: [] == %r' % actual)
-  return _empty
-
-
-def assert_that(actual, matcher, label='assert_that'):
-  """A PTransform that checks a PCollection has an expected value.
-
-  Note that assert_that should be used only for testing pipelines since the
-  check relies on materializing the entire PCollection being checked.
-
-  Args:
-    actual: A PCollection.
-    matcher: A matcher function taking as argument the actual value of a
-      materialized PCollection. The matcher validates this actual value against
-      expectations and raises DataflowAssertException if they are not met.
-    label: Optional string label. This is needed in case several assert_that
-      transforms are introduced in the same pipeline.
-
-  Returns:
-    Ignored.
-  """
-
-  def match(_, actual):
-    matcher(actual)
-
-  class AssertThat(PTransform):
-
-    def apply(self, pipeline):
-      return pipeline | Create('singleton', [None]) | Map(match, AllOf(actual))
-
-    def default_label(self):
-      return label
-
-  actual.pipeline | AssertThat()

