nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnya...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest (#172)
Date Tue, 04 Dec 2018 07:00:09 GMT
This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new c2fdd43  [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest (#172)
c2fdd43 is described below

commit c2fdd431963f19b860f9f3135805b41495e55e57
Author: Taegeon Um <taegeonum@gmail.com>
AuthorDate: Tue Dec 4 16:00:04 2018 +0900

    [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest (#172)
    
    JIRA: [NEMO-270: Test different triggers in GroupByKeyAndWindowDoFnTransformTest](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-270)
    
    **Major changes:**
    - Fix `GroupByKeyAndWindowDoFnTransform` to properly handle `EARLY` and `LATE` triggering.
    
    **Tests for the changes:**
    - Add `eventTimeTriggerTest` to test complex triggering and lateness.
---
 .../GroupByKeyAndWindowDoFnTransform.java          |  29 +++--
 .../GroupByKeyAndWindowDoFnTransformTest.java      | 129 ++++++++++++++++++++-
 2 files changed, 140 insertions(+), 18 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 06dde58..a431508 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -240,15 +241,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
       // The DoFnRunner interface requires WindowedValue,
       // but this windowed value is actually not used in the ReduceFnRunner internal.
       getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
-
-      // output watermark
-      // we set output watermark to the minimum of the timer data
-      long keyOutputTimestamp = Long.MAX_VALUE;
-      for (final TimerInternals.TimerData timer : timerDataList) {
-        keyOutputTimestamp = Math.min(keyOutputTimestamp, timer.getTimestamp().getMillis());
-      }
-
-      timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
     }
   }
 
@@ -349,14 +341,21 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
 
     @Override
     public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
-      // adds the output timestamp to the watermark hold of each key
-      // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
-      // TODO #270: consider early firing
-      // TODO #270: This logic may not be applied to early firing outputs
-      keyAndWatermarkHoldMap.put(output.getValue().getKey(),
-        new Watermark(output.getTimestamp().getMillis() + 1));
+
+      // The watermark advances only in ON_TIME
+      if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
+        final K key = output.getValue().getKey();
+        final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
+          inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+        keyAndWatermarkHoldMap.put(key,
+          // adds the output timestamp to the watermark hold of each key
+          // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
+          new Watermark(output.getTimestamp().getMillis() + 1));
+        timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
+      }
       outputCollector.emit(output);
     }
+
     @Override
     public void emitWatermark(final Watermark watermark) {
       outputCollector.emitWatermark(watermark);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index 474c79c..f0749c0 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -33,16 +33,20 @@ import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-import static java.util.Collections.emptyList;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.EARLY;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.LATE;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME;
+import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-// TODO #270: Test different triggers
 public final class GroupByKeyAndWindowDoFnTransformTest {
-
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransformTest.class.getName());
   private final static Coder NULL_INPUT_CODER = null;
   private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
 
@@ -248,4 +252,123 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
 
     doFnTransform.close();
   }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void eventTimeTriggerTest() {
+    final Duration lateness = Duration.standardSeconds(1);
+    final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
+      // early firing
+      .withEarlyFirings(
+        AfterProcessingTime
+          .pastFirstElementInPane()
+          // early firing 1 sec after receiving an element
+          .plusDelayOf(Duration.millis(1000)))
+      // late firing: Fire on any late data.
+      .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+    final FixedWindows window = (FixedWindows) Window.into(
+      FixedWindows.of(Duration.standardSeconds(5)))
+      // lateness
+      .withAllowedLateness(lateness)
+      .triggering(trigger)
+      // TODO #308: Test discarding of refinements
+      .accumulatingFiredPanes().getWindowFn();
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
+      new GroupByKeyAndWindowDoFnTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(window).withTrigger(trigger)
+          .withMode(ACCUMULATING_FIRED_PANES)
+        .withAllowedLateness(lateness),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.buffering(NULL_INPUT_CODER),
+        DisplayData.none());
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
+
+    // early firing is not related to the watermark progress
+    doFnTransform.onWatermark(new Watermark(2));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    oc.outputs.clear();
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+    // EARLY firing... waiting >= 1 sec
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    // GBKTransform emits data when receiving watermark
+    // TODO #250: element-wise processing
+    doFnTransform.onWatermark(new Watermark(5));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    // ACCUMULATION MODE
+    checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    oc.outputs.clear();
+
+    // ON TIME
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(5001));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    // ACCUMULATION MODE
+    checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // LATE DATA
+    // actual window: [0-5000)
+    // allowed lateness: 1000 (ms)
+    // current watermark: 5001
+    // data: 4500
+    // the data timestamp + allowed lateness > current watermark,
+    // so it should be accumulated to the prev window
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "bye!"), new Instant(4500),
+      window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(6000));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    // The data should  be accumulated to the previous window because it allows 1 second lateness
+    checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!", "bye!")), oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // LATE DATA
+    // data timestamp: 4800
+    // current watermark: 6000
+    // data timestamp + allowed lateness < current watermark
+    // It should not be accumulated to the prev window
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello again!"), new Instant(4800),
+      window.assignWindow(new Instant(4800)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(6300));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    checkOutput(KV.of("1", Arrays.asList("hello again!")), oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+
+    doFnTransform.close();
+
+  }
 }


Mime
View raw message