beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Implement streaming GroupByKey in Python DirectRunner
Date Tue, 27 Jun 2017 05:49:33 GMT
Repository: beam
Updated Branches:
  refs/heads/master 95e6bbe50 -> 8036001da


Implement streaming GroupByKey in Python DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb379e76
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb379e76
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb379e76

Branch: refs/heads/master
Commit: eb379e76adaa9c4b4e24a4b3c5757be8523d95c4
Parents: 95e6bbe
Author: Charles Chen <ccy@google.com>
Authored: Mon Jun 26 16:54:00 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Mon Jun 26 22:49:06 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py |  29 +++-
 .../runners/direct/evaluation_context.py        |   2 +-
 .../runners/direct/transform_evaluator.py       | 138 ++++++++++++++++++-
 sdks/python/apache_beam/runners/direct/util.py  |  25 ++--
 .../runners/direct/watermark_manager.py         |  26 ++--
 .../apache_beam/testing/test_stream_test.py     |  37 ++++-
 sdks/python/apache_beam/transforms/trigger.py   |  16 +++
 7 files changed, 239 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index d80ef10..2a75977 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -34,6 +34,7 @@ from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
 from apache_beam.transforms.core import _GroupAlsoByWindow
+from apache_beam.transforms.core import _GroupByKeyOnly
 from apache_beam.options.pipeline_options import DirectOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
@@ -47,6 +48,13 @@ K = typehints.TypeVariable('K')
 V = typehints.TypeVariable('V')
 
 
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
+class _StreamingGroupByKeyOnly(_GroupByKeyOnly):
+  """Streaming GroupByKeyOnly placeholder for overriding in DirectRunner."""
+  pass
+
+
 @typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
 class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
@@ -79,17 +87,24 @@ class DirectRunner(PipelineRunner):
     except NotImplementedError:
       return transform.expand(pcoll)
 
+  def apply__GroupByKeyOnly(self, transform, pcoll):
+    if (transform.__class__ == _GroupByKeyOnly and
+        pcoll.pipeline._options.view_as(StandardOptions).streaming):
+      # Use specialized streaming implementation, if requested.
+      type_hints = transform.get_type_hints()
+      return pcoll | (_StreamingGroupByKeyOnly()
+                      .with_input_types(*type_hints.input_types[0])
+                      .with_output_types(*type_hints.output_types[0]))
+    return transform.expand(pcoll)
+
   def apply__GroupAlsoByWindow(self, transform, pcoll):
     if (transform.__class__ == _GroupAlsoByWindow and
         pcoll.pipeline._options.view_as(StandardOptions).streaming):
       # Use specialized streaming implementation, if requested.
-      raise NotImplementedError(
-          'Streaming support is not yet available on the DirectRunner.')
-      # TODO(ccy): enable when streaming implementation is plumbed through.
-      # type_hints = transform.get_type_hints()
-      # return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
-      #                 .with_input_types(*type_hints.input_types[0])
-      #                 .with_output_types(*type_hints.output_types[0]))
+      type_hints = transform.get_type_hints()
+      return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
+                      .with_input_types(*type_hints.input_types[0])
+                      .with_output_types(*type_hints.output_types[0]))
     return transform.expand(pcoll)
 
   def run(self, pipeline):

http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 669a68a..54c407c 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -213,7 +213,7 @@ class EvaluationContext(object):
           result.unprocessed_bundles)
       self._watermark_manager.update_watermarks(
           completed_bundle, result.transform, completed_timers,
-          committed_bundles, unprocessed_bundles, result.watermark_hold)
+          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds)
 
       self._metrics.commit_logical(completed_bundle,
                                    result.logical_metric_updates)

http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 3aefbb8..67b2492 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -27,6 +27,8 @@ from apache_beam.internal import pickler
 import apache_beam.io as io
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
+from apache_beam.runners.direct.direct_runner import _StreamingGroupByKeyOnly
+from apache_beam.runners.direct.direct_runner import _StreamingGroupAlsoByWindow
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
 from apache_beam.runners.direct.util import KeyedWorkItem
 from apache_beam.runners.direct.util import TransformResult
