From: dhalperi@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Thu, 24 Mar 2016 02:48:27 -0000
In-Reply-To: <0c1667d8252646c1acedf8c54c1ba4ca@git.apache.org>
References: <0c1667d8252646c1acedf8c54c1ba4ca@git.apache.org>
Subject: [63/67] incubator-beam git commit: Directory reorganization

Directory reorganization

Move Java 8 examples from "java8examples/" into "examples/java8/".

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11bb9e0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11bb9e0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11bb9e0e

Branch: refs/heads/master
Commit: 11bb9e0e61f8b15ce81e5181baa5458bb715a059
Parents: 2eaa709
Author: Davor Bonaci
Authored: Wed Mar 23 17:16:47 2016 -0700
Committer: Davor Bonaci
Committed: Wed Mar 23 18:33:33 2016 -0700

----------------------------------------------------------------------
 examples/java8/pom.xml | 279 +++++++++++++
 .../examples/MinimalWordCountJava8.java | 68 +++
 .../examples/complete/game/GameStats.java | 339 +++++++++++++++
 .../examples/complete/game/HourlyTeamScore.java | 193 +++++++++
 .../examples/complete/game/LeaderBoard.java | 237 +++++++++++
 .../dataflow/examples/complete/game/README.md | 113 +++++
 .../examples/complete/game/UserScore.java | 239 +++++++++++
 .../complete/game/injector/Injector.java | 415 +++++++++++++++++++
 .../complete/game/injector/InjectorUtils.java | 101 +++++
 .../injector/RetryHttpInitializerWrapper.java | 126 ++++++
 .../complete/game/utils/WriteToBigQuery.java | 134 ++++++
 .../game/utils/WriteWindowedToBigQuery.java | 76 ++++
 .../examples/MinimalWordCountJava8Test.java | 103 +++++
 .../examples/complete/game/GameStatsTest.java | 76 ++++
 .../complete/game/HourlyTeamScoreTest.java | 111 +++++
 .../examples/complete/game/UserScoreTest.java | 154 +++++++
 java8examples/pom.xml | 279 -------------
 .../examples/MinimalWordCountJava8.java | 68 ---
 .../examples/complete/game/GameStats.java | 339 ---------------
 .../examples/complete/game/HourlyTeamScore.java | 193 ---------
 .../examples/complete/game/LeaderBoard.java | 237 -----------
 .../dataflow/examples/complete/game/README.md | 113 -----
 .../examples/complete/game/UserScore.java | 239 -----------
 .../complete/game/injector/Injector.java | 415 -------------------
 .../complete/game/injector/InjectorUtils.java | 101 -----
 .../injector/RetryHttpInitializerWrapper.java | 126 ------
 .../complete/game/utils/WriteToBigQuery.java | 134 ------
 .../game/utils/WriteWindowedToBigQuery.java | 76 ----
 .../examples/MinimalWordCountJava8Test.java | 103 -----
 .../examples/complete/game/GameStatsTest.java | 76 ----
 .../complete/game/HourlyTeamScoreTest.java | 111 -----
 .../examples/complete/game/UserScoreTest.java | 154 -------
 pom.xml | 2 +-
 33 files changed, 2765 insertions(+), 2765 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
new file mode 100644
index 0000000..7d55c31
--- /dev/null
+++ b/examples/java8/pom.xml
@@ -0,0 +1,279 @@
+ + + + + 4.0.0 + + + org.apache.beam + parent + 0.1.0-incubating-SNAPSHOT + ../../pom.xml + + + java8examples-all + Apache Beam :: Examples :: Java 8 All + Apache Beam Java SDK provides a simple, Java-based + interface for processing virtually any size
data. + This artifact includes examples of the SDK from a Java 8 + user. + + jar + + + + DataflowPipelineTests + + true + com.google.cloud.dataflow.sdk.testing.RunnableOnService + both + + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + 1.8 + 1.8 + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + analyze-only + + true + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.12 + + + com.puppycrawl.tools + checkstyle + 6.6 + + + + ../../checkstyle.xml + true + true + true + false + + + + + check + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.4 + + + attach-sources + compile + + jar + + + + attach-test-sources + test-compile + + test-jar + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + default-jar + + jar + + + + default-test-jar + + test-jar + + + + + + + + org.jacoco + jacoco-maven-plugin + + + + + + + org.apache.beam + java-sdk-all + ${project.version} + + + + org.apache.beam + java-examples-all + ${project.version} + + + + com.google.guava + guava + ${guava.version} + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + org.apache.avro + avro + ${avro.version} + + + + joda-time + joda-time + ${joda.version} + + + + org.hamcrest + hamcrest-all + ${hamcrest.version} + test + + + + org.mockito + mockito-all + 1.10.19 + test + + + + junit + junit + ${junit.version} + test + + + + com.google.apis + google-api-services-bigquery + ${bigquery.version} + + + + com.google.guava + guava-jdk5 + + + + + + com.google.http-client + google-http-client + ${google-clients.version} + + + + com.google.guava + guava-jdk5 + + + + + + com.google.oauth-client + google-oauth-client + ${google-clients.version} + + + + com.google.guava + guava-jdk5 + + + + + + com.google.apis + google-api-services-pubsub + ${pubsub.version} + + + + com.google.guava + guava-jdk5 + + + + + + com.google.api-client + google-api-client + ${google-clients.version} + + + com.google.guava + guava-jdk5 + + + + + 
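For orientation, the MinimalWordCountJava8 pipeline in the next file of this commit splits each input line on runs of non-letter characters, drops empty tokens, and counts occurrences per word. The sketch below mirrors only that logic with plain `java.util.stream`, without the Dataflow SDK. The class name `WordCountSketch`, the method `countWords`, and the sample input are assumptions made for illustration and are not part of the commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Plain-Java sketch (no Dataflow SDK); class and method names are hypothetical. */
final class WordCountSketch {

  /** Splits lines on runs of non-letter characters, drops empty tokens, counts per word. */
  static Map<String, Long> countWords(List<String> lines) {
    return lines.stream()
        // Same regular expression as the FlatMapElements step in the diff.
        .flatMap(line -> Arrays.stream(line.split("[^a-zA-Z']+")))
        // Same predicate as the Filter.byPredicate step: split() can yield an empty first token.
        .filter(word -> !word.isEmpty())
        // Analogue of Count.perElement(): one counter per distinct word, sorted by key.
        .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    Map<String, Long> counts =
        countWords(Arrays.asList("To be, or not to be", "  that is the question"));
    // Analogue of the final MapElements step: format each entry as "word: count".
    counts.forEach((word, count) -> System.out.println(word + ": " + count));
  }
}
```

The counting is case-sensitive, as in the pipeline, so "To" and "to" are separate keys. Unlike the pipeline, this sketch runs in a single JVM and holds all counts in memory.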
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java new file mode 100644 index 0000000..c115ea0 --- /dev/null +++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import java.util.Arrays;
+
+/**
+ * An example that counts words in Shakespeare, using Java 8 language features.
+ *
+ * <p>See {@link MinimalWordCount} for a comprehensive explanation.
+ */
+public class MinimalWordCountJava8 {
+
+  public static void main(String[] args) {
+    DataflowPipelineOptions options = PipelineOptionsFactory.create()
+        .as(DataflowPipelineOptions.class);
+
+    options.setRunner(BlockingDataflowPipelineRunner.class);
+
+    // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
+    options.setProject("SET_YOUR_PROJECT_ID_HERE");
+
+    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
+    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
+         .withOutputType(new TypeDescriptor<String>() {}))
+     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+     .apply(Count.perElement())
+     .apply(MapElements
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
+         .withOutputType(new TypeDescriptor<String>() {}))
+
+     // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
+     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+
+    p.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/GameStats.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/GameStats.java
new file mode 100644
index 0000000..7c67d10
--- /dev/null
+++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/GameStats.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.examples.complete.game; + +import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; +import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.io.PubsubIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess; +import com.google.cloud.dataflow.sdk.transforms.MapElements; +import com.google.cloud.dataflow.sdk.transforms.Mean; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; 
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; +import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +/** + * This class is the fourth in a series of four pipelines that tell a story in a 'gaming' + * domain, following {@link UserScore}, {@link HourlyTeamScore}, and {@link LeaderBoard}. + * New concepts: session windows and finding session duration; use of both + * singleton and non-singleton side inputs. + * + *

This pipeline builds on the {@link LeaderBoard} functionality, and adds some "business + * intelligence" analysis: abuse detection and usage patterns. The pipeline derives the Mean user + * score sum for a window, and uses that information to identify likely spammers/robots. (The robots + * have a higher click rate than the human users). The 'robot' users are then filtered out when + * calculating the team scores. + * + *

Additionally, user sessions are tracked: that is, we find bursts of user activity using + * session windows. Then, the mean session duration information is recorded in the context of + * subsequent fixed windowing. (This could be used to tell us what games are giving us greater + * user retention). + * + *

Run {@code com.google.cloud.dataflow.examples.complete.game.injector.Injector} to generate + * pubsub data for this pipeline. The {@code Injector} documentation provides more detail. + * + *

To execute this pipeline using the Dataflow service, specify the pipeline configuration + * like this: + *

{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * 
+ * where the BigQuery dataset you specify must already exist. The PubSub topic you specify should
+ * be the same topic to which the Injector is publishing.
+ */
+public class GameStats extends LeaderBoard {
+
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+  private static DateTimeFormatter fmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+  /**
+   * Filter out all but those users with a high clickrate, which we will consider as 'spammy' users.
+   * We do this by finding the mean total score per user, then using that information as a side
+   * input to filter out all but those user scores that are > (mean * SCORE_WEIGHT)
+   */
+  // [START DocInclude_AbuseDetect]
+  public static class CalculateSpammyUsers
+      extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
+    private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
+    private static final double SCORE_WEIGHT = 2.5;
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
+
+      // Get the sum of scores for each user.
+      PCollection<KV<String, Integer>> sumScores = userScores
+          .apply("UserSum", Sum.integersPerKey());
+
+      // Extract the score from each element, and use it to find the global mean.
+      final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.create())
+          .apply(Mean.globally().asSingletonView());
+
+      // Filter the user sums using the global mean.
+      PCollection<KV<String, Integer>> filtered = sumScores
+          .apply(ParDo
+              .named("ProcessAndFilter")
+              // use the derived mean total score as a side input
+              .withSideInputs(globalMeanScore)
+              .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+                private final Aggregator<Long, Long> numSpammerUsers =
+                  createAggregator("SpammerUsers", new Sum.SumLongFn());
+                @Override
+                public void processElement(ProcessContext c) {
+                  Integer score = c.element().getValue();
+                  Double gmc = c.sideInput(globalMeanScore);
+                  if (score > (gmc * SCORE_WEIGHT)) {
+                    LOG.info("user " + c.element().getKey() + " spammer score " + score
+                        + " with mean " + gmc);
+                    numSpammerUsers.addValue(1L);
+                    c.output(c.element());
+                  }
+                }
+              }));
+      return filtered;
+    }
+  }
+  // [END DocInclude_AbuseDetect]
+
+  /**
+   * Calculate and output an element's session duration.
+   */
+  private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer>
+      implements RequiresWindowAccess {
+
+    @Override
+    public void processElement(ProcessContext c) {
+      IntervalWindow w = (IntervalWindow) c.window();
+      int duration = new Duration(
+          w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
+      c.output(duration);
+    }
+  }
+
+
+  /**
+   * Options supported by {@link GameStats}.
+   */
+  static interface Options extends LeaderBoard.Options {
+    @Description("Numeric value of fixed window duration for user analysis, in minutes")
+    @Default.Integer(60)
+    Integer getFixedWindowDuration();
+    void setFixedWindowDuration(Integer value);
+
+    @Description("Numeric value of gap between user sessions, in minutes")
+    @Default.Integer(5)
+    Integer getSessionGap();
+    void setSessionGap(Integer value);
+
+    @Description("Numeric value of fixed window for finding mean of user session duration, "
+        + "in minutes")
+    @Default.Integer(30)
+    Integer getUserActivityWindowDuration();
+    void setUserActivityWindowDuration(Integer value);
+
+    @Description("Prefix used for the BigQuery table names")
+    @Default.String("game_stats")
+    String getTablePrefix();
+    void setTablePrefix(String value);
+  }
+
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write information about team score sums.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureWindowedWrite() {
+    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfigure.put("team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+            c -> c.element().getKey()));
+    tableConfigure.put("total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+            c -> c.element().getValue()));
+    tableConfigure.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    tableConfigure.put("processing_time",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> fmt.print(Instant.now())));
+    return tableConfigure;
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write information about mean user session time.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<Double>>
+      configureSessionWindowWrite() {
+
+    Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<Double>>();
+    tableConfigure.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<Double>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    tableConfigure.put("mean_duration",
+        new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", c -> c.element()));
+    return tableConfigure;
+  }
+
+
+
+  public static void main(String[] args) throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    // Enforce that this pipeline is always run in streaming mode.
+    options.setStreaming(true);
+    // Allow the pipeline to be cancelled automatically.
+    options.setRunner(DataflowPipelineRunner.class);
+    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Read Events from Pub/Sub using custom timestamps
+    PCollection<GameActionInfo> rawEvents = pipeline
+        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+
+    // Extract username/score pairs from the event stream
+    PCollection<KV<String, Integer>> userEvents =
+        rawEvents.apply("ExtractUserScore",
+          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+            .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+    // Calculate the total score per user over fixed windows, and
+    // cumulative updates for late data.
+    final PCollectionView<Map<String, Integer>> spammersView = userEvents
+      .apply(Window.named("FixedWindowsUser")
+          .<KV<String, Integer>>into(FixedWindows.of(
+              Duration.standardMinutes(options.getFixedWindowDuration())))
+          )
+
+      // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
+      // These might be robots/spammers.
+      .apply("CalculateSpammyUsers", new CalculateSpammyUsers())
+      // Derive a view from the collection of spammer users. It will be used as a side input
+      // in calculating the team score sums, below.
+      .apply("CreateSpammersView", View.asMap());
+
+    // [START DocInclude_FilterAndCalc]
+    // Calculate the total score per team over fixed windows,
+    // and emit cumulative updates for late data. Uses the side input derived above-- the set of
+    // suspected robots-- to filter out scores from those users from the sum.
+    // Write the results to BigQuery.
+    rawEvents
+      .apply(Window.named("WindowIntoFixedWindows")
+          .into(FixedWindows.of(
+              Duration.standardMinutes(options.getFixedWindowDuration())))
+          )
+      // Filter out the detected spammer users, using the side input derived above.
+      .apply(ParDo.named("FilterOutSpammers")
+              .withSideInputs(spammersView)
+              .of(new DoFn<GameActionInfo, GameActionInfo>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  // If the user is not in the spammers Map, output the data element.
+                  if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
+                    c.output(c.element());
+                  }
+                }
+              }))
+      // Extract and sum teamname/score pairs from the event data.
+      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+      // [END DocInclude_FilterAndCalc]
+      // Write the result to BigQuery
+      .apply("WriteTeamSums",
+             new WriteWindowedToBigQuery<KV<String, Integer>>(
+                options.getTablePrefix() + "_team", configureWindowedWrite()));
+
+
+    // [START DocInclude_SessionCalc]
+    // Detect user sessions-- that is, a burst of activity separated by a gap from further
+    // activity. Find and record the mean session lengths.
+    // This information could help the game designers track the changing user engagement
+    // as their set of games changes.
+    userEvents
+      .apply(Window.named("WindowIntoSessions")
+            .<KV<String, Integer>>into(
+                  Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
+        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+      // For this use, we care only about the existence of the session, not any particular
+      // information aggregated over it, so the following is an efficient way to do that.
+      .apply(Combine.perKey(x -> 0))
+      // Get the duration per session.
+      .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
+      // [END DocInclude_SessionCalc]
+      // [START DocInclude_Rewindow]
+      // Re-window to process groups of session sums according to when the sessions complete.
+      .apply(Window.named("WindowToExtractSessionMean")
+            .into(
+                FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
+      // Find the mean session duration in each window.
+      .apply(Mean.globally().withoutDefaults())
+      // Write this info to a BigQuery table.
+      .apply("WriteAvgSessionLength",
+             new WriteWindowedToBigQuery<Double>(
+                options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));
+    // [END DocInclude_Rewindow]
+
+
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+ PipelineResult result = pipeline.run(); + dataflowUtils.waitToFinish(result); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java new file mode 100644 index 0000000..481b9df --- /dev/null +++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java @@ -0,0 +1,193 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.examples.complete.game; + +import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.Filter; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.WithTimestamps; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; + +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +/** + * This class is the second in a series of four pipelines that tell a story in a 'gaming' + * domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore}, + * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}. + * + *

This pipeline processes data collected from gaming events in batch, building on {@link + * UserScore} but using fixed windows. It calculates the sum of scores per team, for each window, + * optionally allowing specification of two timestamps before and after which data is filtered out. + * This allows a model where late data collected after the intended analysis window can be included, + * and any late-arriving data prior to the beginning of the analysis window can be removed as well. + * By using windowing and adding element timestamps, we can do finer-grained analysis than with the + * {@link UserScore} pipeline. However, our batch processing is high-latency, in that we don't get + * results from plays at the beginning of the batch's time period until the batch is processed. + * + *

To execute this pipeline using the Dataflow service, specify the pipeline configuration + * like this: + *

{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ * }
+ * 
+ * where the BigQuery dataset you specify must already exist.
+ *
+ * <p>Optionally include {@code --input} to specify the batch input file path.
+ * To indicate a time after which the data should be filtered out, include the
+ * {@code --stopMin} arg. E.g., {@code --stopMin=2015-10-18-23-59} indicates that any data
+ * timestamped after 23:59 PST on 2015-10-18 should not be included in the analysis.
+ * To indicate a time before which data should be filtered out, include the {@code --startMin} arg.
+ * If you're using the default input specified in {@link UserScore},
+ * "gs://dataflow-samples/game/gaming_data*.csv", then
+ * {@code --startMin=2015-11-16-16-10 --stopMin=2015-11-17-16-10} are good values.
+ */
+public class HourlyTeamScore extends UserScore {
+
+  private static DateTimeFormatter fmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+  private static DateTimeFormatter minFmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+
+  /**
+   * Options supported by {@link HourlyTeamScore}.
+   */
+  static interface Options extends UserScore.Options {
+
+    @Description("Numeric value of fixed window duration, in minutes")
+    @Default.Integer(60)
+    Integer getWindowDuration();
+    void setWindowDuration(Integer value);
+
+    @Description("String representation of the first minute after which to generate results, "
+        + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST. "
+        + "Any input data timestamped prior to that minute won't be included in the sums.")
+    @Default.String("1970-01-01-00-00")
+    String getStartMin();
+    void setStartMin(String value);
+
+    @Description("String representation of the first minute for which to not generate results, "
+        + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST. "
+        + "Any input data timestamped after that minute won't be included in the sums.")
+    @Default.String("2100-01-01-00-00")
+    String getStopMin();
+    void setStopMin(String value);
+
+    @Description("The BigQuery table name. Should not already exist.")
+    @Default.String("hourly_team_score")
+    String getTableName();
+    void setTableName(String value);
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and
+   * includes information about window start time.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureWindowedTableWrite() {
+    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfig.put("team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+            c -> c.element().getKey()));
+    tableConfig.put("total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+            c -> c.element().getValue()));
+    tableConfig.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    return tableConfig;
+  }
+
+
+  /**
+   * Run a batch pipeline to do windowed analysis of the data.
+   */
+  // [START DocInclude_HTSMain]
+  public static void main(String[] args) throws Exception {
+    // Begin constructing a pipeline configured by commandline flags.
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
+    final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
+
+    // Read 'gaming' events from a text file.
+    pipeline.apply(TextIO.Read.from(options.getInput()))
+      // Parse the incoming data.
+ .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) + + // Filter out data before and after the given times so that it is not included + // in the calculations. As we collect data in batches (say, by day), the batch for the day + // that we want to analyze could potentially include some late-arriving data from the previous + // day. If so, we want to weed it out. Similarly, if we include data from the following day + // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events + // that fall after the time period we want to analyze. + // [START DocInclude_HTSFilters] + .apply("FilterStartTime", Filter.byPredicate( + (GameActionInfo gInfo) + -> gInfo.getTimestamp() > startMinTimestamp.getMillis())) + .apply("FilterEndTime", Filter.byPredicate( + (GameActionInfo gInfo) + -> gInfo.getTimestamp() < stopMinTimestamp.getMillis())) + // [END DocInclude_HTSFilters] + + // [START DocInclude_HTSAddTsAndWindow] + // Add an element timestamp based on the event log, and apply fixed windowing. + .apply("AddEventTimestamps", + WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp()))) + .apply(Window.named("FixedWindowsTeam") + .into(FixedWindows.of( + Duration.standardMinutes(options.getWindowDuration())))) + // [END DocInclude_HTSAddTsAndWindow] + + // Extract and sum teamname/score pairs from the event data. 
+ .apply("ExtractTeamScore", new ExtractAndSumScore("team")) + .apply("WriteTeamScoreSums", + new WriteWindowedToBigQuery>(options.getTableName(), + configureWindowedTableWrite())); + + + pipeline.run(); + } + // [END DocInclude_HTSMain] + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java new file mode 100644 index 0000000..4185376 --- /dev/null +++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java @@ -0,0 +1,237 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.examples.complete.game; + +import com.google.cloud.dataflow.examples.common.DataflowExampleOptions; +import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; +import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery; +import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.io.PubsubIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +/** + * This class is the third in a series of four pipelines that tell a story in a 'gaming' domain, + * following {@link UserScore} and {@link HourlyTeamScore}. 
Concepts include: processing unbounded + * data using fixed windows; use of custom timestamps and event-time processing; generation of + * early/speculative results; using .accumulatingFiredPanes() to do cumulative processing of late- + * arriving data. + * + *

This pipeline processes an unbounded stream of 'game events'. The calculation of the team + * scores uses fixed windowing based on event time (the time of the game play event), not + * processing time (the time that an event is processed by the pipeline). The pipeline calculates + * the sum of scores per team, for each window. By default, the team scores are calculated using + * one-hour windows. + * + *

In contrast, to demo another windowing option, the user scores are calculated using a + * global window, which periodically (every ten minutes) emits cumulative user score sums. + * + *

In contrast to the previous pipelines in the series, which used static, finite input data, + * here we're using an unbounded data source, which lets us provide speculative results, and allows + * handling of late data, at much lower latency. We can use the early/speculative results to keep a + * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct + * results, e.g. for 'team prizes'. We're now outputting window results as they're + * calculated, giving us much lower latency than with the previous batch examples. + * + *

Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector + * documentation provides more detail on how to do this. + * + *

To execute this pipeline using the Dataflow service, specify the pipeline configuration + * like this: + *

{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * 
+ * where the BigQuery dataset you specify must already exist. + * The PubSub topic you specify should be the same topic to which the Injector is publishing. + */ +public class LeaderBoard extends HourlyTeamScore { + + private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms"; + + private static DateTimeFormatter fmt = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); + static final Duration FIVE_MINUTES = Duration.standardMinutes(5); + static final Duration TEN_MINUTES = Duration.standardMinutes(10); + + + /** + * Options supported by {@link LeaderBoard}. + */ + static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions { + + @Description("Pub/Sub topic to read from") + @Validation.Required + String getTopic(); + void setTopic(String value); + + @Description("Numeric value of fixed window duration for team analysis, in minutes") + @Default.Integer(60) + Integer getTeamWindowDuration(); + void setTeamWindowDuration(Integer value); + + @Description("Numeric value of allowed data lateness, in minutes") + @Default.Integer(120) + Integer getAllowedLateness(); + void setAllowedLateness(Integer value); + + @Description("Prefix used for the BigQuery table names") + @Default.String("leaderboard") + String getTableName(); + void setTableName(String value); + } + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. This map + * is used to write team score sums and includes event timing information. 
+ */ + protected static Map>> + configureWindowedTableWrite() { + + Map>> tableConfigure = + new HashMap>>(); + tableConfigure.put("team", + new WriteWindowedToBigQuery.FieldInfo>("STRING", + c -> c.element().getKey())); + tableConfigure.put("total_score", + new WriteWindowedToBigQuery.FieldInfo>("INTEGER", + c -> c.element().getValue())); + tableConfigure.put("window_start", + new WriteWindowedToBigQuery.FieldInfo>("STRING", + c -> { IntervalWindow w = (IntervalWindow) c.window(); + return fmt.print(w.start()); })); + tableConfigure.put("processing_time", + new WriteWindowedToBigQuery.FieldInfo>( + "STRING", c -> fmt.print(Instant.now()))); + tableConfigure.put("timing", + new WriteWindowedToBigQuery.FieldInfo>( + "STRING", c -> c.pane().getTiming().toString())); + return tableConfigure; + } + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. This map + * is used to write user score sums. + */ + protected static Map>> + configureGlobalWindowBigQueryWrite() { + + Map>> tableConfigure = + configureBigQueryWrite(); + tableConfigure.put("processing_time", + new WriteToBigQuery.FieldInfo>( + "STRING", c -> fmt.print(Instant.now()))); + return tableConfigure; + } + + + public static void main(String[] args) throws Exception { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + // Enforce that this pipeline is always run in streaming mode. + options.setStreaming(true); + // For example purposes, allow the pipeline to be easily cancelled instead of running + // continuously. + options.setRunner(DataflowPipelineRunner.class); + DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + Pipeline pipeline = Pipeline.create(options); + + // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub + // data elements, and parse the data. 
+ PCollection gameEvents = pipeline + .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())) + .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())); + + // [START DocInclude_WindowAndTrigger] + // Extract team/score pairs from the event stream, using hour-long windows by default. + gameEvents + .apply(Window.named("LeaderboardTeamFixedWindows") + .into(FixedWindows.of( + Duration.standardMinutes(options.getTeamWindowDuration()))) + // We will get early (speculative) results as well as cumulative + // processing of late data. + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(FIVE_MINUTES)) + .withLateFirings(AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(TEN_MINUTES))) + .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())) + .accumulatingFiredPanes()) + // Extract and sum teamname/score pairs from the event data. + .apply("ExtractTeamScore", new ExtractAndSumScore("team")) + // Write the results to BigQuery. + .apply("WriteTeamScoreSums", + new WriteWindowedToBigQuery>( + options.getTableName() + "_team", configureWindowedTableWrite())); + // [END DocInclude_WindowAndTrigger] + + // [START DocInclude_ProcTimeTrigger] + // Extract user/score pairs from the event stream using processing time, via global windowing. + // Get periodic updates on all users' running scores. + gameEvents + .apply(Window.named("LeaderboardUserGlobalWindow") + .into(new GlobalWindows()) + // Get periodic results every ten minutes. + .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(TEN_MINUTES))) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))) + // Extract and sum username/score pairs from the event data. + .apply("ExtractUserScore", new ExtractAndSumScore("user")) + // Write the results to BigQuery. 
+ .apply("WriteUserScoreSums", + new WriteToBigQuery>( + options.getTableName() + "_user", configureGlobalWindowBigQueryWrite())); + // [END DocInclude_ProcTimeTrigger] + + // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the + // command line. + PipelineResult result = pipeline.run(); + dataflowUtils.waitToFinish(result); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md new file mode 100644 index 0000000..79b55ce --- /dev/null +++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md @@ -0,0 +1,113 @@ + +# 'Gaming' examples + + +This directory holds a series of example Dataflow pipelines in a simple 'mobile +gaming' domain. They all require Java 8. Each pipeline successively introduces +new concepts, and gives some examples of using Java 8 syntax in constructing +Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts +that are used apply equally well in Java 7. + +In the gaming scenario, many users play, as members of different teams, over +the course of a day, and their actions are logged for processing. Some of the +logged game events may be late-arriving, if users play on mobile devices and go +transiently offline for a period. + +The scenario includes not only "regular" users, but "robot users", which have a +higher click rate than the regular users, and may move from team to team. + +The first two pipelines in the series use pre-generated batch data samples. The +second two pipelines read from a [PubSub](https://cloud.google.com/pubsub/) +topic input. 
For these examples, you will also need to run the +`injector.Injector` program, which generates and publishes the gaming data to +PubSub. The javadocs for each pipeline have more detailed information on how to +run that pipeline. + +All of these pipelines write their results to BigQuery table(s). + + +## The pipelines in the 'gaming' series + +### UserScore + +The first pipeline in the series is `UserScore`. This pipeline does batch +processing of data collected from gaming events. It calculates the sum of +scores per user, over an entire batch of gaming data (collected, say, for each +day). The batch processing will not include any late data that arrives after +the day's cutoff point. + +### HourlyTeamScore + +The next pipeline in the series is `HourlyTeamScore`. This pipeline also +processes data collected from gaming events in batch. It builds on `UserScore`, +but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by +default an hour in duration. It calculates the sum of scores per team, for each +window, optionally allowing specification of two timestamps before and after +which data is filtered out. This allows a model where late data collected after +the intended analysis window can be included in the analysis, and any late- +arriving data prior to the beginning of the analysis window can be removed as +well. + +By using windowing and adding element timestamps, we can do finer-grained +analysis than with the `UserScore` pipeline — we're now tracking scores for +each hour rather than over the course of a whole day. However, our batch +processing is high-latency, in that we don't get results from plays at the +beginning of the batch's time period until the complete batch is processed. + +### LeaderBoard + +The third pipeline in the series is `LeaderBoard`. This pipeline processes an +unbounded stream of 'game events' from a PubSub topic. 
The calculation of the +team scores uses fixed windowing based on event time (the time of the game play +event), not processing time (the time that an event is processed by the +pipeline). The pipeline calculates the sum of scores per team, for each window. +By default, the team scores are calculated using one-hour windows. + +In contrast — to demo another windowing option — the user scores are calculated +using a global window, which periodically (every ten minutes) emits cumulative +user score sums. + +In contrast to the previous pipelines in the series, which used static, finite +input data, here we're using an unbounded data source, which lets us provide +_speculative_ results, and allows handling of late data, at much lower latency. +E.g., we could use the early/speculative results to keep a 'leaderboard' +updated in near-realtime. Our handling of late data lets us generate correct +results, e.g. for 'team prizes'. We're now outputing window results as they're +calculated, giving us much lower latency than with the previous batch examples. + +### GameStats + +The fourth pipeline in the series is `GameStats`. This pipeline builds +on the `LeaderBoard` functionality — supporting output of speculative and late +data — and adds some "business intelligence" analysis: identifying abuse +detection. The pipeline derives the Mean user score sum for a window, and uses +that information to identify likely spammers/robots. (The injector is designed +so that the "robots" have a higher click rate than the "real" users). The robot +users are then filtered out when calculating the team scores. + +Additionally, user sessions are tracked: that is, we find bursts of user +activity using session windows. Then, the mean session duration information is +recorded in the context of subsequent fixed windowing. (This could be used to +tell us what games are giving us greater user retention). 
+ ### Running the PubSub Injector + + The `LeaderBoard` and `GameStats` example pipelines read unbounded data + from a PubSub topic. + + Use the `injector.Injector` program to generate this data and publish to a + PubSub topic. See the `Injector` javadocs for more information on how to run the + injector. Set up the injector before you start one of these pipelines. Then, + when you start the pipeline, pass as an argument the name of that PubSub topic. + See the pipeline javadocs for the details. + + ## Viewing the results in BigQuery + + All of the pipelines write their results to BigQuery. `UserScore` and + `HourlyTeamScore` each write one table, and `LeaderBoard` and + `GameStats` each write two. The pipelines have default table names that + you can override when you start up the pipeline if those tables already exist. + + Depending on the windowing intervals defined in a given pipeline, you may have + to wait for a while (more than an hour) before you start to see results written + to the BigQuery tables. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java new file mode 100644 index 0000000..de06ce3 --- /dev/null +++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java @@ -0,0 +1,239 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.examples.complete.game; + +import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.AvroCoder; +import com.google.cloud.dataflow.sdk.coders.DefaultCoder; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.MapElements; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.apache.avro.reflect.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class is the first in a series of four pipelines that tell a story in a 'gaming' domain. 
+ * Concepts: batch processing; reading input from Google Cloud Storage and writing output to + * BigQuery; using standalone DoFns; use of the sum by key transform; examples of + * Java 8 lambda syntax. + * + *

In this gaming scenario, many users play, as members of different teams, over the course of a + * day, and their actions are logged for processing. Some of the logged game events may be late- + * arriving, if users play on mobile devices and go transiently offline for a period. + * + *

This pipeline does batch processing of data collected from gaming events. It calculates the + * sum of scores per user, over an entire batch of gaming data (collected, say, for each day). The + * batch processing will not include any late data that arrives after the day's cutoff point. + * + *

To execute this pipeline using the Dataflow service and static example input data, specify + * the pipeline configuration like this: + *

{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ * }
+ * 
+ * where the BigQuery dataset you specify must already exist. + * + *

Optionally include the --input argument to specify a batch input file. + * See the --input default value for an example batch data file, or use {@link injector.Injector} to + * generate your own batch data. + */ +public class UserScore { + + /** + * Class to hold info about a game event. + */ + @DefaultCoder(AvroCoder.class) + static class GameActionInfo { + @Nullable String user; + @Nullable String team; + @Nullable Integer score; + @Nullable Long timestamp; + + public GameActionInfo() {} + + public GameActionInfo(String user, String team, Integer score, Long timestamp) { + this.user = user; + this.team = team; + this.score = score; + this.timestamp = timestamp; + } + + public String getUser() { + return this.user; + } + public String getTeam() { + return this.team; + } + public Integer getScore() { + return this.score; + } + public String getKey(String keyname) { + if (keyname.equals("team")) { + return this.team; + } else { // return username as default + return this.user; + } + } + public Long getTimestamp() { + return this.timestamp; + } + } + + + /** + * Parses the raw game event info into GameActionInfo objects. Each event line has the following + * format: username,teamname,score,timestamp_in_ms,readable_time + * e.g.: + * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224 + * The human-readable time string is not used here. + */ + static class ParseEventFn extends DoFn<String, GameActionInfo> { + + // Log and count parse errors. 
+ private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class); + private final Aggregator numParseErrors = + createAggregator("ParseErrors", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + String[] components = c.element().split(","); + try { + String user = components[0].trim(); + String team = components[1].trim(); + Integer score = Integer.parseInt(components[2].trim()); + Long timestamp = Long.parseLong(components[3].trim()); + GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp); + c.output(gInfo); + } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) { + numParseErrors.addValue(1L); + LOG.info("Parse error on " + c.element() + ", " + e.getMessage()); + } + } + } + + /** + * A transform to extract key/score information from GameActionInfo, and sum the scores. The + * constructor arg determines whether 'team' or 'user' info is extracted. + */ + // [START DocInclude_USExtractXform] + public static class ExtractAndSumScore + extends PTransform, PCollection>> { + + private final String field; + + ExtractAndSumScore(String field) { + this.field = field; + } + + @Override + public PCollection> apply( + PCollection gameInfo) { + + return gameInfo + .apply(MapElements + .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())) + .withOutputType(new TypeDescriptor>() {})) + .apply(Sum.integersPerKey()); + } + } + // [END DocInclude_USExtractXform] + + + /** + * Options supported by {@link UserScore}. + */ + public static interface Options extends PipelineOptions { + + @Description("Path to the data file(s) containing game data.") + // The default maps to two large Google Cloud Storage files (each ~12GB) holding two subsequent + // day's worth (roughly) of data. + @Default.String("gs://dataflow-samples/game/gaming_data*.csv") + String getInput(); + void setInput(String value); + + @Description("BigQuery Dataset to write tables to. 
Must already exist.") + @Validation.Required + String getDataset(); + void setDataset(String value); + + @Description("The BigQuery table name. Should not already exist.") + @Default.String("user_score") + String getTableName(); + void setTableName(String value); + } + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. This map + * is passed to the {@link WriteToBigQuery} constructor to write user score sums. + */ + protected static Map>> + configureBigQueryWrite() { + Map>> tableConfigure = + new HashMap>>(); + tableConfigure.put("user", + new WriteToBigQuery.FieldInfo>("STRING", c -> c.element().getKey())); + tableConfigure.put("total_score", + new WriteToBigQuery.FieldInfo>("INTEGER", c -> c.element().getValue())); + return tableConfigure; + } + + + /** + * Run a batch pipeline. + */ + // [START DocInclude_USMain] + public static void main(String[] args) throws Exception { + // Begin constructing a pipeline configured by commandline flags. + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline pipeline = Pipeline.create(options); + + // Read events from a text file and parse them. + pipeline.apply(TextIO.Read.from(options.getInput())) + .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) + // Extract and sum username/score pairs from the event data. + .apply("ExtractUserScore", new ExtractAndSumScore("user")) + .apply("WriteUserScoreSums", + new WriteToBigQuery>(options.getTableName(), + configureBigQueryWrite())); + + // Run the batch pipeline. 
+ pipeline.run(); + } + // [END DocInclude_USMain] + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java new file mode 100644 index 0000000..1691c54 --- /dev/null +++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java @@ -0,0 +1,415 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.examples.complete.game.injector; + +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.common.collect.ImmutableMap; + +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.TimeZone; + + +/** + * This is a generator that simulates usage data from a mobile game, and either publishes the data + * to a pubsub topic or writes it to a file. + * + *

The general model used by the generator is the following. There is a set of teams with team + * members. Each member is scoring points for their team. After some period, a team will dissolve + * and a new one will be created in its place. There is also a set of 'Robots', or spammer users. + * They hop from team to team. The robots are set to have a higher 'click rate' (generate more + * events) than the regular team members. + * + *

Each generated line of data has the following form: + * username,teamname,score,timestamp_in_ms,readable_time + * e.g.: + * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224 + * + *

The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if + * specified. It takes the following arguments: + * {@code Injector project-name (topic-name|none) (filename|none)}. + * + *

To run the Injector in the mode where it publishes to PubSub, you will need to authenticate + * locally using project-based service account credentials to avoid running over PubSub + * quota. + * See https://developers.google.com/identity/protocols/application-default-credentials + * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS + * environment variable to point to your downloaded service account credentials before starting the + * program, e.g.: + * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}. + * If you do not do this, then your injector will only run for a few minutes on your + * 'user account' credentials before you will start to see quota error messages like: + * "Request throttled due to user QPS limit being reached", and see this exception: + * "com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests". + * Once you've set up your credentials, run the Injector like this: + *

{@code
+ * Injector <project-name> <topic-name> none
+ * }
+ * 
+ * The pubsub topic will be created if it does not exist. + * + *

To run the injector in write-to-file-mode, set the topic name to "none" and specify the + * filename: + *

{@code
+ * Injector <project-name> none <filename>
+ * }
+ * 
+ */ +class Injector { + private static Pubsub pubsub; + private static Random random = new Random(); + private static String topic; + private static String project; + private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms"; + + // QPS ranges from 800 to 1000. + private static final int MIN_QPS = 800; + private static final int QPS_RANGE = 200; + // How long to sleep, in ms, between creation of the threads that make API requests to PubSub. + private static final int THREAD_SLEEP_MS = 500; + + // Lists used to generate random team names. + private static final ArrayList<String> COLORS = + new ArrayList<String>(Arrays.asList( + "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber", + "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen", + "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana", + "Beige", "Bisque", "BarnRed", "BattleshipGrey")); + + private static final ArrayList<String> ANIMALS = + new ArrayList<String>(Arrays.asList( + "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu", + "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus", + "Bandicoot", "Cockatoo", "Antechinus")); + + // The list of live teams. + private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>(); + + private static DateTimeFormatter fmt = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); + + + // The total number of robots in the system. + private static final int NUM_ROBOTS = 20; + // Determines the chance that a team will have a robot team member. 
+ private static final int ROBOT_PROBABILITY = 3; + private static final int NUM_LIVE_TEAMS = 15; + private static final int BASE_MEMBERS_PER_TEAM = 5; + private static final int MEMBERS_PER_TEAM = 15; + private static final int MAX_SCORE = 20; + private static final int LATE_DATA_RATE = 5 * 60 * 2; // Every 600 loop iterations (about 5 minutes at THREAD_SLEEP_MS = 500) + private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000; // 5-10 minute delay + private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000; + + // The minimum time a 'team' can live. + private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20; + private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20; + + + /** + * A class for holding team info: the name of the team, when it started, + * and the current team members. Teams may but need not include one robot team member. + */ + private static class TeamInfo { + String teamName; + long startTimeInMillis; + int expirationPeriod; + // The team might but need not include 1 robot. Will be non-null if so. + String robot; + int numMembers; + + private TeamInfo(String teamName, long startTimeInMillis, String robot) { + this.teamName = teamName; + this.startTimeInMillis = startTimeInMillis; + // How long until this team is dissolved. + this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) + + BASE_TEAM_EXPIRATION_TIME_IN_MINS; + this.robot = robot; + // Determine the number of team members. 
+ numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM; + } + + String getTeamName() { + return teamName; + } + String getRobot() { + return robot; + } + + long getStartTimeInMillis() { + return startTimeInMillis; + } + long getEndTimeInMillis() { + return startTimeInMillis + (expirationPeriod * 60 * 1000); + } + String getRandomUser() { + int userNum = random.nextInt(numMembers); + return "user" + userNum + "_" + teamName; + } + + int numMembers() { + return numMembers; + } + + @Override + public String toString() { + return "(" + teamName + ", num members: " + numMembers() + ", starting at: " + + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")"; + } + } + + /** Utility to grab a random element from a list of Strings. */ + private static String randomElement(ArrayList<String> list) { + int index = random.nextInt(list.size()); + return list.get(index); + } + + /** + * Get and return a random team. If the selected team is past its expiration time, remove + * it, replacing it with a new team. + */ + private static TeamInfo randomTeam(ArrayList<TeamInfo> list) { + int index = random.nextInt(list.size()); + TeamInfo team = list.get(index); + // If the selected team is expired, remove it and return a new team. + long currTime = System.currentTimeMillis(); + if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) { + System.out.println("\nteam " + team + " is too old; replacing."); + System.out.println("start time: " + team.getStartTimeInMillis() + + ", end time: " + team.getEndTimeInMillis() + + ", current time:" + currTime); + removeTeam(index); + // Add a new team in its stead. + return (addLiveTeam()); + } else { + return team; + } + } + + /** + * Create and add a team. Possibly add a robot to the team. + */ + private static synchronized TeamInfo addLiveTeam() { + String teamName = randomElement(COLORS) + randomElement(ANIMALS); + String robot = null; + // Decide if we want to add a robot to the team. 
+ if (random.nextInt(ROBOT_PROBABILITY) == 0) { + robot = "Robot-" + random.nextInt(NUM_ROBOTS); + } + // Create the new team. + TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot); + liveTeams.add(newTeam); + System.out.println("[+" + newTeam + "]"); + return newTeam; + } + + /** + * Remove a specific team. + */ + private static synchronized void removeTeam(int teamIndex) { + TeamInfo removedTeam = liveTeams.remove(teamIndex); + System.out.println("[-" + removedTeam + "]"); + } + + /** Generate a user gaming event. */ + private static String generateEvent(Long currTime, int delayInMillis) { + TeamInfo team = randomTeam(liveTeams); + String teamName = team.getTeamName(); + String user; + final int parseErrorRate = 900000; + + String robot = team.getRobot(); + // If the team has an associated robot team member... + if (robot != null) { + // Then use that robot for the message with some probability. + // Set this probability to higher than that used to select any of the 'regular' team + // members, so that if there is a robot on the team, it has a higher click rate. + if (random.nextInt(team.numMembers() / 2) == 0) { + user = robot; + } else { + user = team.getRandomUser(); + } + } else { // No robot. + user = team.getRandomUser(); + } + String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE); + // Randomly introduce occasional parse errors. You can see a custom counter tracking the number + // of such errors in the Dataflow Monitoring UI, as the example pipeline runs. + if (random.nextInt(parseErrorRate) == 0) { + System.out.println("Introducing a parse error."); + event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR"; + } + return addTimeInfoToEvent(event, currTime, delayInMillis); + } + + /** + * Add time info to a generated gaming event. 
+ */ + private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) { + String eventTimeString = + Long.toString((currTime - delayInMillis) / 1000 * 1000); + // Add a (redundant) 'human-readable' date string to make the data semantics clearer. + String dateString = fmt.print(currTime); + message = message + "," + eventTimeString + "," + dateString; + return message; + } + + /** + * Publish 'numMessages' arbitrary events from live users with the provided delay, to a + * PubSub topic. + */ + public static void publishData(int numMessages, int delayInMillis) + throws IOException { + List<PubsubMessage> pubsubMessages = new ArrayList<>(); + + for (int i = 0; i < Math.max(1, numMessages); i++) { + Long currTime = System.currentTimeMillis(); + String message = generateEvent(currTime, delayInMillis); + PubsubMessage pubsubMessage = new PubsubMessage() + .encodeData(message.getBytes("UTF-8")); + pubsubMessage.setAttributes( + ImmutableMap.of(TIMESTAMP_ATTRIBUTE, + Long.toString((currTime - delayInMillis) / 1000 * 1000))); + if (delayInMillis != 0) { + System.out.println(pubsubMessage.getAttributes()); + System.out.println("late data for: " + message); + } + pubsubMessages.add(pubsubMessage); + } + + PublishRequest publishRequest = new PublishRequest(); + publishRequest.setMessages(pubsubMessages); + pubsub.projects().topics().publish(topic, publishRequest).execute(); + } + + /** + * Publish generated events to a file. 
+ */ + public static void publishDataToFile(String fileName, int numMessages, int delayInMillis) + throws IOException { + PrintWriter out = new PrintWriter(new OutputStreamWriter( + new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8")); + + try { + for (int i = 0; i < Math.max(1, numMessages); i++) { + Long currTime = System.currentTimeMillis(); + String message = generateEvent(currTime, delayInMillis); + out.println(message); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (out != null) { + out.flush(); + out.close(); + } + } + } + + + public static void main(String[] args) throws IOException, InterruptedException { + if (args.length < 3) { + System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)"); + System.exit(1); + } + boolean writeToFile = false; + boolean writeToPubsub = true; + project = args[0]; + String topicName = args[1]; + String fileName = args[2]; + // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if + // specified; otherwise, it will try to write to a file. + if (topicName.equalsIgnoreCase("none")) { + writeToFile = true; + writeToPubsub = false; + } + if (writeToPubsub) { + // Create the PubSub client. + pubsub = InjectorUtils.getClient(); + // Create the PubSub topic as necessary. + topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName); + InjectorUtils.createTopic(pubsub, topic); + System.out.println("Injecting to topic: " + topic); + } else { + if (fileName.equalsIgnoreCase("none")) { + System.out.println("Filename not specified."); + System.exit(1); + } + System.out.println("Writing to file: " + fileName); + } + System.out.println("Starting Injector"); + + // Start off with some random live teams. + while (liveTeams.size() < NUM_LIVE_TEAMS) { + addLiveTeam(); + } + + // Publish messages at a rate determined by the QPS and Thread sleep settings. 
+ for (int i = 0; true; i++) { + if (Thread.activeCount() > 10) { + System.err.println("I'm falling behind!"); + } + + // Decide if this should be a batch of late data. + final int numMessages; + final int delayInMillis; + if (i % LATE_DATA_RATE == 0) { + // Insert delayed data for one user (one message only) + delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS); + numMessages = 1; + System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")"); + } else { + System.out.print("."); + delayInMillis = 0; + numMessages = MIN_QPS + random.nextInt(QPS_RANGE); + } + + if (writeToFile) { // Won't use threading for the file write. + publishDataToFile(fileName, numMessages, delayInMillis); + } else { // Write to PubSub. + // Start a thread to inject some data. + new Thread(){ + @Override + public void run() { + try { + publishData(numMessages, delayInMillis); + } catch (IOException e) { + System.err.println(e); + } + } + }.start(); + } + + // Wait before creating another injector thread. + Thread.sleep(THREAD_SLEEP_MS); + } + } +}
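For readers who want to consume the Injector's output, the following is a minimal sketch of a parser for the line format documented above (username,teamname,score,timestamp_in_ms,readable_time). It is not part of this commit: the class name GameEventLine and its methods are hypothetical, chosen only for illustration. It also reproduces the rounding used in addTimeInfoToEvent, where the delay is subtracted and the result is truncated to whole seconds. Note that, in the code above, the readable time is printed from the undelayed current time, so for late data it will not match the timestamp field.

```java
/** Hypothetical helper (not in the commit): one parsed line of Injector output. */
final class GameEventLine {
  final String user;
  final String team;
  final int score;
  final long eventTimeMillis;
  final String readableTime;

  private GameEventLine(
      String user, String team, int score, long eventTimeMillis, String readableTime) {
    this.user = user;
    this.team = team;
    this.score = score;
    this.eventTimeMillis = eventTimeMillis;
    this.readableTime = readableTime;
  }

  /**
   * Parses "username,teamname,score,timestamp_in_ms,readable_time".
   * Throws IllegalArgumentException for lines that do not fit, such as the
   * deliberately corrupt lines the Injector occasionally emits.
   */
  static GameEventLine parse(String line) {
    // Limit of 5 keeps any trailing content in the readable-time field.
    String[] parts = line.split(",", 5);
    if (parts.length != 5) {
      throw new IllegalArgumentException("Expected 5 comma-separated fields: " + line);
    }
    try {
      return new GameEventLine(
          parts[0].trim(),
          parts[1].trim(),
          Integer.parseInt(parts[2].trim()),
          Long.parseLong(parts[3].trim()),
          parts[4].trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Malformed numeric field: " + line, e);
    }
  }

  /** Same arithmetic as the Injector: subtract the delay, truncate to whole seconds. */
  static long eventTimeMillis(long currTimeMillis, int delayInMillis) {
    return (currTimeMillis - delayInMillis) / 1000 * 1000;
  }

  public static void main(String[] args) {
    GameEventLine event = parse(
        "user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224");
    System.out.println(event.user + " scored " + event.score + " for " + event.team);
    System.out.println(eventTimeMillis(1445230923951L, 0));
  }
}
```

A corrupt line has only three comma-separated fields once the time info is appended, so parse rejects it with an IllegalArgumentException; a consumer could catch that and increment an error counter instead of failing.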