beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [09/50] [abbrv] incubator-beam git commit: Add LeaderBoardTest
Date Tue, 13 Sep 2016 00:40:40 GMT
Add LeaderBoardTest

This test exercises the PTransforms that make up the LeaderBoard
example. This includes speculative and late trigger firings to produce
team and individual scores on a global and fixed window basis.

Refactor LeaderBoard to expose the team and user score calculations as
composite PTransforms to enable this testing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00b4e951
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00b4e951
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00b4e951

Branch: refs/heads/gearpump-runner
Commit: 00b4e95148eb98d7fea5877274f2fcf2252ac432
Parents: 74d0195
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Aug 5 14:20:56 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Sep 12 17:40:11 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/game/LeaderBoard.java     | 113 ++++--
 .../examples/complete/game/LeaderBoardTest.java | 362 +++++++++++++++++++
 2 files changed, 440 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00b4e951/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 8dd4e39..13bbf44 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.examples.complete.game;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -68,7 +70,7 @@ import org.joda.time.format.DateTimeFormatter;
  * here we're using an unbounded data source, which lets us provide speculative results, and allows
  * handling of late data, at much lower latency. We can use the early/speculative results to keep a
  * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct
- * results, e.g. for 'team prizes'. We're now outputing window results as they're
+ * results, e.g. for 'team prizes'. We're now outputting window results as they're
  * calculated, giving us much lower latency than with the previous batch examples.
  *
  * <p> Run {@link injector.Injector} to generate pubsub data for this pipeline.  The Injector
@@ -186,50 +188,91 @@ public class LeaderBoard extends HourlyTeamScore {
         .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
-    // [START DocInclude_WindowAndTrigger]
-    // Extract team/score pairs from the event stream, using hour-long windows by default.
-    gameEvents
-        .apply("LeaderboardTeamFixedWindows", Window.<GameActionInfo>into(
-            FixedWindows.of(Duration.standardMinutes(options.getTeamWindowDuration())))
-          // We will get early (speculative) results as well as cumulative
-          // processing of late data.
-          .triggering(
-            AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
-                  .plusDelayOf(FIVE_MINUTES))
-            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
-                  .plusDelayOf(TEN_MINUTES)))
-          .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
-          .accumulatingFiredPanes())
-        // Extract and sum teamname/score pairs from the event data.
-        .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+    gameEvents.apply("CalculateTeamScores",
+        new CalculateTeamScores(
+            Duration.standardMinutes(options.getTeamWindowDuration()),
+            Duration.standardMinutes(options.getAllowedLateness())))
         // Write the results to BigQuery.
         .apply("WriteTeamScoreSums",
                new WriteWindowedToBigQuery<KV<String, Integer>>(
                   options.getTableName() + "_team", configureWindowedTableWrite()));
-    // [END DocInclude_WindowAndTrigger]
-
-    // [START DocInclude_ProcTimeTrigger]
-    // Extract user/score pairs from the event stream using processing time, via global windowing.
-    // Get periodic updates on all users' running scores.
     gameEvents
-        .apply("LeaderboardUserGlobalWindow", Window.<GameActionInfo>into(new GlobalWindows())
-          // Get periodic results every ten minutes.
-              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
-                  .plusDelayOf(TEN_MINUTES)))
-              .accumulatingFiredPanes()
-              .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())))
-        // Extract and sum username/score pairs from the event data.
-        .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+        .apply(
+            "CalculateUserScores",
+            new CalculateUserScores(Duration.standardMinutes(options.getAllowedLateness())))
         // Write the results to BigQuery.