@@ -38,6 +40,7 @@ from apache_beam.testing.test_stream import ProcessingTimeEvent
 from apache_beam.transforms import core
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
+from apache_beam.transforms.trigger import create_trigger_driver
 from apache_beam.transforms.trigger import _CombiningValueStateTag
 from apache_beam.transforms.trigger import _ListStateTag
 from apache_beam.transforms.trigger import TimeDomain
@@ -63,6 +66,8 @@ class TransformEvaluatorRegistry(object):
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
+        _StreamingGroupByKeyOnly: _StreamingGroupByKeyOnlyEvaluator,
+        _StreamingGroupAlsoByWindow: _StreamingGroupAlsoByWindowEvaluator,
         _NativeWrite: _NativeWriteEvaluator,
         TestStream: _TestStreamEvaluator,
     }
@@ -125,7 +130,10 @@ class TransformEvaluatorRegistry(object):
       True if executor should execute applied_ptransform serially.
     """
     return isinstance(applied_ptransform.transform,
-                      (core._GroupByKeyOnly, _NativeWrite))
+                      (core._GroupByKeyOnly,
+                       _StreamingGroupByKeyOnly,
+                       _StreamingGroupAlsoByWindow,
+                       _NativeWrite,))
 
 
 class RootBundleProvider(object):
@@ -234,7 +242,7 @@ class _TransformEvaluator(object):
     timer and passes it to process_element().  Evaluator subclasses which
     desire different timer delivery semantics can override process_timer().
     """
-    state = self.step_context.get_keyed_state(timer_firing.key)
+    state = self.step_context.get_keyed_state(timer_firing.encoded_key)
     state.clear_timer(
         timer_firing.window, timer_firing.name, timer_firing.time_domain)
     self.process_timer(timer_firing)
@@ -242,7 +250,9 @@ class _TransformEvaluator(object):
   def process_timer(self, timer_firing):
     """Default process_timer() impl. generating KeyedWorkItem element."""
     self.process_element(
-        KeyedWorkItem(timer_firing.key, timer_firing=timer_firing))
+        GlobalWindows.windowed_value(
+            KeyedWorkItem(timer_firing.encoded_key,
+                          timer_firings=[timer_firing])))
 
   def process_element(self, element):
     """Processes a new element as part of the current bundle."""
@@ -343,7 +353,8 @@ class _TestStreamEvaluator(_TransformEvaluator):
       unprocessed_bundles.append(unprocessed_bundle)
       hold = self.watermark
     return TransformResult(
-        self._applied_ptransform, self.bundles, unprocessed_bundles, None, hold)
+        self._applied_ptransform, self.bundles, unprocessed_bundles, None,
+        {None: hold})
 
 
 class _FlattenEvaluator(_TransformEvaluator):
@@ -547,7 +558,122 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
           None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, bundles, [], None, hold)
