beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [63/67] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:48:27 GMT
Directory reorganization

Move Java 8 examples from "java8examples/" into "examples/java8/".


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11bb9e0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11bb9e0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11bb9e0e

Branch: refs/heads/master
Commit: 11bb9e0e61f8b15ce81e5181baa5458bb715a059
Parents: 2eaa709
Author: Davor Bonaci <davor@google.com>
Authored: Wed Mar 23 17:16:47 2016 -0700
Committer: Davor Bonaci <davor@google.com>
Committed: Wed Mar 23 18:33:33 2016 -0700

----------------------------------------------------------------------
 examples/java8/pom.xml                          | 279 +++++++++++++
 .../examples/MinimalWordCountJava8.java         |  68 +++
 .../examples/complete/game/GameStats.java       | 339 +++++++++++++++
 .../examples/complete/game/HourlyTeamScore.java | 193 +++++++++
 .../examples/complete/game/LeaderBoard.java     | 237 +++++++++++
 .../dataflow/examples/complete/game/README.md   | 113 +++++
 .../examples/complete/game/UserScore.java       | 239 +++++++++++
 .../complete/game/injector/Injector.java        | 415 +++++++++++++++++++
 .../complete/game/injector/InjectorUtils.java   | 101 +++++
 .../injector/RetryHttpInitializerWrapper.java   | 126 ++++++
 .../complete/game/utils/WriteToBigQuery.java    | 134 ++++++
 .../game/utils/WriteWindowedToBigQuery.java     |  76 ++++
 .../examples/MinimalWordCountJava8Test.java     | 103 +++++
 .../examples/complete/game/GameStatsTest.java   |  76 ++++
 .../complete/game/HourlyTeamScoreTest.java      | 111 +++++
 .../examples/complete/game/UserScoreTest.java   | 154 +++++++
 java8examples/pom.xml                           | 279 -------------
 .../examples/MinimalWordCountJava8.java         |  68 ---
 .../examples/complete/game/GameStats.java       | 339 ---------------
 .../examples/complete/game/HourlyTeamScore.java | 193 ---------
 .../examples/complete/game/LeaderBoard.java     | 237 -----------
 .../dataflow/examples/complete/game/README.md   | 113 -----
 .../examples/complete/game/UserScore.java       | 239 -----------
 .../complete/game/injector/Injector.java        | 415 -------------------
 .../complete/game/injector/InjectorUtils.java   | 101 -----
 .../injector/RetryHttpInitializerWrapper.java   | 126 ------
 .../complete/game/utils/WriteToBigQuery.java    | 134 ------
 .../game/utils/WriteWindowedToBigQuery.java     |  76 ----
 .../examples/MinimalWordCountJava8Test.java     | 103 -----
 .../examples/complete/game/GameStatsTest.java   |  76 ----
 .../complete/game/HourlyTeamScoreTest.java      | 111 -----
 .../examples/complete/game/UserScoreTest.java   | 154 -------
 pom.xml                                         |   2 +-
 33 files changed, 2765 insertions(+), 2765 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