-        .apply("WriteUserScoreSums",
-               new WriteToBigQuery<KV<String, Integer>>(
-                  options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
-    // [END DocInclude_ProcTimeTrigger]
+        .apply(
+            "WriteUserScoreSums",
+            new WriteToBigQuery<KV<String, Integer>>(
+                options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
 
     // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
     // command line.
     PipelineResult result = pipeline.run();
     exampleUtils.waitToFinish(result);
   }
+
+  /**
+   * Calculates scores for each team within the configured window duration.
+   */
+  // [START DocInclude_WindowAndTrigger]
+  // Extract team/score pairs from the event stream, using hour-long windows by default.
+  @VisibleForTesting
+  static class CalculateTeamScores
+      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+    private final Duration teamWindowDuration;
+    private final Duration allowedLateness;
+
+    CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
+      this.teamWindowDuration = teamWindowDuration;
+      this.allowedLateness = allowedLateness;
+    }
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
+      return infos.apply("LeaderboardTeamFixedWindows",
+          Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
+              // We will get early (speculative) results as well as cumulative
+              // processing of late data.
+              .triggering(AfterWatermark.pastEndOfWindow()
+                  .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                      .plusDelayOf(FIVE_MINUTES))
+                  .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
+                      .plusDelayOf(TEN_MINUTES)))
+              .withAllowedLateness(allowedLateness)
+              .accumulatingFiredPanes())
+          // Extract and sum teamname/score pairs from the event data.
+          .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
+    }
+  }
+  // [END DocInclude_WindowAndTrigger]
+
+  // [START DocInclude_ProcTimeTrigger]
+  /**
+   * Extract user/score pairs from the event stream using processing time, via global windowing.
+   * Get periodic updates on all users' running scores.
+   */
+  @VisibleForTesting
+  static class CalculateUserScores
+      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+    private final Duration allowedLateness;
+
+    CalculateUserScores(Duration allowedLateness) {
+      this.allowedLateness = allowedLateness;
+    }
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
+      return input.apply("LeaderboardUserGlobalWindow",
+          Window.<GameActionInfo>into(new GlobalWindows())
+              // Get periodic results every ten minutes.
+              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(TEN_MINUTES)))
+              .accumulatingFiredPanes()
+              .withAllowedLateness(allowedLateness))
+          // Extract and sum username/score pairs from the event data.
+          .apply("ExtractUserScore", new ExtractAndSumScore("user"));
+    }
+  }
+  // [END DocInclude_ProcTimeTrigger]
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00b4e951/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
new file mode 100644
index 0000000..40cac36
--- /dev/null
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.complete.game;
+
+import static org.apache.beam.sdk.testing.PAssert.that;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import org.apache.beam.examples.complete.game.LeaderBoard.CalculateTeamScores;
+import org.apache.beam.examples.complete.game.LeaderBoard.CalculateUserScores;
+import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LeaderBoard}.
+ */
+@RunWith(JUnit4.class)
+public class LeaderBoardTest implements Serializable {
+  private static final Duration ALLOWED_LATENESS = Duration.standardHours(1);
+  private static final Duration TEAM_WINDOW_DURATION = Duration.standardMinutes(20);
+  private Instant baseTime = new Instant(0);
+
+  /**
+   * Some example users, on two separate teams.
+   */
+  private enum TestUser {
+    RED_ONE("scarlet", "red"), RED_TWO("burgundy", "red"),
+    BLUE_ONE("navy", "blue"), BLUE_TWO("sky", "blue");
+
+    private final String userName;
+    private final String teamName;
+
+    TestUser(String userName, String teamName) {
+      this.userName = userName;
+      this.teamName = teamName;
+    }
+
+    public String getUser() {
+      return userName;
+    }
+
+    public String getTeam() {
+      return teamName;
+    }
+  }
+
+  /**
+   * A test of the {@link CalculateTeamScores} {@link PTransform} when all of the elements arrive
+   * on time (ahead of the watermark).
+   */
+  @Test
+  public void testTeamScoresOnTime() {
+    TestPipeline p = TestPipeline.create();
+
+    TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        // Start at the epoch
+        .advanceWatermarkTo(baseTime)
+        // add some elements ahead of the watermark
+        .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+            event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)),
+            event(TestUser.RED_TWO, 3, Duration.standardSeconds(22)),
+            event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(3)))
+        // The watermark advances slightly, but not past the end of the window
+        .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3)))
+        // Add some more on time elements
+        .addElements(event(TestUser.RED_ONE, 1, Duration.standardMinutes(4)),
+            event(TestUser.BLUE_ONE, 2, Duration.standardSeconds(270)))
+        // The window should close and emit an ON_TIME pane
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    that(teamScores)
+        .inOnTimePane(new IntervalWindow(baseTime, TEAM_WINDOW_DURATION))
+        .containsInAnyOrder(KV.of(blueTeam, 12), KV.of(redTeam, 4));
+
+    p.run();
+  }
+
+  /**
+   * A test of the {@link CalculateTeamScores} {@link PTransform} when all of the elements arrive
+   * on time, and the processing time advances far enough for speculative panes.
+   */
+  @Test
+  public void testTeamScoresSpeculative() {
+    TestPipeline p = TestPipeline.create();
+
+    TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        // Start at the epoch
+        .advanceWatermarkTo(baseTime)
+        .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+            event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)))
+        // Some time passes within the runner, which causes a speculative pane containing the blue
+        // team's score to be emitted
+        .advanceProcessingTime(Duration.standardMinutes(10))
+        .addElements(event(TestUser.RED_TWO, 5, Duration.standardMinutes(3)))
+        // Some additional time passes and we get a speculative pane for the red team
+        .advanceProcessingTime(Duration.standardMinutes(12))
+        .addElements(event(TestUser.BLUE_TWO, 3, Duration.standardSeconds(22)))
+        // More time passes and a speculative pane containing a refined value for the blue team is
+        // emitted
+        .advanceProcessingTime(Duration.standardMinutes(10))
+        // Some more events occur
+        .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(4)),
+            event(TestUser.BLUE_TWO, 2, Duration.standardMinutes(2)))
+        // The window closes and we get an ON_TIME pane that contains all of the updates
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    IntervalWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    // The window contains speculative panes alongside the on-time pane
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .containsInAnyOrder(KV.of(blueTeam, 10) /* The on-time blue pane */,
+            KV.of(redTeam, 9) /* The on-time red pane */,
+            KV.of(blueTeam, 5) /* The first blue speculative pane */,
+            KV.of(blueTeam, 8) /* The second blue speculative pane */,
+            KV.of(redTeam, 5) /* The red speculative pane */);
+     PAssert.that(teamScores)
+        .inOnTimePane(window)
+        .containsInAnyOrder(KV.of(blueTeam, 10), KV.of(redTeam, 9));
+
+    p.run();
+  }
+
+  /**
+   * A test where elements arrive behind the watermark (late data), but before the end of the
+   * window. These elements are emitted on time.
+   */
+  @Test
+  public void testTeamScoresUnobservablyLate() {
+    TestPipeline p = TestPipeline.create();
+
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        .advanceWatermarkTo(baseTime)
+        .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+            event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(8)),
+            event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+            event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5)))
+        .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION).minus(Duration.standardMinutes(1)))
+        // These events are late, but the window hasn't closed yet, so the elements are in the
+        // on-time pane
+        .addElements(event(TestUser.RED_TWO, 2, Duration.ZERO),
+            event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
+            event(TestUser.BLUE_TWO, 2, Duration.standardSeconds(90)),
+            event(TestUser.RED_TWO, 3, Duration.standardMinutes(3)))
+        .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))
+        .advanceWatermarkToInfinity();
+    PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    // The On Time pane contains the late elements that arrived before the end of the window
+    PAssert.that(teamScores)
+        .inOnTimePane(window)
+        .containsInAnyOrder(KV.of(redTeam, 14), KV.of(blueTeam, 13));
+
+    p.run();
+  }
+
+  /**
+   * A test where elements arrive behind the watermark (late data) after the watermark passes the
+   * end of the window, but before the maximum allowed lateness. These elements are emitted in a
+   * late pane.
+   */
+  @Test
+  public void testTeamScoresObservablyLate() {
+    TestPipeline p = TestPipeline.create();
+
+    Instant firstWindowCloses = baseTime.plus(ALLOWED_LATENESS).plus(TEAM_WINDOW_DURATION);
+    TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        .advanceWatermarkTo(baseTime)
+        .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+            event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(8)))
+        .advanceProcessingTime(Duration.standardMinutes(10))
+        .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3)))
+        .addElements(event(TestUser.RED_ONE, 3, Duration.standardMinutes(1)),
+            event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+            event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5)))
+        .advanceWatermarkTo(firstWindowCloses.minus(Duration.standardMinutes(1)))
+        // These events are late but should still appear in a late pane
+        .addElements(event(TestUser.RED_TWO, 2, Duration.ZERO),
+            event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
+            event(TestUser.RED_TWO, 3, Duration.standardMinutes(3)))
+        // A late refinement is emitted due to the advance in processing time, but the window has
+        // not yet closed because the watermark has not advanced
+        .advanceProcessingTime(Duration.standardMinutes(12))
+        // These elements should appear in the final pane
+        .addElements(event(TestUser.RED_TWO, 9, Duration.standardMinutes(1)),
+            event(TestUser.RED_TWO, 1, Duration.standardMinutes(3)))
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .satisfies((SerializableFunction<Iterable<KV<String, Integer>>, Void>) input -> {
+          // The final sums need not exist in the same pane, but must appear in the output
+          // PCollection
+          assertThat(input, hasItem(KV.of(blueTeam, 11)));
+          assertThat(input, hasItem(KV.of(redTeam, 27)));
+          return null;
+        });
+    PAssert.thatMap(teamScores)
+        // The closing behavior of CalculateTeamScores precludes an inFinalPane matcher
+        .inOnTimePane(window)
+        .isEqualTo(ImmutableMap.<String, Integer>builder().put(redTeam, 7)
+            .put(blueTeam, 11)
+            .build());
+
+    // No final pane is emitted for the blue team, as all of their updates have been taken into
+    // account in earlier panes
+    PAssert.that(teamScores).inFinalPane(window).containsInAnyOrder(KV.of(redTeam, 27));
+
+    p.run();
+  }
+
+  /**
+   * A test where elements arrive beyond the maximum allowed lateness. These elements are dropped
+   * within {@link CalculateTeamScores} and do not impact the final result.
+   */
+  @Test
+  public void testTeamScoresDroppablyLate() {
+    TestPipeline p = TestPipeline.create();
+
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    TestStream<GameActionInfo> infos = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        .addElements(event(TestUser.BLUE_ONE, 12, Duration.ZERO),
+            event(TestUser.RED_ONE, 3, Duration.ZERO))
+        .advanceWatermarkTo(window.maxTimestamp())
+        .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+            event(TestUser.BLUE_TWO, 3, Duration.ZERO),
+            event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+        // Move the watermark past the end of the allowed lateness plus the end of the window
+        .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
+            .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))
+        // These elements within the expired window are droppably late, and will not appear in the
+        // output
+        .addElements(
+            event(TestUser.BLUE_TWO, 3, TEAM_WINDOW_DURATION.minus(Duration.standardSeconds(5))),
+            event(TestUser.RED_ONE, 7, Duration.standardMinutes(4)))
+        .advanceWatermarkToInfinity();
+    PCollection<KV<String, Integer>> teamScores = p.apply(infos)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    // Only one on-time pane and no late panes should be emitted
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .containsInAnyOrder(KV.of(redTeam, 7), KV.of(blueTeam, 18));
+    // No elements are added before the watermark passes the end of the window plus the allowed
+    // lateness, so no refinement should be emitted
+    PAssert.that(teamScores).inFinalPane(window).empty();
+  }
+
+  /**
+   * A test where elements arrive both on-time and late in {@link CalculateUserScores}, which emits
+   * output into the {@link GlobalWindow}. All elements that arrive should be taken into account,
+   * even if they arrive later than the maximum allowed lateness.
+   */
+  @Test
+  public void testUserScore() {
+    TestPipeline p = TestPipeline.create();
+
+    TestStream<GameActionInfo> infos =
+        TestStream.create(AvroCoder.of(GameActionInfo.class))
+            .addElements(
+                event(TestUser.BLUE_ONE, 12, Duration.ZERO),
+                event(TestUser.RED_ONE, 3, Duration.ZERO))
+            .advanceProcessingTime(Duration.standardMinutes(7))
+            .addElements(
+                event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+                event(TestUser.BLUE_TWO, 3, Duration.ZERO),
+                event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+            .advanceProcessingTime(Duration.standardMinutes(5))
+            .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS).plus(Duration.standardHours(12)))
+            // Late elements are always observable within the global window - they arrive before
+            // the window closes, so they will appear in a pane, even if they arrive after the
+            // allowed lateness, and are taken into account alongside on-time elements
+            .addElements(
+                event(TestUser.RED_ONE, 3, Duration.standardMinutes(7)),
+                event(TestUser.RED_ONE, 2, (ALLOWED_LATENESS).plus(Duration.standardHours(13))))
+            .advanceProcessingTime(Duration.standardMinutes(6))
+            .addElements(event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(12)))
+            .advanceProcessingTime(Duration.standardMinutes(20))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> userScores =
+        p.apply(infos).apply(new CalculateUserScores(ALLOWED_LATENESS));
+
+    // User scores are emitted in speculative panes in the Global Window - this matcher choice
+    // ensures that panes emitted by the watermark advancing to positive infinity are not included,
+    // as that will not occur outside of tests
+    that(userScores)
+        .inEarlyGlobalWindowPanes()
+        .containsInAnyOrder(KV.of(TestUser.BLUE_ONE.getUser(), 15),
+            KV.of(TestUser.RED_ONE.getUser(), 7),
+            KV.of(TestUser.RED_ONE.getUser(), 12),
+            KV.of(TestUser.BLUE_TWO.getUser(), 3),
+            KV.of(TestUser.BLUE_TWO.getUser(), 8));
+
+    p.run();
+  }
+
+  private TimestampedValue<GameActionInfo> event(
+      TestUser user,
+      int score,
+      Duration baseTimeOffset) {
+    return TimestampedValue.of(new GameActionInfo(user.getUser(),
+        user.getTeam(),
+        score,
+        baseTime.plus(baseTimeOffset).getMillis()), baseTime.plus(baseTimeOffset));
+  }
+}


Mime
View raw message