+        self._applied_ptransform, bundles, [], None, {None: hold})
+
+
+class _StreamingGroupByKeyOnlyEvaluator(_TransformEvaluator):
+  """TransformEvaluator for _StreamingGroupByKeyOnly transform.
+
+  The _GroupByKeyOnlyEvaluator buffers elements until its input watermark goes
+  to infinity, which is suitable for batch mode execution. During streaming
+  mode execution, we emit each bundle as it comes to the next transform.
+  """
+
+  MAX_ELEMENT_PER_BUNDLE = None
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs, scoped_metrics_container):
+    assert not side_inputs
+    super(_StreamingGroupByKeyOnlyEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs, scoped_metrics_container)
+
+  def start_bundle(self):
+    self.gbk_items = collections.defaultdict(list)
+
+    assert len(self._outputs) == 1
+    self.output_pcollection = list(self._outputs)[0]
+
+    # The input type of a GroupByKey will be KV[Any, Any] or more specific.
+    kv_type_hint = (
+        self._applied_ptransform.transform.get_type_hints().input_types[0])
+    self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
+
+  def process_element(self, element):
+    if (isinstance(element, WindowedValue)
+        and isinstance(element.value, collections.Iterable)
+        and len(element.value) == 2):
+      k, v = element.value
+      self.gbk_items[self.key_coder.encode(k)].append(v)
+    else:
+      raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
+                           'windowed key-value pairs. Instead received: %r.'
+                           % element)
+
+  def finish_bundle(self):
+    bundles = []
+    bundle = None
+    for encoded_k, vs in self.gbk_items.iteritems():
+      if not bundle:
+        bundle = self._evaluation_context.create_bundle(
+            self.output_pcollection)
+        bundles.append(bundle)
+      kwi = KeyedWorkItem(encoded_k, elements=vs)
+      bundle.add(GlobalWindows.windowed_value(kwi))
+
+    return TransformResult(
+        self._applied_ptransform, bundles, [], None, None)
+
+
+class _StreamingGroupAlsoByWindowEvaluator(_TransformEvaluator):
+  """TransformEvaluator for the _StreamingGroupAlsoByWindow transform.
+
+  This evaluator is only used in streaming mode.  In batch mode, the
+  GroupAlsoByWindow operation is evaluated as a normal DoFn, as defined
+  in transforms/core.py.
+  """
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs, scoped_metrics_container):
+    assert not side_inputs
+    super(_StreamingGroupAlsoByWindowEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs, scoped_metrics_container)
+
+  def start_bundle(self):
+    assert len(self._outputs) == 1
+    self.output_pcollection = list(self._outputs)[0]
+    self.step_context = self._execution_context.get_step_context()
+    self.driver = create_trigger_driver(
+        self._applied_ptransform.transform.windowing)
+    self.gabw_items = []
+    self.keyed_holds = {}
+
+    # The input type of a GroupAlsoByWindow will be KV[Any, Iter[Any]] or more
+    # specific.
+    kv_type_hint = (
+        self._applied_ptransform.transform.get_type_hints().input_types[0])
+    self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
+
+  def process_element(self, element):
+    kwi = element.value
+    assert isinstance(kwi, KeyedWorkItem), kwi
+    encoded_k, timer_firings, vs = (
+        kwi.encoded_key, kwi.timer_firings, kwi.elements)
+    k = self.key_coder.decode(encoded_k)
+    state = self.step_context.get_keyed_state(encoded_k)
+
+    for timer_firing in timer_firings:
+      for wvalue in self.driver.process_timer(
+          timer_firing.window, timer_firing.name, timer_firing.time_domain,
+          timer_firing.timestamp, state):
+        self.gabw_items.append(wvalue.with_value((k, wvalue.value)))
+    if vs:
+      for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
+        self.gabw_items.append(wvalue.with_value((k, wvalue.value)))
+
+    self.keyed_holds[encoded_k] = state.get_earliest_hold()
+
+  def finish_bundle(self):
+    bundles = []
+    if self.gabw_items:
+      bundle = self._evaluation_context.create_bundle(self.output_pcollection)
+      for item in self.gabw_items:
+        bundle.add(item)
+      bundles.append(bundle)
+
+    return TransformResult(
+        self._applied_ptransform, bundles, [], None, self.keyed_holds)
 
 
 class _NativeWriteEvaluator(_TransformEvaluator):
@@ -612,4 +738,4 @@ class _NativeWriteEvaluator(_TransformEvaluator):
           None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, [], [], None, hold)
+        self._applied_ptransform, [], [], None, {None: hold})

http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py
index 8c846fc..10f7b29 100644
--- a/sdks/python/apache_beam/runners/direct/util.py
+++ b/sdks/python/apache_beam/runners/direct/util.py
@@ -27,13 +27,21 @@ class TransformResult(object):
   """Result of evaluating an AppliedPTransform with a TransformEvaluator."""
 
   def __init__(self, applied_ptransform, uncommitted_output_bundles,
-               unprocessed_bundles, counters, watermark_hold,
+               unprocessed_bundles, counters, keyed_watermark_holds,
                undeclared_tag_values=None):
     self.transform = applied_ptransform
     self.uncommitted_output_bundles = uncommitted_output_bundles
     self.unprocessed_bundles = unprocessed_bundles
     self.counters = counters
