beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [11/50] beam git commit: Allow production of unprocessed bundles, introduce TestStream evaluator in DirectRunner
Date Fri, 23 Jun 2017 03:04:37 GMT
Allow production of unprocessed bundles, introduce TestStream evaluator in DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3520f948
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3520f948
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3520f948

Branch: refs/heads/gearpump-runner
Commit: 3520f94882b00aa8db64f6379044689d1b78ac06
Parents: 28c6fd4
Author: Charles Chen <ccy@google.com>
Authored: Tue Jun 20 17:16:20 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Wed Jun 21 13:44:05 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/evaluation_context.py        | 14 ++--
 .../apache_beam/runners/direct/executor.py      | 40 +++++++--
 .../runners/direct/transform_evaluator.py       | 88 ++++++++++++++++++--
 sdks/python/apache_beam/runners/direct/util.py  |  4 +-
 .../runners/direct/watermark_manager.py         | 11 ++-
 sdks/python/apache_beam/testing/test_stream.py  |  5 ++
 .../apache_beam/testing/test_stream_test.py     | 37 ++++++++
 7 files changed, 176 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 976e9e8..669a68a 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -208,11 +208,12 @@ class EvaluationContext(object):
       the committed bundles contained within the handled result.
     """
     with self._lock:
-      committed_bundles = self._commit_bundles(
-          result.uncommitted_output_bundles)
+      committed_bundles, unprocessed_bundles = self._commit_bundles(
+          result.uncommitted_output_bundles,
+          result.unprocessed_bundles)
       self._watermark_manager.update_watermarks(
           completed_bundle, result.transform, completed_timers,
-          committed_bundles, result.watermark_hold)
+          committed_bundles, unprocessed_bundles, result.watermark_hold)
 
       self._metrics.commit_logical(completed_bundle,
                                    result.logical_metric_updates)
@@ -252,14 +253,17 @@ class EvaluationContext(object):
           executor_service.submit(task)
         self._pending_unblocked_tasks = []
 
-  def _commit_bundles(self, uncommitted_bundles):
+  def _commit_bundles(self, uncommitted_bundles, unprocessed_bundles):
     """Commits bundles and returns a immutable set of committed bundles."""
     for in_progress_bundle in uncommitted_bundles:
       producing_applied_ptransform = in_progress_bundle.pcollection.producer
       watermarks = self._watermark_manager.get_watermarks(
           producing_applied_ptransform)
       in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
-    return tuple(uncommitted_bundles)
+
+    for unprocessed_bundle in unprocessed_bundles:
+      unprocessed_bundle.commit(None)
+    return tuple(uncommitted_bundles), tuple(unprocessed_bundles)
 
   def get_execution_context(self, applied_ptransform):
     return _ExecutionContext(

http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index a0a3886..e70e326 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -227,17 +227,25 @@ class _CompletionCallback(object):
     self._all_updates = all_updates
     self._timer_firings = timer_firings or []
 
-  def handle_result(self, input_committed_bundle, transform_result):
+  def handle_result(self, transform_executor, input_committed_bundle,
+                    transform_result):
     output_committed_bundles = self._evaluation_context.handle_result(
         input_committed_bundle, self._timer_firings, transform_result)
     for output_committed_bundle in output_committed_bundles:
       self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate(
-          output_committed_bundle, None))
+          transform_executor,
+          committed_bundle=output_committed_bundle))
+    for unprocessed_bundle in transform_result.unprocessed_bundles:
+      self._all_updates.offer(
+          _ExecutorServiceParallelExecutor._ExecutorUpdate(
+              transform_executor,
+              unprocessed_bundle=unprocessed_bundle))
     return output_committed_bundles
 
-  def handle_exception(self, exception):
+  def handle_exception(self, transform_executor, exception):
     self._all_updates.offer(
-        _ExecutorServiceParallelExecutor._ExecutorUpdate(None, exception))
+        _ExecutorServiceParallelExecutor._ExecutorUpdate(
+            transform_executor, exception=exception))
 
 
 class TransformExecutor(_ExecutorService.CallableTask):
@@ -312,10 +320,10 @@ class TransformExecutor(_ExecutorService.CallableTask):
             self._evaluation_context.append_to_cache(
                 self._applied_ptransform, tag, value)
 
-      self._completion_callback.handle_result(self._input_bundle, result)
+      self._completion_callback.handle_result(self, self._input_bundle, result)
       return result
     except Exception as e:  # pylint: disable=broad-except
-      self._completion_callback.handle_exception(e)
+      self._completion_callback.handle_exception(self, e)
     finally:
       self._evaluation_context.metrics().commit_physical(
           self._input_bundle,
@@ -387,6 +395,10 @@ class _ExecutorServiceParallelExecutor(object):
         self.schedule_consumption(applied_ptransform, committed_bundle, [],
                                   self.default_completion_callback)
 
+  def schedule_unprocessed_bundle(self, applied_ptransform,
+                                  unprocessed_bundle):
+    self.node_to_pending_bundles[applied_ptransform].append(unprocessed_bundle)
+
   def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
                            fired_timers, on_complete):
     """Schedules evaluation of the given bundle with the transform."""
@@ -433,10 +445,16 @@ class _ExecutorServiceParallelExecutor(object):
   class _ExecutorUpdate(object):
     """An internal status update on the state of the executor."""
 
-    def __init__(self, produced_bundle=None, exception=None):
+    def __init__(self, transform_executor, committed_bundle=None,
+                 unprocessed_bundle=None, exception=None):
+      self.transform_executor = transform_executor
       # Exactly one of them should be not-None
-      assert bool(produced_bundle) != bool(exception)
-      self.committed_bundle = produced_bundle
+      assert sum([
+          bool(committed_bundle),
+          bool(unprocessed_bundle),
+          bool(exception)]) == 1
+      self.committed_bundle = committed_bundle
+      self.unprocessed_bundle = unprocessed_bundle
       self.exception = exception
       self.exc_info = sys.exc_info()
       if self.exc_info[1] is not exception:
@@ -471,6 +489,10 @@ class _ExecutorServiceParallelExecutor(object):
         while update:
           if update.committed_bundle:
             self._executor.schedule_consumers(update.committed_bundle)
+          elif update.unprocessed_bundle:
+            self._executor.schedule_unprocessed_bundle(
+                update.transform_executor._applied_ptransform,
+                update.unprocessed_bundle)
           else:
             assert update.exception
             logging.warning('A task failed with exception.\n %s',

http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index e92d799..3aefbb8 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -31,6 +31,10 @@ from apache_beam.runners.direct.watermark_manager import WatermarkManager
 from apache_beam.runners.direct.util import KeyedWorkItem
 from apache_beam.runners.direct.util import TransformResult
 from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.test_stream import ElementEvent
+from apache_beam.testing.test_stream import WatermarkEvent
+from apache_beam.testing.test_stream import ProcessingTimeEvent
 from apache_beam.transforms import core
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
@@ -41,6 +45,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
 from apache_beam.typehints.typecheck import TypeCheckError
 from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
 from apache_beam.utils import counters
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.options.pipeline_options import TypeOptions
 
 
@@ -59,9 +64,11 @@ class TransformEvaluatorRegistry(object):
         core.ParDo: _ParDoEvaluator,
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
         _NativeWrite: _NativeWriteEvaluator,
+        TestStream: _TestStreamEvaluator,
     }
     self._root_bundle_providers = {
         core.PTransform: DefaultRootBundleProvider,
+        TestStream: _TestStreamRootBundleProvider,
     }
 
   def get_evaluator(
@@ -142,6 +149,23 @@ class DefaultRootBundleProvider(RootBundleProvider):
     return [empty_bundle]
 
 
+class _TestStreamRootBundleProvider(RootBundleProvider):
+  """Provides an initial bundle for the TestStream evaluator."""
+
+  def get_root_bundles(self):
+    test_stream = self._applied_ptransform.transform
+    bundles = []
+    if len(test_stream.events) > 0:
+      bundle = self._evaluation_context.create_bundle(
+          pvalue.PBegin(self._applied_ptransform.transform.pipeline))
+      # Explicitly set timestamp to MIN_TIMESTAMP to ensure that we hold the
+      # watermark.
+      bundle.add(GlobalWindows.windowed_value(0, timestamp=MIN_TIMESTAMP))
+      bundle.commit(None)
+      bundles.append(bundle)
+    return bundles
+
+
 class _TransformEvaluator(object):
   """An evaluator of a specific application of a transform."""
 
@@ -265,7 +289,61 @@ class _BoundedReadEvaluator(_TransformEvaluator):
         bundles = _read_values_to_bundles(reader)
 
     return TransformResult(
-        self._applied_ptransform, bundles, None, None)
+        self._applied_ptransform, bundles, [], None, None)
+
+
+class _TestStreamEvaluator(_TransformEvaluator):
+  """TransformEvaluator for the TestStream transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs, scoped_metrics_container):
+    assert not side_inputs
+    self.test_stream = applied_ptransform.transform
+    super(_TestStreamEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs, scoped_metrics_container)
+
+  def start_bundle(self):
+    self.current_index = -1
+    self.watermark = MIN_TIMESTAMP
+    self.bundles = []
+
+  def process_element(self, element):
+    index = element.value
+    self.watermark = element.timestamp
+    assert isinstance(index, int)
+    assert 0 <= index <= len(self.test_stream.events)
+    self.current_index = index
+    event = self.test_stream.events[self.current_index]
+    if isinstance(event, ElementEvent):
+      assert len(self._outputs) == 1
+      output_pcollection = list(self._outputs)[0]
+      bundle = self._evaluation_context.create_bundle(output_pcollection)
+      for tv in event.timestamped_values:
+        bundle.output(
+            GlobalWindows.windowed_value(tv.value, timestamp=tv.timestamp))
+      self.bundles.append(bundle)
+    elif isinstance(event, WatermarkEvent):
+      assert event.new_watermark >= self.watermark
+      self.watermark = event.new_watermark
+    elif isinstance(event, ProcessingTimeEvent):
+      # TODO(ccy): advance processing time in the context's mock clock.
+      pass
+    else:
+      raise ValueError('Invalid TestStream event: %s.' % event)
+
+  def finish_bundle(self):
+    unprocessed_bundles = []
+    hold = None
+    if self.current_index < len(self.test_stream.events) - 1:
+      unprocessed_bundle = self._evaluation_context.create_bundle(
+          pvalue.PBegin(self._applied_ptransform.transform.pipeline))
+      unprocessed_bundle.add(GlobalWindows.windowed_value(
+          self.current_index + 1, timestamp=self.watermark))
+      unprocessed_bundles.append(unprocessed_bundle)
+      hold = self.watermark
+    return TransformResult(
+        self._applied_ptransform, self.bundles, unprocessed_bundles, None, hold)
 
 
 class _FlattenEvaluator(_TransformEvaluator):