new file mode 100644
index 0000000..7d55c31
--- /dev/null
+++ b/examples/java8/pom.xml
@@ -0,0 +1,279 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>java8examples-all</artifactId>
+  <name>Apache Beam :: Examples :: Java 8 All</name>
+  <description>Apache Beam Java SDK provides a simple, Java-based
+    interface for processing virtually any size data.
+    This artifact includes examples of the SDK from a Java 8
+    user.</description>
+
+  <packaging>jar</packaging>
+
+  <profiles>
+    <profile>
+      <id>DataflowPipelineTests</id>
+      <properties>
+        <runIntegrationTestOnService>true</runIntegrationTestOnService>
+        <testGroups>com.google.cloud.dataflow.sdk.testing.RunnableOnService</testGroups>
+        <testParallelValue>both</testParallelValue>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+          <testSource>1.8</testSource>
+          <testTarget>1.8</testTarget>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals><goal>analyze-only</goal></goals>
+            <configuration>
+              <failOnWarning>true</failOnWarning>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.12</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>6.6</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>../../checkstyle.xml</configLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+          <includeResources>false</includeResources>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Source plugin for generating source and test-source JARs. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>attach-test-sources</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>default-jar</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>default-test-jar</id>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-examples-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.10.19</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+      <version>${bigquery.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.oauth-client</groupId>
+      <artifactId>google-oauth-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-pubsub</artifactId>
+      <version>${pubsub.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.api-client</groupId>
+      <artifactId>google-api-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java
new file mode 100644
index 0000000..c115ea0
--- /dev/null
+++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import java.util.Arrays;
+
+/**
+ * An example that counts words in Shakespeare, using Java 8 language features.
+ *
+ * <p>See {@link MinimalWordCount} for a comprehensive explanation.
+ */
+public class MinimalWordCountJava8 {
+
+  public static void main(String[] args) {
+    DataflowPipelineOptions options = PipelineOptionsFactory.create()
+        .as(DataflowPipelineOptions.class);
+
+    options.setRunner(BlockingDataflowPipelineRunner.class);
+
+    // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
+    options.setProject("SET_YOUR_PROJECT_ID_HERE");
+
+    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
+    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
+         .withOutputType(new TypeDescriptor<String>() {}))
+     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+     .apply(Count.<String>perElement())
+     .apply(MapElements
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
+         .withOutputType(new TypeDescriptor<String>() {}))
+
+     // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
+     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/GameStats.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/GameStats.java
new file mode 100644
index 0000000..7c67d10
--- /dev/null
+++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/GameStats.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.Mean;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the fourth in a series of four pipelines that tell a story in a 'gaming'
+ * domain, following {@link UserScore}, {@link HourlyTeamScore}, and {@link LeaderBoard}.
+ * New concepts: session windows and finding session duration; use of both
+ * singleton and non-singleton side inputs.
+ *
+ * <p> This pipeline builds on the {@link LeaderBoard} functionality, and adds some "business
+ * intelligence" analysis: abuse detection and usage patterns. The pipeline derives the Mean user
+ * score sum for a window, and uses that information to identify likely spammers/robots. (The robots
+ * have a higher click rate than the human users). The 'robot' users are then filtered out when
+ * calculating the team scores.
+ *
+ * <p> Additionally, user sessions are tracked: that is, we find bursts of user activity using
+ * session windows. Then, the mean session duration information is recorded in the context of
+ * subsequent fixed windowing. (This could be used to tell us what games are giving us greater
+ * user retention).
+ *
+ * <p> Run {@code com.google.cloud.dataflow.examples.complete.game.injector.Injector} to generate
+ * pubsub data for this pipeline. The {@code Injector} documentation provides more detail.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist. The PubSub topic you specify should
+ * be the same topic to which the Injector is publishing.
+ */
+public class GameStats extends LeaderBoard {
+
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+  private static DateTimeFormatter fmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+  /**
+   * Filter out all but those users with a high clickrate, which we will consider as 'spammy' uesrs.
+   * We do this by finding the mean total score per user, then using that information as a side
+   * input to filter out all but those user scores that are > (mean * SCORE_WEIGHT)
+   */
+  // [START DocInclude_AbuseDetect]
+  public static class CalculateSpammyUsers
+      extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
+    private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
+    private static final double SCORE_WEIGHT = 2.5;
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
+
+      // Get the sum of scores for each user.
+      PCollection<KV<String, Integer>> sumScores = userScores
+          .apply("UserSum", Sum.<String>integersPerKey());
+
+      // Extract the score from each element, and use it to find the global mean.
+      final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
+          .apply(Mean.<Integer>globally().asSingletonView());
+
+      // Filter the user sums using the global mean.
+      PCollection<KV<String, Integer>> filtered = sumScores
+          .apply(ParDo
+              .named("ProcessAndFilter")
+              // use the derived mean total score as a side input
+              .withSideInputs(globalMeanScore)
+              .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+                private final Aggregator<Long, Long> numSpammerUsers =
+                  createAggregator("SpammerUsers", new Sum.SumLongFn());
+                @Override
+                public void processElement(ProcessContext c) {
+                  Integer score = c.element().getValue();
+                  Double gmc = c.sideInput(globalMeanScore);
+                  if (score > (gmc * SCORE_WEIGHT)) {
+                    LOG.info("user " + c.element().getKey() + " spammer score " + score
+                        + " with mean " + gmc);
+                    numSpammerUsers.addValue(1L);
+                    c.output(c.element());
+                  }
+                }
+              }));
+      return filtered;
+    }
+  }
+  // [END DocInclude_AbuseDetect]
+
+  /**
+   * Calculate and output an element's session duration.
+   */
+  private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer>
+      implements RequiresWindowAccess {
+
+    @Override
+    public void processElement(ProcessContext c) {
+      IntervalWindow w = (IntervalWindow) c.window();
+      int duration = new Duration(
+          w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
+      c.output(duration);
+    }
+  }
+
+
+  /**
+   * Options supported by {@link GameStats}.
+   */
+  static interface Options extends LeaderBoard.Options {
+    @Description("Numeric value of fixed window duration for user analysis, in minutes")
+    @Default.Integer(60)
+    Integer getFixedWindowDuration();
+    void setFixedWindowDuration(Integer value);
+
+    @Description("Numeric value of gap between user sessions, in minutes")
+    @Default.Integer(5)
+    Integer getSessionGap();
+    void setSessionGap(Integer value);
+
+    @Description("Numeric value of fixed window for finding mean of user session duration, "
+        + "in minutes")
+    @Default.Integer(30)
+    Integer getUserActivityWindowDuration();
+    void setUserActivityWindowDuration(Integer value);
+
+    @Description("Prefix used for the BigQuery table names")
+    @Default.String("game_stats")
+    String getTablePrefix();
+    void setTablePrefix(String value);
+  }
+
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write information about team score sums.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureWindowedWrite() {
+    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfigure.put("team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+            c -> c.element().getKey()));
+    tableConfigure.put("total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+            c -> c.element().getValue()));
+    tableConfigure.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    tableConfigure.put("processing_time",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> fmt.print(Instant.now())));
+    return tableConfigure;
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write information about mean user session time.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<Double>>
+      configureSessionWindowWrite() {
+
+    Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<Double>>();
+    tableConfigure.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<Double>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    tableConfigure.put("mean_duration",
+        new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", c -> c.element()));
+    return tableConfigure;
+  }
+
+
+
+  public static void main(String[] args) throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    // Enforce that this pipeline is always run in streaming mode.
+    options.setStreaming(true);
+    // Allow the pipeline to be cancelled automatically.
+    options.setRunner(DataflowPipelineRunner.class);
+    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Read Events from Pub/Sub using custom timestamps
+    PCollection<GameActionInfo> rawEvents = pipeline
+        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+
+    // Extract username/score pairs from the event stream
+    PCollection<KV<String, Integer>> userEvents =
+        rawEvents.apply("ExtractUserScore",
+          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+            .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+    // Calculate the total score per user over fixed windows, and
+    // cumulative updates for late data.
+    final PCollectionView<Map<String, Integer>> spammersView = userEvents
+      .apply(Window.named("FixedWindowsUser")
+          .<KV<String, Integer>>into(FixedWindows.of(
+              Duration.standardMinutes(options.getFixedWindowDuration())))
+          )
+
+      // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
+      // These might be robots/spammers.
+      .apply("CalculateSpammyUsers", new CalculateSpammyUsers())
+      // Derive a view from the collection of spammer users. It will be used as a side input
+      // in calculating the team score sums, below.
+      .apply("CreateSpammersView", View.<String, Integer>asMap());
+
+    // [START DocInclude_FilterAndCalc]
+    // Calculate the total score per team over fixed windows,
+    // and emit cumulative updates for late data. Uses the side input derived above-- the set of
+    // suspected robots-- to filter out scores from those users from the sum.
+    // Write the results to BigQuery.
+    rawEvents
+      .apply(Window.named("WindowIntoFixedWindows")
+          .<GameActionInfo>into(FixedWindows.of(
+              Duration.standardMinutes(options.getFixedWindowDuration())))
+          )
+      // Filter out the detected spammer users, using the side input derived above.
+      .apply(ParDo.named("FilterOutSpammers")
+              .withSideInputs(spammersView)
+              .of(new DoFn<GameActionInfo, GameActionInfo>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  // If the user is not in the spammers Map, output the data element.
+                  if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
+                    c.output(c.element());
+                  }
+                }
+              }))
+      // Extract and sum teamname/score pairs from the event data.
+      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+      // [END DocInclude_FilterAndCalc]
+      // Write the result to BigQuery
+      .apply("WriteTeamSums",
+             new WriteWindowedToBigQuery<KV<String, Integer>>(
+                options.getTablePrefix() + "_team", configureWindowedWrite()));
+
+
+    // [START DocInclude_SessionCalc]
+    // Detect user sessions-- that is, a burst of activity separated by a gap from further
+    // activity. Find and record the mean session lengths.
+    // This information could help the game designers track the changing user engagement
+    // as their set of games changes.
+    userEvents
+      .apply(Window.named("WindowIntoSessions")
+            .<KV<String, Integer>>into(
+                  Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
+        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+      // For this use, we care only about the existence of the session, not any particular
+      // information aggregated over it, so the following is an efficient way to do that.
+      .apply(Combine.perKey(x -> 0))
+      // Get the duration per session.
+      .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
+      // [END DocInclude_SessionCalc]
+      // [START DocInclude_Rewindow]
+      // Re-window to process groups of session sums according to when the sessions complete.
+      .apply(Window.named("WindowToExtractSessionMean")
+            .<Integer>into(
+                FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
+      // Find the mean session duration in each window.
+      .apply(Mean.<Integer>globally().withoutDefaults())
+      // Write this info to a BigQuery table.
+      .apply("WriteAvgSessionLength",
+             new WriteWindowedToBigQuery<Double>(
+                options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));
+    // [END DocInclude_Rewindow]
+
+
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = pipeline.run();
+    dataflowUtils.waitToFinish(result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
new file mode 100644
index 0000000..481b9df
--- /dev/null
+++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.WithTimestamps;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the second in a series of four pipelines that tell a story in a 'gaming'
+ * domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore},
+ * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}.
+ *
+ * <p> This pipeline processes data collected from gaming events in batch, building on {@link
+ * UserScore} but using fixed windows. It calculates the sum of scores per team, for each window,
+ * optionally allowing specification of two timestamps before and after which data is filtered out.
+ * This allows a model where late data collected after the intended analysis window can be included,
+ * and any late-arriving data prior to the beginning of the analysis window can be removed as well.
+ * By using windowing and adding element timestamps, we can do finer-grained analysis than with the
+ * {@link UserScore} pipeline. However, our batch processing is high-latency, in that we don't get
+ * results from plays at the beginning of the batch's time period until the batch is processed.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ *
+ * <p> Optionally include {@code --input} to specify the batch input file path.
+ * To indicate a time after which the data should be filtered out, include the
+ * {@code --stopMin} arg. E.g., {@code --stopMin=2015-10-18-23-59} indicates that any data
+ * timestamped after 23:59 PST on 2015-10-18 should not be included in the analysis.
+ * To indicate a time before which data should be filtered out, include the {@code --startMin} arg.
+ * If you're using the default input specified in {@link UserScore},
+ * "gs://dataflow-samples/game/gaming_data*.csv", then
+ * {@code --startMin=2015-11-16-16-10 --stopMin=2015-11-17-16-10} are good values.
+ */
+public class HourlyTeamScore extends UserScore {
+
+  private static DateTimeFormatter fmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+  private static DateTimeFormatter minFmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+
+  /**
+   * Options supported by {@link HourlyTeamScore}.
+   */
+  static interface Options extends UserScore.Options {
+
+    @Description("Numeric value of fixed window duration, in minutes")
+    @Default.Integer(60)
+    Integer getWindowDuration();
+    void setWindowDuration(Integer value);
+
+    @Description("String representation of the first minute after which to generate results,"
+        + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST."
+        + "Any input data timestamped prior to that minute won't be included in the sums.")
+    @Default.String("1970-01-01-00-00")
+    String getStartMin();
+    void setStartMin(String value);
+
+    @Description("String representation of the first minute for which to not generate results,"
+        + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST."
+        + "Any input data timestamped after that minute won't be included in the sums.")
+    @Default.String("2100-01-01-00-00")
+    String getStopMin();
+    void setStopMin(String value);
+
+    @Description("The BigQuery table name. Should not already exist.")
+    @Default.String("hourly_team_score")
+    String getTableName();
+    void setTableName(String value);
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and
+   * includes information about window start time.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureWindowedTableWrite() {
+    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfig.put("team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+            c -> c.element().getKey()));
+    tableConfig.put("total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+            c -> c.element().getValue()));
+    tableConfig.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    return tableConfig;
+  }
+
+
+  /**
+   * Run a batch pipeline to do windowed analysis of the data.
+   */
+  // [START DocInclude_HTSMain]
+  public static void main(String[] args) throws Exception {
+    // Begin constructing a pipeline configured by commandline flags.
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
+    final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
+
+    // Read 'gaming' events from a text file.
+    pipeline.apply(TextIO.Read.from(options.getInput()))
+      // Parse the incoming data.
+      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+
+      // Filter out data before and after the given times so that it is not included
+      // in the calculations. As we collect data in batches (say, by day), the batch for the day
+      // that we want to analyze could potentially include some late-arriving data from the previous
+      // day. If so, we want to weed it out. Similarly, if we include data from the following day
+      // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
+      // that fall after the time period we want to analyze.
+      // [START DocInclude_HTSFilters]
+      .apply("FilterStartTime", Filter.byPredicate(
+          (GameActionInfo gInfo)
+              -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
+      .apply("FilterEndTime", Filter.byPredicate(
+          (GameActionInfo gInfo)
+              -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
+      // [END DocInclude_HTSFilters]
+
+      // [START DocInclude_HTSAddTsAndWindow]
+      // Add an element timestamp based on the event log, and apply fixed windowing.
+      .apply("AddEventTimestamps",
+             WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
+      .apply(Window.named("FixedWindowsTeam")
+          .<GameActionInfo>into(FixedWindows.of(
+                Duration.standardMinutes(options.getWindowDuration()))))
+      // [END DocInclude_HTSAddTsAndWindow]
+
+      // Extract and sum teamname/score pairs from the event data.
+      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+      .apply("WriteTeamScoreSums",
+        new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
+            configureWindowedTableWrite()));
+
+
+    pipeline.run();
+  }
+  // [END DocInclude_HTSMain]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
new file mode 100644
index 0000000..4185376
--- /dev/null
+++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the third in a series of four pipelines that tell a story in a 'gaming' domain,
+ * following {@link UserScore} and {@link HourlyTeamScore}. Concepts include: processing unbounded
+ * data using fixed windows; use of custom timestamps and event-time processing; generation of
+ * early/speculative results; using .accumulatingFiredPanes() to do cumulative processing of late-
+ * arriving data.
+ *
+ * <p> This pipeline processes an unbounded stream of 'game events'. The calculation of the team
+ * scores uses fixed windowing based on event time (the time of the game play event), not
+ * processing time (the time that an event is processed by the pipeline). The pipeline calculates
+ * the sum of scores per team, for each window. By default, the team scores are calculated using
+ * one-hour windows.
+ *
+ * <p> In contrast-- to demo another windowing option-- the user scores are calculated using a
+ * global window, which periodically (every ten minutes) emits cumulative user score sums.
+ *
+ * <p> In contrast to the previous pipelines in the series, which used static, finite input data,
+ * here we're using an unbounded data source, which lets us provide speculative results, and allows
+ * handling of late data, at much lower latency. We can use the early/speculative results to keep a
+ * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct
+ * results, e.g. for 'team prizes'. We're now outputing window results as they're
+ * calculated, giving us much lower latency than with the previous batch examples.
+ *
+ * <p> Run {@link injector.Injector} to generate pubsub data for this pipeline.  The Injector
+ * documentation provides more detail on how to do this.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ * The PubSub topic you specify should be the same topic to which the Injector is publishing.
+ */
+public class LeaderBoard extends HourlyTeamScore {
+
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+  private static DateTimeFormatter fmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+  static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+  static final Duration TEN_MINUTES = Duration.standardMinutes(10);
+
+
+  /**
+   * Options supported by {@link LeaderBoard}.
+   */
+  static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions {
+
+    @Description("Pub/Sub topic to read from")
+    @Validation.Required
+    String getTopic();
+    void setTopic(String value);
+
+    @Description("Numeric value of fixed window duration for team analysis, in minutes")
+    @Default.Integer(60)
+    Integer getTeamWindowDuration();
+    void setTeamWindowDuration(Integer value);
+
+    @Description("Numeric value of allowed data lateness, in minutes")
+    @Default.Integer(120)
+    Integer getAllowedLateness();
+    void setAllowedLateness(Integer value);
+
+    @Description("Prefix used for the BigQuery table names")
+    @Default.String("leaderboard")
+    String getTableName();
+    void setTableName(String value);
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write team score sums and includes event timing information.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureWindowedTableWrite() {
+
+    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfigure.put("team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+            c -> c.element().getKey()));
+    tableConfigure.put("total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+            c -> c.element().getValue()));
+    tableConfigure.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    tableConfigure.put("processing_time",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> fmt.print(Instant.now())));
+    tableConfigure.put("timing",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> c.pane().getTiming().toString()));
+    return tableConfigure;
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write user score sums.
+   */
+  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureGlobalWindowBigQueryWrite() {
+
+    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        configureBigQueryWrite();
+    tableConfigure.put("processing_time",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> fmt.print(Instant.now())));
+    return tableConfigure;
+  }
+
+
+  public static void main(String[] args) throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    // Enforce that this pipeline is always run in streaming mode.
+    options.setStreaming(true);
+    // For example purposes, allow the pipeline to be easily cancelled instead of running
+    // continuously.
+    options.setRunner(DataflowPipelineRunner.class);
+    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
+    // data elements, and parse the data.
+    PCollection<GameActionInfo> gameEvents = pipeline
+        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+
+    // [START DocInclude_WindowAndTrigger]
+    // Extract team/score pairs from the event stream, using hour-long windows by default.
+    gameEvents
+        .apply(Window.named("LeaderboardTeamFixedWindows")
+          .<GameActionInfo>into(FixedWindows.of(
+              Duration.standardMinutes(options.getTeamWindowDuration())))
+          // We will get early (speculative) results as well as cumulative
+          // processing of late data.
+          .triggering(
+            AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(FIVE_MINUTES))
+            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(TEN_MINUTES)))
+          .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
+          .accumulatingFiredPanes())
+        // Extract and sum teamname/score pairs from the event data.
+        .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+        // Write the results to BigQuery.
+        .apply("WriteTeamScoreSums",
+               new WriteWindowedToBigQuery<KV<String, Integer>>(
+                  options.getTableName() + "_team", configureWindowedTableWrite()));
+    // [END DocInclude_WindowAndTrigger]
+
+    // [START DocInclude_ProcTimeTrigger]
+    // Extract user/score pairs from the event stream using processing time, via global windowing.
+    // Get periodic updates on all users' running scores.
+    gameEvents
+        .apply(Window.named("LeaderboardUserGlobalWindow")
+          .<GameActionInfo>into(new GlobalWindows())
+          // Get periodic results every ten minutes.
+              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(TEN_MINUTES)))
+              .accumulatingFiredPanes()
+              .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())))
+        // Extract and sum username/score pairs from the event data.
+        .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+        // Write the results to BigQuery.
+        .apply("WriteUserScoreSums",
+               new WriteToBigQuery<KV<String, Integer>>(
+                  options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
+    // [END DocInclude_ProcTimeTrigger]
+
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = pipeline.run();
+    dataflowUtils.waitToFinish(result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
new file mode 100644
index 0000000..79b55ce
--- /dev/null
+++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
@@ -0,0 +1,113 @@
+
+# 'Gaming' examples
+
+
+This directory holds a series of example Dataflow pipelines in a simple 'mobile
+gaming' domain. They all require Java 8.  Each pipeline successively introduces
+new concepts, and gives some examples of using Java 8 syntax in constructing
+Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts
+that are used apply equally well in Java 7.
+
+In the gaming scenario, many users play, as members of different teams, over
+the course of a day, and their actions are logged for processing. Some of the
+logged game events may be late-arriving, if users play on mobile devices and go
+transiently offline for a period.
+
+The scenario includes not only "regular" users, but "robot users", which have a
+higher click rate than the regular users, and may move from team to team.
+
+The first two pipelines in the series use pre-generated batch data samples. The
+second two pipelines read from a [PubSub](https://cloud.google.com/pubsub/)
+topic input.  For these examples, you will also need to run the
+`injector.Injector` program, which generates and publishes the gaming data to
+PubSub. The javadocs for each pipeline have more detailed information on how to
+run that pipeline.
+
+All of these pipelines write their results to BigQuery table(s).
+
+
+## The pipelines in the 'gaming' series
+
+### UserScore
+
+The first pipeline in the series is `UserScore`. This pipeline does batch
+processing of data collected from gaming events. It calculates the sum of
+scores per user, over an entire batch of gaming data (collected, say, for each
+day). The batch processing will not include any late data that arrives after
+the day's cutoff point.
+
+### HourlyTeamScore
+
+The next pipeline in the series is `HourlyTeamScore`. This pipeline also
+processes data collected from gaming events in batch. It builds on `UserScore`,
+but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by
+default an hour in duration. It calculates the sum of scores per team, for each
+window, optionally allowing specification of two timestamps before and after
+which data is filtered out. This allows a model where late data collected after
+the intended analysis window can be included in the analysis, and any late-
+arriving data prior to the beginning of the analysis window can be removed as
+well.
+
+By using windowing and adding element timestamps, we can do finer-grained
+analysis than with the `UserScore` pipeline — we're now tracking scores for
+each hour rather than over the course of a whole day. However, our batch
+processing is high-latency, in that we don't get results from plays at the
+beginning of the batch's time period until the complete batch is processed.
+
+### LeaderBoard
+
+The third pipeline in the series is `LeaderBoard`. This pipeline processes an
+unbounded stream of 'game events' from a PubSub topic. The calculation of the
+team scores uses fixed windowing based on event time (the time of the game play
+event), not processing time (the time that an event is processed by the
+pipeline). The pipeline calculates the sum of scores per team, for each window.
+By default, the team scores are calculated using one-hour windows.
+
+In contrast — to demo another windowing option — the user scores are calculated
+using a global window, which periodically (every ten minutes) emits cumulative
+user score sums.
+
+In contrast to the previous pipelines in the series, which used static, finite
+input data, here we're using an unbounded data source, which lets us provide
+_speculative_ results, and allows handling of late data, at much lower latency.
+E.g., we could use the early/speculative results to keep a 'leaderboard'
+updated in near-realtime. Our handling of late data lets us generate correct
+results, e.g. for 'team prizes'. We're now outputing window results as they're
+calculated, giving us much lower latency than with the previous batch examples.
+
+### GameStats
+
+The fourth pipeline in the series is `GameStats`. This pipeline builds
+on the `LeaderBoard` functionality — supporting output of speculative and late
+data — and adds some "business intelligence" analysis: identifying abuse
+detection. The pipeline derives the Mean user score sum for a window, and uses
+that information to identify likely spammers/robots. (The injector is designed
+so that the "robots" have a higher click rate than the "real" users). The robot
+users are then filtered out when calculating the team scores.
+
+Additionally, user sessions are tracked: that is, we find bursts of user
+activity using session windows. Then, the mean session duration information is
+recorded in the context of subsequent fixed windowing. (This could be used to
+tell us what games are giving us greater user retention).
+
+### Running the PubSub Injector
+
+The `LeaderBoard` and `GameStats` example pipelines read unbounded data
+from a PubSub topic.
+
+Use the `injector.Injector` program to generate this data and publish to a
+PubSub topic. See the `Injector`javadocs for more information on how to run the
+injector. Set up the injector before you start one of these pipelines. Then,
+when you start the pipeline, pass as an argument the name of that PubSub topic.
+See the pipeline javadocs for the details.
+
+## Viewing the results in BigQuery
+
+All of the pipelines write their results to BigQuery.  `UserScore` and
+`HourlyTeamScore` each write one table, and `LeaderBoard` and
+`GameStats` each write two. The pipelines have default table names that
+you can override when you start up the pipeline if those tables already exist.
+
+Depending on the windowing intervals defined in a given pipeline, you may have
+to wait for a while (more than an hour) before you start to see results written
+to the BigQuery tables.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
new file mode 100644
index 0000000..de06ce3
--- /dev/null
+++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.apache.avro.reflect.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class is the first in a series of four pipelines that tell a story in a 'gaming' domain.
+ * Concepts: batch processing; reading input from Google Cloud Storage and writing output to
+ * BigQuery; using standalone DoFns; use of the sum by key transform; examples of
+ * Java 8 lambda syntax.
+ *
+ * <p> In this gaming scenario, many users play, as members of different teams, over the course of a
+ * day, and their actions are logged for processing.  Some of the logged game events may be late-
+ * arriving, if users play on mobile devices and go transiently offline for a period.
+ *
+ * <p> This pipeline does batch processing of data collected from gaming events. It calculates the
+ * sum of scores per user, over an entire batch of gaming data (collected, say, for each day). The
+ * batch processing will not include any late data that arrives after the day's cutoff point.
+ *
+ * <p> To execute this pipeline using the Dataflow service and static example input data, specify
+ * the pipeline configuration like this:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ *
+ * <p> Optionally include the --input argument to specify a batch input file.
+ * See the --input default value for example batch data file, or use {@link injector.Injector} to
+ * generate your own batch data.
+  */
+public class UserScore {
+
+  /**
+   * Class to hold info about a game event.
+   */
+  @DefaultCoder(AvroCoder.class)
+  static class GameActionInfo {
+    @Nullable String user;
+    @Nullable String team;
+    @Nullable Integer score;
+    @Nullable Long timestamp;
+
+    public GameActionInfo() {}
+
+    public GameActionInfo(String user, String team, Integer score, Long timestamp) {
+      this.user = user;
+      this.team = team;
+      this.score = score;
+      this.timestamp = timestamp;
+    }
+
+    public String getUser() {
+      return this.user;
+    }
+    public String getTeam() {
+      return this.team;
+    }
+    public Integer getScore() {
+      return this.score;
+    }
+    public String getKey(String keyname) {
+      if (keyname.equals("team")) {
+        return this.team;
+      } else {  // return username as default
+        return this.user;
+      }
+    }
+    public Long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+
+  /**
+   * Parses the raw game event info into GameActionInfo objects. Each event line has the following
+   * format: username,teamname,score,timestamp_in_ms,readable_time
+   * e.g.:
+   * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+   * The human-readable time string is not used here.
+   */
+  static class ParseEventFn extends DoFn<String, GameActionInfo> {
+
+    // Log and count parse errors.
+    private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
+    private final Aggregator<Long, Long> numParseErrors =
+        createAggregator("ParseErrors", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String[] components = c.element().split(",");
+      try {
+        String user = components[0].trim();
+        String team = components[1].trim();
+        Integer score = Integer.parseInt(components[2].trim());
+        Long timestamp = Long.parseLong(components[3].trim());
+        GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
+        c.output(gInfo);
+      } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
+        numParseErrors.addValue(1L);
+        LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * A transform to extract key/score information from GameActionInfo, and sum the scores. The
+   * constructor arg determines whether 'team' or 'user' info is extracted.
+   */
+  // [START DocInclude_USExtractXform]
+  public static class ExtractAndSumScore
+      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+
+    private final String field;
+
+    ExtractAndSumScore(String field) {
+      this.field = field;
+    }
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(
+        PCollection<GameActionInfo> gameInfo) {
+
+      return gameInfo
+        .apply(MapElements
+            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
+            .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
+        .apply(Sum.<String>integersPerKey());
+    }
+  }
+  // [END DocInclude_USExtractXform]
+
+
+  /**
+   * Options supported by {@link UserScore}.
+   */
+  public static interface Options extends PipelineOptions {
+
+    @Description("Path to the data file(s) containing game data.")
+    // The default maps to two large Google Cloud Storage files (each ~12GB) holding two subsequent
+    // day's worth (roughly) of data.
+    @Default.String("gs://dataflow-samples/game/gaming_data*.csv")
+    String getInput();
+    void setInput(String value);
+
+    @Description("BigQuery Dataset to write tables to. Must already exist.")
+    @Validation.Required
+    String getDataset();
+    void setDataset(String value);
+
+    @Description("The BigQuery table name. Should not already exist.")
+    @Default.String("user_score")
+    String getTableName();
+    void setTableName(String value);
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is passed to the {@link WriteToBigQuery} constructor to write user score sums.
+   */
+  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
+    configureBigQueryWrite() {
+    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfigure.put("user",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.element().getKey()));
+    tableConfigure.put("total_score",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> c.element().getValue()));
+    return tableConfigure;
+  }
+
+
+  /**
+   * Run a batch pipeline.
+   */
+ // [START DocInclude_USMain]
+  public static void main(String[] args) throws Exception {
+    // Begin constructing a pipeline configured by commandline flags.
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Read events from a text file and parse them.
+    pipeline.apply(TextIO.Read.from(options.getInput()))
+      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+      // Extract and sum username/score pairs from the event data.
+      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+      .apply("WriteUserScoreSums",
+          new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
+                                                   configureBigQueryWrite()));
+
+    // Run the batch pipeline.
+    pipeline.run();
+  }
+  // [END DocInclude_USMain]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
new file mode 100644
index 0000000..1691c54
--- /dev/null
+++ b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
@@ -0,0 +1,415 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.injector;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+
+
+/**
+ * This is a generator that simulates usage data from a mobile game, and either publishes the data
+ * to a pubsub topic or writes it to a file.
+ *
+ * <p> The general model used by the generator is the following. There is a set of teams with team
+ * members. Each member is scoring points for their team. After some period, a team will dissolve
+ * and a new one will be created in its place. There is also a set of 'Robots', or spammer users.
+ * They hop from team to team. The robots are set to have a higher 'click rate' (generate more
+ * events) than the regular team members.
+ *
+ * <p> Each generated line of data has the following form:
+ * username,teamname,score,timestamp_in_ms,readable_time
+ * e.g.:
+ * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+ *
+ * <p> The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
+ * specified. It takes the following arguments:
+ * {@code Injector project-name (topic-name|none) (filename|none)}.
+ *
+ * <p> To run the Injector in the mode where it publishes to PubSub, you will need to authenticate
+ * locally using project-based service account credentials to avoid running over PubSub
+ * quota.
+ * See https://developers.google.com/identity/protocols/application-default-credentials
+ * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS
+ * environment variable to point to your downloaded service account credentials before starting the
+ * program, e.g.:
+ * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}.
+ * If you do not do this, then your injector will only run for a few minutes on your
+ * 'user account' credentials before you will start to see quota error messages like:
+ * "Request throttled due to user QPS limit being reached", and see this exception:
+ * ".com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests".
+ * Once you've set up your credentials, run the Injector like this":
+  * <pre>{@code
+ * Injector <project-name> <topic-name> none
+ * }
+ * </pre>
+ * The pubsub topic will be created if it does not exist.
+ *
+ * <p> To run the injector in write-to-file-mode, set the topic name to "none" and specify the
+ * filename:
+ * <pre>{@code
+ * Injector <project-name> none <filename>
+ * }
+ * </pre>
+ */
+class Injector {
+  private static Pubsub pubsub;
+  private static Random random = new Random();
+  private static String topic;
+  private static String project;
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+  // QPS ranges from 800 to 1000.
+  private static final int MIN_QPS = 800;
+  private static final int QPS_RANGE = 200;
+  // How long to sleep, in ms, between creation of the threads that make API requests to PubSub.
+  private static final int THREAD_SLEEP_MS = 500;
+
+  // Lists used to generate random team names.
+  private static final ArrayList<String> COLORS =
+      new ArrayList<String>(Arrays.asList(
+         "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber",
+         "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen",
+         "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana",
+         "Beige", "Bisque", "BarnRed", "BattleshipGrey"));
+
+  private static final ArrayList<String> ANIMALS =
+      new ArrayList<String>(Arrays.asList(
+         "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu",
+         "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus",
+         "Bandicoot", "Cockatoo", "Antechinus"));
+
+  // The list of live teams.
+  private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>();
+
+  private static DateTimeFormatter fmt =
+    DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+        .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+
+  // The total number of robots in the system.
+  private static final int NUM_ROBOTS = 20;
+  // Determines the chance that a team will have a robot team member.
+  private static final int ROBOT_PROBABILITY = 3;
+  private static final int NUM_LIVE_TEAMS = 15;
+  private static final int BASE_MEMBERS_PER_TEAM = 5;
+  private static final int MEMBERS_PER_TEAM = 15;
+  private static final int MAX_SCORE = 20;
+  private static final int LATE_DATA_RATE = 5 * 60 * 2;       // Every 10 minutes
+  private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000;  // 5-10 minute delay
+  private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;
+
+  // The minimum time a 'team' can live.
+  private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20;
+  private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20;
+
+
+  /**
+   * A class for holding team info: the name of the team, when it started,
+   * and the current team members. Teams may but need not include one robot team member.
+   */
+  private static class TeamInfo {
+    String teamName;
+    long startTimeInMillis;
+    int expirationPeriod;
+    // The team might but need not include 1 robot. Will be non-null if so.
+    String robot;
+    int numMembers;
+
+    private TeamInfo(String teamName, long startTimeInMillis, String robot) {
+      this.teamName = teamName;
+      this.startTimeInMillis = startTimeInMillis;
+      // How long until this team is dissolved.
+      this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) +
+        BASE_TEAM_EXPIRATION_TIME_IN_MINS;
+      this.robot = robot;
+      // Determine the number of team members.
+      numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM;
+    }
+
+    String getTeamName() {
+      return teamName;
+    }
+    String getRobot() {
+      return robot;
+    }
+
+    long getStartTimeInMillis() {
+      return startTimeInMillis;
+    }
+    long getEndTimeInMillis() {
+      return startTimeInMillis + (expirationPeriod * 60 * 1000);
+    }
+    String getRandomUser() {
+      int userNum = random.nextInt(numMembers);
+      return "user" + userNum + "_" + teamName;
+    }
+
+    int numMembers() {
+      return numMembers;
+    }
+
+    @Override
+    public String toString() {
+      return "(" + teamName + ", num members: " + numMembers() + ", starting at: "
+        + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")";
+    }
+  }
+
+  /** Utility to grab a random element from an array of Strings. */
+  private static String randomElement(ArrayList<String> list) {
+    int index = random.nextInt(list.size());
+    return list.get(index);
+  }
+
+  /**
+   * Get and return a random team. If the selected team is too old w.r.t its expiration, remove
+   * it, replacing it with a new team.
+   */
+  private static TeamInfo randomTeam(ArrayList<TeamInfo> list) {
+    int index = random.nextInt(list.size());
+    TeamInfo team = list.get(index);
+    // If the selected team is expired, remove it and return a new team.
+    long currTime = System.currentTimeMillis();
+    if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) {
+      System.out.println("\nteam " + team + " is too old; replacing.");
+      System.out.println("start time: " + team.getStartTimeInMillis() +
+        ", end time: " + team.getEndTimeInMillis() +
+        ", current time:" + currTime);
+      removeTeam(index);
+      // Add a new team in its stead.
+      return (addLiveTeam());
+    } else {
+      return team;
+    }
+  }
+
+  /**
+   * Create and add a team. Possibly add a robot to the team.
+   */
+  private static synchronized TeamInfo addLiveTeam() {
+    String teamName = randomElement(COLORS) + randomElement(ANIMALS);
+    String robot = null;
+    // Decide if we want to add a robot to the team.
+    if (random.nextInt(ROBOT_PROBABILITY) == 0) {
+      robot = "Robot-" + random.nextInt(NUM_ROBOTS);
+    }
+    // Create the new team.
+    TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot);
+    liveTeams.add(newTeam);
+    System.out.println("[+" + newTeam + "]");
+    return newTeam;
+  }
+
+  /**
+   * Remove a specific team.
+   */
+  private static synchronized void removeTeam(int teamIndex) {
+    TeamInfo removedTeam = liveTeams.remove(teamIndex);
+    System.out.println("[-" + removedTeam + "]");
+  }
+
+  /** Generate a user gaming event. */
+  private static String generateEvent(Long currTime, int delayInMillis) {
+    TeamInfo team = randomTeam(liveTeams);
+    String teamName = team.getTeamName();
+    String user;
+    final int parseErrorRate = 900000;
+
+    String robot = team.getRobot();
+    // If the team has an associated robot team member...
+    if (robot != null) {
+      // Then use that robot for the message with some probability.
+      // Set this probability to higher than that used to select any of the 'regular' team
+      // members, so that if there is a robot on the team, it has a higher click rate.
+      if (random.nextInt(team.numMembers() / 2) == 0) {
+        user = robot;
+      } else {
+        user = team.getRandomUser();
+      }
+    } else { // No robot.
+      user = team.getRandomUser();
+    }
+    String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
+    // Randomly introduce occasional parse errors. You can see a custom counter tracking the number
+    // of such errors in the Dataflow Monitoring UI, as the example pipeline runs.
+    if (random.nextInt(parseErrorRate) == 0) {
+      System.out.println("Introducing a parse error.");
+      event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
+    }
+    return addTimeInfoToEvent(event, currTime, delayInMillis);
+  }
+
+  /**
+   * Add time info to a generated gaming event.
+   */
+  private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) {
+    String eventTimeString =
+        Long.toString((currTime - delayInMillis) / 1000 * 1000);
+    // Add a (redundant) 'human-readable' date string to make the data semantics more clear.
+    String dateString = fmt.print(currTime);
+    message = message + "," + eventTimeString + "," + dateString;
+    return message;
+  }
+
+  /**
+   * Publish 'numMessages' arbitrary events from live users with the provided delay, to a
+   * PubSub topic.
+   */
+  public static void publishData(int numMessages, int delayInMillis)
+      throws IOException {
+    List<PubsubMessage> pubsubMessages = new ArrayList<>();
+
+    for (int i = 0; i < Math.max(1, numMessages); i++) {
+      Long currTime = System.currentTimeMillis();
+      String message = generateEvent(currTime, delayInMillis);
+      PubsubMessage pubsubMessage = new PubsubMessage()
+              .encodeData(message.getBytes("UTF-8"));
+      pubsubMessage.setAttributes(
+          ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
+              Long.toString((currTime - delayInMillis) / 1000 * 1000)));
+      if (delayInMillis != 0) {
+        System.out.println(pubsubMessage.getAttributes());
+        System.out.println("late data for: " + message);
+      }
+      pubsubMessages.add(pubsubMessage);
+    }
+
+    PublishRequest publishRequest = new PublishRequest();
+    publishRequest.setMessages(pubsubMessages);
+    pubsub.projects().topics().publish(topic, publishRequest).execute();
+  }
+
+  /**
+   * Publish generated events to a file.
+   */
+  public static void publishDataToFile(String fileName, int numMessages, int delayInMillis)
+      throws IOException {
+    PrintWriter out = new PrintWriter(new OutputStreamWriter(
+        new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8"));
+
+    try {
+      for (int i = 0; i < Math.max(1, numMessages); i++) {
+        Long currTime = System.currentTimeMillis();
+        String message = generateEvent(currTime, delayInMillis);
+        out.println(message);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (out != null) {
+        out.flush();
+        out.close();
+      }
+    }
+  }
+
+
+  public static void main(String[] args) throws IOException, InterruptedException {
+    if (args.length < 3) {
+      System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
+      System.exit(1);
+    }
+    boolean writeToFile = false;
+    boolean writeToPubsub = true;
+    project = args[0];
+    String topicName = args[1];
+    String fileName = args[2];
+    // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
+    // specified; otherwise, it will try to write to a file.
+    if (topicName.equalsIgnoreCase("none")) {
+      writeToFile = true;
+      writeToPubsub = false;
+    }
+    if (writeToPubsub) {
+      // Create the PubSub client.
+      pubsub = InjectorUtils.getClient();
+      // Create the PubSub topic as necessary.
+      topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName);
+      InjectorUtils.createTopic(pubsub, topic);
+      System.out.println("Injecting to topic: " + topic);
+    } else {
+      if (fileName.equalsIgnoreCase("none")) {
+        System.out.println("Filename not specified.");
+        System.exit(1);
+      }
+      System.out.println("Writing to file: " + fileName);
+    }
+    System.out.println("Starting Injector");
+
+    // Start off with some random live teams.
+    while (liveTeams.size() < NUM_LIVE_TEAMS) {
+      addLiveTeam();
+    }
+
+    // Publish messages at a rate determined by the QPS and Thread sleep settings.
+    for (int i = 0; true; i++) {
+      if (Thread.activeCount() > 10) {
+        System.err.println("I'm falling behind!");
+      }
+
+      // Decide if this should be a batch of late data.
+      final int numMessages;
+      final int delayInMillis;
+      if (i % LATE_DATA_RATE == 0) {
+        // Insert delayed data for one user (one message only)
+        delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS);
+        numMessages = 1;
+        System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")");
+      } else {
+        System.out.print(".");
+        delayInMillis = 0;
+        numMessages = MIN_QPS + random.nextInt(QPS_RANGE);
+      }
+
+      if (writeToFile) { // Won't use threading for the file write.
+        publishDataToFile(fileName, numMessages, delayInMillis);
+      } else { // Write to PubSub.
+        // Start a thread to inject some data.
+        new Thread(){
+          @Override
+          public void run() {
+            try {
+              publishData(numMessages, delayInMillis);
+            } catch (IOException e) {
+              System.err.println(e);
+            }
+          }
+        }.start();
+      }
+
+      // Wait before creating another injector thread.
+      Thread.sleep(THREAD_SLEEP_MS);
+    }
+  }
+}


Mime
View raw message