-    self.watermark_hold = watermark_hold
+    # Mapping of key -> earliest hold timestamp or None.  Keys should be
+    # strings or None.
+    #
+    # For each key, we receive as its corresponding value the earliest
+    # watermark hold for that key (the key can be None for global state), past
+    # which the output watermark for the currently-executing step will not
+    # advance.  If the value is None or utils.timestamp.MAX_TIMESTAMP, the
+    # watermark hold will be removed.
+    self.keyed_watermark_holds = keyed_watermark_holds or {}
     # Only used when caching (materializing) all values is requested.
     self.undeclared_tag_values = undeclared_tag_values
     # Populated by the TransformExecutor.
@@ -43,8 +51,8 @@ class TransformResult(object):
 class TimerFiring(object):
   """A single instance of a fired timer."""
 
-  def __init__(self, key, window, name, time_domain, timestamp):
-    self.key = key
+  def __init__(self, encoded_key, window, name, time_domain, timestamp):
+    self.encoded_key = encoded_key
     self.window = window
     self.name = name
     self.time_domain = time_domain
@@ -53,8 +61,7 @@ class TimerFiring(object):
 
 class KeyedWorkItem(object):
   """A keyed item that can either be a timer firing or a list of elements."""
-  def __init__(self, key, timer_firing=None, elements=None):
-    self.key = key
-    assert not timer_firing and elements
-    self.timer_firing = timer_firing
-    self.elements = elements
+  def __init__(self, encoded_key, timer_firings=None, elements=None):
+    self.encoded_key = encoded_key
+    self.timer_firings = timer_firings or []
+    self.elements = elements or []

http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 4aa2bb4..935998d 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -94,13 +94,13 @@ class WatermarkManager(object):
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
                         completed_timers, outputs, unprocessed_bundles,
-                        earliest_hold):
+                        keyed_earliest_holds):
     assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
     self._update_pending(
         completed_committed_bundle, applied_ptransform, completed_timers,
         outputs, unprocessed_bundles)
     tw = self.get_watermarks(applied_ptransform)
-    tw.hold(earliest_hold)
+    tw.hold(keyed_earliest_holds)
     self._refresh_watermarks(applied_ptransform)
 
   def _update_pending(self, input_committed_bundle, applied_ptransform,
@@ -161,7 +161,7 @@ class _TransformWatermarks(object):
     self._input_transform_watermarks = []
     self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
     self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
-    self._earliest_hold = WatermarkManager.WATERMARK_POS_INF
+    self._keyed_earliest_holds = {}
     self._pending = set()  # Scheduled bundles targeted for this transform.
     self._fired_timers = set()
     self._lock = threading.Lock()
@@ -187,11 +187,13 @@ class _TransformWatermarks(object):
     with self._lock:
       return self._output_watermark
 
-  def hold(self, value):
+  def hold(self, keyed_earliest_holds):
     with self._lock:
-      if value is None:
-        value = WatermarkManager.WATERMARK_POS_INF
-      self._earliest_hold = value
+      for key, hold_value in keyed_earliest_holds.iteritems():
+        self._keyed_earliest_holds[key] = hold_value
+        if (hold_value is None or
+            hold_value == WatermarkManager.WATERMARK_POS_INF):
+          del self._keyed_earliest_holds[key]
 
   def add_pending(self, pending):
     with self._lock:
@@ -230,7 +232,11 @@ class _TransformWatermarks(object):
 
       self._input_watermark = max(self._input_watermark,
                                   min(pending_holder, producer_watermark))
-      new_output_watermark = min(self._input_watermark, self._earliest_hold)
+      earliest_hold = WatermarkManager.WATERMARK_POS_INF
+      for hold in self._keyed_earliest_holds.values():
+        if hold < earliest_hold:
+          earliest_hold = hold
+      new_output_watermark = min(self._input_watermark, earliest_hold)
 
       advanced = new_output_watermark > self._output_watermark
       self._output_watermark = new_output_watermark
@@ -246,11 +252,11 @@ class _TransformWatermarks(object):
         return False
 
       fired_timers = []
-      for key, state in self._keyed_states.iteritems():
+      for encoded_key, state in self._keyed_states.iteritems():
         timers = state.get_timers(watermark=self._input_watermark)
         for expired in timers:
           window, (name, time_domain, timestamp) = expired
           fired_timers.append(
-              TimerFiring(key, window, name, time_domain, timestamp))
+              TimerFiring(encoded_key, window, name, time_domain, timestamp))
       self._fired_timers.update(fired_timers)
       return fired_timers