@@ -289,7 +367,7 @@ class _FlattenEvaluator(_TransformEvaluator):
   def finish_bundle(self):
     bundles = [self.bundle]
     return TransformResult(
-        self._applied_ptransform, bundles, None, None)
+        self._applied_ptransform, bundles, [], None, None)
 
 
 class _TaggedReceivers(dict):
@@ -378,7 +456,7 @@ class _ParDoEvaluator(_TransformEvaluator):
     bundles = self._tagged_receivers.values()
     result_counters = self._counter_factory.get_counters()
     return TransformResult(
-        self._applied_ptransform, bundles, result_counters, None,
+        self._applied_ptransform, bundles, [], result_counters, None,
         self._tagged_receivers.undeclared_in_memory_tag_values)
 
 
@@ -469,7 +547,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
           None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, bundles, None, hold)
+        self._applied_ptransform, bundles, [], None, hold)
 
 
 class _NativeWriteEvaluator(_TransformEvaluator):
@@ -534,4 +612,4 @@ class _NativeWriteEvaluator(_TransformEvaluator):
           None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, [], None, hold)
+        self._applied_ptransform, [], [], None, hold)

http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py
index daaaceb..8c846fc 100644
--- a/sdks/python/apache_beam/runners/direct/util.py
+++ b/sdks/python/apache_beam/runners/direct/util.py
@@ -27,9 +27,11 @@ class TransformResult(object):
   """Result of evaluating an AppliedPTransform with a TransformEvaluator."""
 
   def __init__(self, applied_ptransform, uncommitted_output_bundles,
-               counters, watermark_hold, undeclared_tag_values=None):
+               unprocessed_bundles, counters, watermark_hold,
+               undeclared_tag_values=None):
     self.transform = applied_ptransform
     self.uncommitted_output_bundles = uncommitted_output_bundles
