From: dhalperi@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Tue, 13 Sep 2016 00:40:40 -0000
Message-Id: <6a57aaf6c14f45c6a97155923206f842@git.apache.org>
In-Reply-To: <5d2051da80084a09aedbe0b48aa93047@git.apache.org>
References: <5d2051da80084a09aedbe0b48aa93047@git.apache.org>
Subject: [09/50] [abbrv] incubator-beam git commit: Add LeaderBoardTest
archived-at: Tue, 13 Sep 2016 00:40:44 -0000

Add LeaderBoardTest

This test exercises the PTransforms that make up the LeaderBoard example.
This includes speculative and late trigger firings to produce team and
individual scores on a global and fixed window basis.

Refactor LeaderBoard to expose the team and user score calculations as
composite PTransforms to enable this testing.

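As a rough illustration of what "team scores on a fixed window basis" means, the following plain-Java sketch assigns events to fixed windows and sums scores per team. It does not use the Beam SDK: the `Event` class, `windowStart`, and `sumByWindowAndTeam` are hypothetical names introduced only for this example, and triggers, panes, and watermarks are deliberately left out. The sample events mirror the ones used by testTeamScoresOnTime in the diff below.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch (no Beam dependency) of per-team sums within fixed windows. */
public class FixedWindowSumSketch {
  /** Hypothetical stand-in for GameActionInfo: team, score, event time in millis. */
  public static final class Event {
    final String team;
    final int score;
    final long timestampMillis;

    Event(String team, int score, long timestampMillis) {
      this.team = team;
      this.score = score;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Start of the fixed window containing a (non-negative) timestamp. */
  public static long windowStart(long timestampMillis, long windowSizeMillis) {
    return timestampMillis - (timestampMillis % windowSizeMillis);
  }

  /** Sums scores keyed by "windowStart/team". */
  public static Map<String, Integer> sumByWindowAndTeam(
      List<Event> events, long windowSizeMillis) {
    Map<String, Integer> sums = new TreeMap<>();
    for (Event e : events) {
      String key = windowStart(e.timestampMillis, windowSizeMillis) + "/" + e.team;
      sums.merge(key, e.score, Integer::sum);
    }
    return sums;
  }

  /** The six events from testTeamScoresOnTime, as offsets from the epoch. */
  public static List<Event> onTimeEvents() {
    return Arrays.asList(
        new Event("blue", 3, 3_000L),
        new Event("blue", 2, 60_000L),
        new Event("red", 3, 22_000L),
        new Event("blue", 5, 180_000L),
        new Event("red", 1, 240_000L),
        new Event("blue", 2, 270_000L));
  }

  public static void main(String[] args) {
    // 20-minute windows, matching TEAM_WINDOW_DURATION in the test.
    System.out.println(sumByWindowAndTeam(onTimeEvents(), 20 * 60_000L));
    // prints {0/blue=12, 0/red=4}
  }
}
```

All six events fall into the window starting at the epoch, so the sums match the values the test expects in the on-time pane (blue 12, red 4).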
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00b4e951
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00b4e951
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00b4e951

Branch: refs/heads/gearpump-runner
Commit: 00b4e95148eb98d7fea5877274f2fcf2252ac432
Parents: 74d0195
Author: Thomas Groh
Authored: Fri Aug 5 14:20:56 2016 -0700
Committer: Dan Halperin
Committed: Mon Sep 12 17:40:11 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/game/LeaderBoard.java     | 113 ++++--
 .../examples/complete/game/LeaderBoardTest.java | 362 +++++++++++++++++++
 2 files changed, 440 insertions(+), 35 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00b4e951/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 8dd4e39..13bbf44 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.examples.complete.game;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -68,7 +70,7 @@ import org.joda.time.format.DateTimeFormatter;
  * here we're using an unbounded data source, which lets us provide speculative results, and allows
  * handling of late data, at much lower latency. We can use the early/speculative results to keep a
  * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct
- * results, e.g. for 'team prizes'. We're now outputing window results as they're
+ * results, e.g. for 'team prizes'. We're now outputting window results as they're
  * calculated, giving us much lower latency than with the previous batch examples.
  *
  * <p> Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector
@@ -186,50 +188,91 @@ public class LeaderBoard extends HourlyTeamScore {
         .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
-    // [START DocInclude_WindowAndTrigger]
-    // Extract team/score pairs from the event stream, using hour-long windows by default.
-    gameEvents
-        .apply("LeaderboardTeamFixedWindows", Window.<GameActionInfo>into(
-            FixedWindows.of(Duration.standardMinutes(options.getTeamWindowDuration())))
-          // We will get early (speculative) results as well as cumulative
-          // processing of late data.
-          .triggering(
-            AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
-                  .plusDelayOf(FIVE_MINUTES))
-            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
-                  .plusDelayOf(TEN_MINUTES)))
-          .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
-          .accumulatingFiredPanes())
-        // Extract and sum teamname/score pairs from the event data.
-        .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+    gameEvents.apply("CalculateTeamScores",
+        new CalculateTeamScores(
+            Duration.standardMinutes(options.getTeamWindowDuration()),
+            Duration.standardMinutes(options.getAllowedLateness())))
         // Write the results to BigQuery.
         .apply("WriteTeamScoreSums",
             new WriteWindowedToBigQuery<KV<String, Integer>>(
                 options.getTableName() + "_team", configureWindowedTableWrite()));
-    // [END DocInclude_WindowAndTrigger]
-
-    // [START DocInclude_ProcTimeTrigger]
-    // Extract user/score pairs from the event stream using processing time, via global windowing.
-    // Get periodic updates on all users' running scores.
     gameEvents
-        .apply("LeaderboardUserGlobalWindow", Window.<GameActionInfo>into(new GlobalWindows())
-          // Get periodic results every ten minutes.
-          .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
-              .plusDelayOf(TEN_MINUTES)))
-          .accumulatingFiredPanes()
-          .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())))
-        // Extract and sum username/score pairs from the event data.
-        .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+        .apply(
+            "CalculateUserScores",
+            new CalculateUserScores(Duration.standardMinutes(options.getAllowedLateness())))
         // Write the results to BigQuery.
-        .apply("WriteUserScoreSums",
-            new WriteToBigQuery<KV<String, Integer>>(
-                options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
-    // [END DocInclude_ProcTimeTrigger]
+        .apply(
+            "WriteUserScoreSums",
+            new WriteToBigQuery<KV<String, Integer>>(
+                options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
 
     // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
     // command line.
     PipelineResult result = pipeline.run();
     exampleUtils.waitToFinish(result);
   }
+
+  /**
+   * Calculates scores for each team within the configured window duration.
+   */
+  // [START DocInclude_WindowAndTrigger]
+  // Extract team/score pairs from the event stream, using hour-long windows by default.
+  @VisibleForTesting
+  static class CalculateTeamScores
+      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+    private final Duration teamWindowDuration;
+    private final Duration allowedLateness;
+
+    CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
+      this.teamWindowDuration = teamWindowDuration;
+      this.allowedLateness = allowedLateness;
+    }
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
+      return infos.apply("LeaderboardTeamFixedWindows",
+          Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
+              // We will get early (speculative) results as well as cumulative
+              // processing of late data.
+              .triggering(AfterWatermark.pastEndOfWindow()
+                  .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                      .plusDelayOf(FIVE_MINUTES))
+                  .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
+                      .plusDelayOf(TEN_MINUTES)))
+              .withAllowedLateness(allowedLateness)
+              .accumulatingFiredPanes())
+          // Extract and sum teamname/score pairs from the event data.
+          .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
+    }
+  }
+  // [END DocInclude_WindowAndTrigger]
+
+  // [START DocInclude_ProcTimeTrigger]
+  /**
+   * Extract user/score pairs from the event stream using processing time, via global windowing.
+   * Get periodic updates on all users' running scores.
+   */
+  @VisibleForTesting
+  static class CalculateUserScores
+      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+    private final Duration allowedLateness;
+
+    CalculateUserScores(Duration allowedLateness) {
+      this.allowedLateness = allowedLateness;
+    }
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
+      return input.apply("LeaderboardUserGlobalWindow",
+          Window.<GameActionInfo>into(new GlobalWindows())
+              // Get periodic results every ten minutes.
+              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(TEN_MINUTES)))
+              .accumulatingFiredPanes()
+              .withAllowedLateness(allowedLateness))
+          // Extract and sum username/score pairs from the event data.
+          .apply("ExtractUserScore", new ExtractAndSumScore("user"));
+    }
+  }
+  // [END DocInclude_ProcTimeTrigger]
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00b4e951/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
new file mode 100644
index 0000000..40cac36
--- /dev/null
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.complete.game;
+
+import static org.apache.beam.sdk.testing.PAssert.that;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import org.apache.beam.examples.complete.game.LeaderBoard.CalculateTeamScores;
+import org.apache.beam.examples.complete.game.LeaderBoard.CalculateUserScores;
+import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LeaderBoard}.
+ */
+@RunWith(JUnit4.class)
+public class LeaderBoardTest implements Serializable {
+  private static final Duration ALLOWED_LATENESS = Duration.standardHours(1);
+  private static final Duration TEAM_WINDOW_DURATION = Duration.standardMinutes(20);
+  private Instant baseTime = new Instant(0);
+
+  /**
+   * Some example users, on two separate teams.
+   */
+  private enum TestUser {
+    RED_ONE("scarlet", "red"), RED_TWO("burgundy", "red"),
+    BLUE_ONE("navy", "blue"), BLUE_TWO("sky", "blue");
+
+    private final String userName;
+    private final String teamName;
+
+    TestUser(String userName, String teamName) {
+      this.userName = userName;
+      this.teamName = teamName;
+    }
+
+    public String getUser() {
+      return userName;
+    }
+
+    public String getTeam() {
+      return teamName;
+    }
+  }
+
+  /**
+   * A test of the {@link CalculateTeamScores} {@link PTransform} when all of the elements arrive
+   * on time (ahead of the watermark).
+   */
+  @Test
+  public void testTeamScoresOnTime() {
+    TestPipeline p = TestPipeline.create();
+
+    TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        // Start at the epoch
+        .advanceWatermarkTo(baseTime)
+        // add some elements ahead of the watermark
+        .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+            event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)),
+            event(TestUser.RED_TWO, 3, Duration.standardSeconds(22)),
+            event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(3)))
+        // The watermark advances slightly, but not past the end of the window
+        .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3)))
+        // Add some more on time elements
+        .addElements(event(TestUser.RED_ONE, 1, Duration.standardMinutes(4)),
+            event(TestUser.BLUE_ONE, 2, Duration.standardSeconds(270)))
+        // The window should close and emit an ON_TIME pane
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    that(teamScores)
+        .inOnTimePane(new IntervalWindow(baseTime, TEAM_WINDOW_DURATION))
+        .containsInAnyOrder(KV.of(blueTeam, 12), KV.of(redTeam, 4));
+
+    p.run();
+  }
+
+  /**
+   * A test of the {@link CalculateTeamScores} {@link PTransform} when all of the elements arrive
+   * on time, and the processing time advances far enough for speculative panes.
+   */
+  @Test
+  public void testTeamScoresSpeculative() {
+    TestPipeline p = TestPipeline.create();
+
+    TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        // Start at the epoch
+        .advanceWatermarkTo(baseTime)
+        .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+            event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)))
+        // Some time passes within the runner, which causes a speculative pane containing the blue
+        // team's score to be emitted
+        .advanceProcessingTime(Duration.standardMinutes(10))
+        .addElements(event(TestUser.RED_TWO, 5, Duration.standardMinutes(3)))
+        // Some additional time passes and we get a speculative pane for the red team
+        .advanceProcessingTime(Duration.standardMinutes(12))
+        .addElements(event(TestUser.BLUE_TWO, 3, Duration.standardSeconds(22)))
+        // More time passes and a speculative pane containing a refined value for the blue pane is
+        // emitted
+        .advanceProcessingTime(Duration.standardMinutes(10))
+        // Some more events occur
+        .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(4)),
+            event(TestUser.BLUE_TWO, 2, Duration.standardMinutes(2)))
+        // The window closes and we get an ON_TIME pane that contains all of the updates
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    IntervalWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    // The window contains speculative panes alongside the on-time pane
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .containsInAnyOrder(KV.of(blueTeam, 10) /* The on-time blue pane */,
+            KV.of(redTeam, 9) /* The on-time red pane */,
+            KV.of(blueTeam, 5) /* The first blue speculative pane */,
+            KV.of(blueTeam, 8) /* The second blue speculative pane */,
+            KV.of(redTeam, 5) /* The red speculative pane */);
+    PAssert.that(teamScores)
+        .inOnTimePane(window)
+        .containsInAnyOrder(KV.of(blueTeam, 10), KV.of(redTeam, 9));
+
+    p.run();
+  }
+
+  /**
+   * A test where elements arrive behind the watermark (late data), but before the end of the
+   * window. These elements are emitted on time.
+   */
+  @Test
+  public void testTeamScoresUnobservablyLate() {
+    TestPipeline p = TestPipeline.create();
+
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        .advanceWatermarkTo(baseTime)
+        .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+            event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(8)),
+            event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+            event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5)))
+        .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION).minus(Duration.standardMinutes(1)))
+        // These events are late, but the window hasn't closed yet, so the elements are in the
+        // on-time pane
+        .addElements(event(TestUser.RED_TWO, 2, Duration.ZERO),
+            event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
+            event(TestUser.BLUE_TWO, 2, Duration.standardSeconds(90)),
+            event(TestUser.RED_TWO, 3, Duration.standardMinutes(3)))
+        .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))
+        .advanceWatermarkToInfinity();
+    PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    // The On Time pane contains the late elements that arrived before the end of the window
+    PAssert.that(teamScores)
+        .inOnTimePane(window)
+        .containsInAnyOrder(KV.of(redTeam, 14), KV.of(blueTeam, 13));
+
+    p.run();
+  }
+
+  /**
+   * A test where elements arrive behind the watermark (late data) after the watermark passes the
+   * end of the window, but before the maximum allowed lateness. These elements are emitted in a
+   * late pane.
+   */
+  @Test
+  public void testTeamScoresObservablyLate() {
+    TestPipeline p = TestPipeline.create();
+
+    Instant firstWindowCloses = baseTime.plus(ALLOWED_LATENESS).plus(TEAM_WINDOW_DURATION);
+    TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        .advanceWatermarkTo(baseTime)
+        .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+            event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(8)))
+        .advanceProcessingTime(Duration.standardMinutes(10))
+        .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3)))
+        .addElements(event(TestUser.RED_ONE, 3, Duration.standardMinutes(1)),
+            event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+            event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5)))
+        .advanceWatermarkTo(firstWindowCloses.minus(Duration.standardMinutes(1)))
+        // These events are late but should still appear in a late pane
+        .addElements(event(TestUser.RED_TWO, 2, Duration.ZERO),
+            event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
+            event(TestUser.RED_TWO, 3, Duration.standardMinutes(3)))
+        // A late refinement is emitted due to the advance in processing time, but the window has
+        // not yet closed because the watermark has not advanced
+        .advanceProcessingTime(Duration.standardMinutes(12))
+        // These elements should appear in the final pane
+        .addElements(event(TestUser.RED_TWO, 9, Duration.standardMinutes(1)),
+            event(TestUser.RED_TWO, 1, Duration.standardMinutes(3)))
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .satisfies((SerializableFunction<Iterable<KV<String, Integer>>, Void>) input -> {
+          // The final sums need not exist in the same pane, but must appear in the output
+          // PCollection
+          assertThat(input, hasItem(KV.of(blueTeam, 11)));
+          assertThat(input, hasItem(KV.of(redTeam, 27)));
+          return null;
+        });
+    PAssert.thatMap(teamScores)
+        // The closing behavior of CalculateTeamScores precludes an inFinalPane matcher
+        .inOnTimePane(window)
+        .isEqualTo(ImmutableMap.<String, Integer>builder().put(redTeam, 7)
+            .put(blueTeam, 11)
+            .build());
+
+    // No final pane is emitted for the blue team, as all of their updates have been taken into
+    // account in earlier panes
+    PAssert.that(teamScores).inFinalPane(window).containsInAnyOrder(KV.of(redTeam, 27));
+
+    p.run();
+  }
+
+  /**
+   * A test where elements arrive beyond the maximum allowed lateness. These elements are dropped
+   * within {@link CalculateTeamScores} and do not impact the final result.
+   */
+  @Test
+  public void testTeamScoresDroppablyLate() {
+    TestPipeline p = TestPipeline.create();
+
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    TestStream<GameActionInfo> infos = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        .addElements(event(TestUser.BLUE_ONE, 12, Duration.ZERO),
+            event(TestUser.RED_ONE, 3, Duration.ZERO))
+        .advanceWatermarkTo(window.maxTimestamp())
+        .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+            event(TestUser.BLUE_TWO, 3, Duration.ZERO),
+            event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+        // Move the watermark past the end of the allowed lateness plus the end of the window
+        .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
+            .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))
+        // These elements within the expired window are droppably late, and will not appear in the
+        // output
+        .addElements(
+            event(TestUser.BLUE_TWO, 3, TEAM_WINDOW_DURATION.minus(Duration.standardSeconds(5))),
+            event(TestUser.RED_ONE, 7, Duration.standardMinutes(4)))
+        .advanceWatermarkToInfinity();
+    PCollection<KV<String, Integer>> teamScores = p.apply(infos)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    // Only one on-time pane and no late panes should be emitted
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .containsInAnyOrder(KV.of(redTeam, 7), KV.of(blueTeam, 18));
+    // No elements are added before the watermark passes the end of the window plus the allowed
+    // lateness, so no refinement should be emitted
+    PAssert.that(teamScores).inFinalPane(window).empty();
+  }
+
+  /**
+   * A test where elements arrive both on-time and late in {@link CalculateUserScores}, which emits
+   * output into the {@link GlobalWindow}. All elements that arrive should be taken into account,
+   * even if they arrive later than the maximum allowed lateness.
+   */
+  @Test
+  public void testUserScore() {
+    TestPipeline p = TestPipeline.create();
+
+    TestStream<GameActionInfo> infos =
+        TestStream.create(AvroCoder.of(GameActionInfo.class))
+            .addElements(
+                event(TestUser.BLUE_ONE, 12, Duration.ZERO),
+                event(TestUser.RED_ONE, 3, Duration.ZERO))
+            .advanceProcessingTime(Duration.standardMinutes(7))
+            .addElements(
+                event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+                event(TestUser.BLUE_TWO, 3, Duration.ZERO),
+                event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+            .advanceProcessingTime(Duration.standardMinutes(5))
+            .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS).plus(Duration.standardHours(12)))
+            // Late elements are always observable within the global window - they arrive before
+            // the window closes, so they will appear in a pane, even if they arrive after the
+            // allowed lateness, and are taken into account alongside on-time elements
+            .addElements(
+                event(TestUser.RED_ONE, 3, Duration.standardMinutes(7)),
+                event(TestUser.RED_ONE, 2, (ALLOWED_LATENESS).plus(Duration.standardHours(13))))
+            .advanceProcessingTime(Duration.standardMinutes(6))
+            .addElements(event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(12)))
+            .advanceProcessingTime(Duration.standardMinutes(20))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> userScores =
+        p.apply(infos).apply(new CalculateUserScores(ALLOWED_LATENESS));
+
+    // User scores are emitted in speculative panes in the Global Window - this matcher choice
+    // ensures that panes emitted by the watermark advancing to positive infinity are not included,
+    // as that will not occur outside of tests
+    that(userScores)
+        .inEarlyGlobalWindowPanes()
+        .containsInAnyOrder(KV.of(TestUser.BLUE_ONE.getUser(), 15),
+            KV.of(TestUser.RED_ONE.getUser(), 7),
+            KV.of(TestUser.RED_ONE.getUser(), 12),
+            KV.of(TestUser.BLUE_TWO.getUser(), 3),
+            KV.of(TestUser.BLUE_TWO.getUser(), 8));
+
+    p.run();
+  }
+
+  private TimestampedValue<GameActionInfo> event(
+      TestUser user,
+      int score,
+      Duration baseTimeOffset) {
+    return TimestampedValue.of(new GameActionInfo(user.getUser(),
+        user.getTeam(),
+        score,
+        baseTime.plus(baseTimeOffset).getMillis()), baseTime.plus(baseTimeOffset));
+  }
+}
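
The three lateness cases the team-score tests distinguish (unobservably late, observably late, droppably late) can be summarized with a small plain-Java sketch. This is a simplified model and not Beam's implementation: `LatenessSketch`, `Pane`, and `classify` are hypothetical names, the model looks only at the watermark at arrival time, and it ignores the one-millisecond difference between a window's end and its maximum timestamp.

```java
/** Simplified model (not Beam code) of how an arriving element is treated. */
public class LatenessSketch {
  /** Hypothetical outcome labels, loosely following the pane timings in the tests. */
  public enum Pane { ON_TIME, LATE, DROPPED }

  /**
   * Classifies an element by comparing the watermark at arrival time with the end of the
   * element's fixed window. All values are milliseconds; timestamps must be non-negative.
   */
  public static Pane classify(
      long elementTimestamp, long watermark, long windowSize, long allowedLateness) {
    long windowEnd = elementTimestamp - (elementTimestamp % windowSize) + windowSize;
    if (watermark < windowEnd) {
      // The window has not closed yet: even an element behind the watermark is on time.
      return Pane.ON_TIME;
    }
    if (watermark < windowEnd + allowedLateness) {
      // The window closed, but the element is still within the allowed lateness.
      return Pane.LATE;
    }
    // The window has expired and the element no longer affects the output.
    return Pane.DROPPED;
  }

  public static void main(String[] args) {
    long minute = 60_000L;
    long window = 20 * minute;   // TEAM_WINDOW_DURATION in the test
    long lateness = 60 * minute; // ALLOWED_LATENESS in the test

    // Watermark at 19 minutes, element stamped at the epoch (testTeamScoresUnobservablyLate).
    System.out.println(classify(0L, 19 * minute, window, lateness)); // prints ON_TIME
    // Watermark at 79 minutes, element stamped at 1 minute (testTeamScoresObservablyLate).
    System.out.println(classify(1 * minute, 79 * minute, window, lateness)); // prints LATE
    // Watermark at 81 minutes, element stamped at 4 minutes (testTeamScoresDroppablyLate).
    System.out.println(classify(4 * minute, 81 * minute, window, lateness)); // prints DROPPED
  }
}
```

The global-window case in testUserScore does not fit this model, since that window never closes before the watermark reaches infinity, which is why every element there is counted.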