http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/testing/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 071c7cd..b7ca141 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -20,12 +20,15 @@
 import unittest
 
 import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_stream import ElementEvent
 from apache_beam.testing.test_stream import ProcessingTimeEvent
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.test_stream import WatermarkEvent
 from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils import timestamp
 from apache_beam.utils.windowed_value import WindowedValue
@@ -98,7 +101,9 @@ class TestStreamTest(unittest.TestCase):
                   timestamp=beam.DoFn.TimestampParam):
         yield (element, timestamp)
 
-    p = TestPipeline()
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     my_record_fn = RecordFn()
     records = p | test_stream | beam.ParDo(my_record_fn)
     assert_that(records, equal_to([
@@ -111,6 +116,36 @@ class TestStreamTest(unittest.TestCase):
         ('last', timestamp.Timestamp(310)),]))
     p.run()
 
+  def test_gbk_execution(self):
+    test_stream = (TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['a', 'b', 'c'])
+                   .advance_watermark_to(20)
+                   .add_elements(['d'])
+                   .add_elements(['e'])
+                   .advance_processing_time(10)
+                   .advance_watermark_to(300)
+                   .add_elements([TimestampedValue('late', 12)])
+                   .add_elements([TimestampedValue('last', 310)]))
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+    records = (p
+               | test_stream
+               | beam.WindowInto(FixedWindows(15))
+               | beam.Map(lambda x: ('k', x))
+               | beam.GroupByKey())
+    # TODO(BEAM-2519): timestamp assignment for elements from a GBK should
+    # respect the TimestampCombiner.  The test below should also verify the
+    # timestamps of the outputted elements once this is implemented.
+    assert_that(records, equal_to([
+        ('k', ['a', 'b', 'c']),
+        ('k', ['d', 'e']),
+        ('k', ['late']),
+        ('k', ['last'])]))
+    p.run()
+
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/eb379e76/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 7ff44fa..f77fa1a 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -36,6 +36,7 @@ from apache_beam.transforms.window import WindowFn
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import TIME_GRANULARITY
 
 # AfterCount is experimental. No backwards compatibility guarantees.
 
@@ -1066,6 +1067,8 @@ class InMemoryUnmergedState(UnmergedState):
 
   def clear_timer(self, window, name, time_domain):
     self.timers[window].pop((name, time_domain), None)
+    if not self.timers[window]:
+      del self.timers[window]
 
   def get_window(self, window_id):
     return window_id
@@ -1117,6 +1120,19 @@ class InMemoryUnmergedState(UnmergedState):
   def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
     return self.get_timers(clear=True, watermark=watermark)
 
+  def get_earliest_hold(self):
+    earliest_hold = MAX_TIMESTAMP
+    for unused_window, tagged_states in self.state.iteritems():
+      # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is
+      # named "watermark".  This is currently only true because the only place
+      # watermark holds are set is in the GeneralTriggerDriver, where we use
+      # this name.  We should fix this by allowing enumeration of the tag types
+      # used in adding state.
+      if 'watermark' in tagged_states and tagged_states['watermark']:
+        hold = min(tagged_states['watermark']) - TIME_GRANULARITY
+        earliest_hold = min(earliest_hold, hold)
+    return earliest_hold
+
   def __repr__(self):
     state_str = '\n'.join('%s: %s' % (key, dict(state))
                           for key, state in self.state.items())


Mime
View raw message