beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pabl...@apache.org
Subject [beam] branch master updated: [BEAM-7122] Adding an accumulating trigger test (#8364)
Date Sat, 20 Apr 2019 22:48:54 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fadc8b  [BEAM-7122] Adding an accumulating trigger test (#8364)
1fadc8b is described below

commit 1fadc8bc49500587144f941ab442f2802700f676
Author: Pablo <pabloem@users.noreply.github.com>
AuthorDate: Sat Apr 20 15:48:43 2019 -0700

    [BEAM-7122] Adding an accumulating trigger test (#8364)
    
    * Adding an accumulating trigger test
---
 sdks/python/apache_beam/transforms/trigger_test.py | 51 ++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 1a0d66c..aaea930 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -29,9 +29,12 @@ from builtins import zip
 import yaml
 
 import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.direct.clock import TestClock
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms import trigger
@@ -41,6 +44,7 @@ from apache_beam.transforms.trigger import AfterAll
 from apache_beam.transforms.trigger import AfterAny
 from apache_beam.transforms.trigger import AfterCount
 from apache_beam.transforms.trigger import AfterEach
+from apache_beam.transforms.trigger import AfterProcessingTime
 from apache_beam.transforms.trigger import AfterWatermark
 from apache_beam.transforms.trigger import DefaultTrigger
 from apache_beam.transforms.trigger import GeneralTriggerDriver
@@ -408,6 +412,19 @@ class RunnerApiTest(unittest.TestCase):
 
 class TriggerPipelineTest(unittest.TestCase):
 
+  def setUp(self):
+    # Use state on the TestCase class, since other references would be pickled
+    # into a closure and not have the desired side effects.
+    TriggerPipelineTest.all_records = []
+
+  def record_dofn(self):
+    class RecordDoFn(beam.DoFn):
+
+      def process(self, element):
+        TriggerPipelineTest.all_records.append(element)
+
+    return RecordDoFn()
+
   def test_after_count(self):
     with TestPipeline() as p:
       def construct_timestamped(k_t):
@@ -434,6 +451,40 @@ class TriggerPipelineTest(unittest.TestCase):
               }.items()
           )))
 
+  def test_multiple_accumulating_firings(self):
+    # PCollection will contain elements from 1 to 10.
+    elements = [i for i in range(1, 11)]
+
+    ts = TestStream().advance_watermark_to(0)
+    for i in elements:
+      ts.add_elements([('key', str(i))])
+      if i % 5 == 0:
+        ts.advance_watermark_to(i)
+        ts.advance_processing_time(5)
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (p
+           | ts
+           | beam.WindowInto(
+               FixedWindows(10),
+               accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
+               trigger=AfterWatermark(
+                   early=AfterAll(
+                       AfterCount(1), AfterProcessingTime(5))
+               ))
+           | beam.GroupByKey()
+           | beam.FlatMap(lambda x: x[1])
+           | beam.ParDo(self.record_dofn()))
+
+    # The trigger should fire twice: once after 5 seconds and once after 10.
+    # Since firings accumulate, the second one also contains the first five.
+    first_firing = [str(i) for i in elements if i <= 5]
+    second_firing = [str(i) for i in elements]
+    self.assertListEqual(first_firing + second_firing,
+                         TriggerPipelineTest.all_records)
+
 
 class TranscriptTest(unittest.TestCase):
 


Mime
View raw message