+    self.unprocessed_bundles = unprocessed_bundles
     self.counters = counters
     self.watermark_hold = watermark_hold
     # Only used when caching (materializing) all values is requested.

http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 10d25d7..2146bb5 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -93,17 +93,19 @@ class WatermarkManager(object):
     return self._transform_to_watermarks[applied_ptransform]
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
-                        completed_timers, outputs, earliest_hold):
+                        completed_timers, outputs, unprocessed_bundles,
+                        earliest_hold):
     assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
     self._update_pending(
         completed_committed_bundle, applied_ptransform, completed_timers,
-        outputs)
+        outputs, unprocessed_bundles)
     tw = self.get_watermarks(applied_ptransform)
     tw.hold(earliest_hold)
     self._refresh_watermarks(applied_ptransform)
 
   def _update_pending(self, input_committed_bundle, applied_ptransform,
-                      completed_timers, output_committed_bundles):
+                      completed_timers, output_committed_bundles,
+                      unprocessed_bundles):
     """Updated list of pending bundles for the given AppliedPTransform."""
 
     # Update pending elements. Filter out empty bundles. They do not impact
@@ -119,6 +121,9 @@ class WatermarkManager(object):
     completed_tw = self._transform_to_watermarks[applied_ptransform]
     completed_tw.update_timers(completed_timers)
 
+    for unprocessed_bundle in unprocessed_bundles:
+      completed_tw.add_pending(unprocessed_bundle)
+
     assert input_committed_bundle or applied_ptransform in self._root_transforms
     if input_committed_bundle and input_committed_bundle.has_elements():
       completed_tw.remove_pending(input_committed_bundle)

http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/testing/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index a06bcd0..7989fb2 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -24,8 +24,10 @@ from abc import ABCMeta
 from abc import abstractmethod
 
 from apache_beam import coders
+from apache_beam import core
 from apache_beam import pvalue
 from apache_beam.transforms import PTransform
+from apache_beam.transforms import window
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils import timestamp
 from apache_beam.utils.windowed_value import WindowedValue
@@ -99,6 +101,9 @@ class TestStream(PTransform):
     self.current_watermark = timestamp.MIN_TIMESTAMP
     self.events = []
 
+  def get_windowing(self, unused_inputs):
+    return core.Windowing(window.GlobalWindows())
+
   def expand(self, pbegin):
     assert isinstance(pbegin, pvalue.PBegin)
     self.pipeline = pbegin.pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/testing/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index e32dda2..bf05ac1 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -19,6 +19,8 @@
 
 import unittest
 
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_stream import ElementEvent
 from apache_beam.testing.test_stream import ProcessingTimeEvent
 from apache_beam.testing.test_stream import TestStream
@@ -78,6 +80,41 @@ class TestStreamTest(unittest.TestCase):
                TimestampedValue('a', timestamp.MAX_TIMESTAMP)
            ]))
 
+  def test_basic_execution(self):
+    test_stream = (TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['a', 'b', 'c'])
+                   .advance_watermark_to(20)
+                   .add_elements(['d'])
+                   .add_elements(['e'])
+                   .advance_processing_time(10)
+                   .advance_watermark_to(300)
+                   .add_elements([TimestampedValue('late', 12)])
+                   .add_elements([TimestampedValue('last', 310)]))
+
+    global _seen_elements  # pylint: disable=global-variable-undefined
+    _seen_elements = []
+
+    class RecordFn(beam.DoFn):
+      def process(self, element=beam.DoFn.ElementParam,
+                  timestamp=beam.DoFn.TimestampParam):
+        _seen_elements.append((element, timestamp))
+
+    p = TestPipeline()
+    my_record_fn = RecordFn()
+    p | test_stream | beam.ParDo(my_record_fn)  # pylint: disable=expression-not-assigned
+    p.run()
+
+    self.assertEqual([
+        ('a', timestamp.Timestamp(10)),
+        ('b', timestamp.Timestamp(10)),
+        ('c', timestamp.Timestamp(10)),
+        ('d', timestamp.Timestamp(20)),
+        ('e', timestamp.Timestamp(20)),
+        ('late', timestamp.Timestamp(12)),
+        ('last', timestamp.Timestamp(310)),], _seen_elements)
+    del _seen_elements
+
 
 if __name__ == '__main__':
   unittest.main()


Mime
View raw message