beam-commits mailing list archives

From da...@apache.org
Subject [40/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
Date Thu, 14 Apr 2016 04:48:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
deleted file mode 100644
index 18ff0a3..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.injector;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.BufferedOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-import java.util.TimeZone;
-
-
-/**
- * This is a generator that simulates usage data from a mobile game, and either publishes the data
- * to a pubsub topic or writes it to a file.
- *
- * <p> The general model used by the generator is the following. There is a set of teams with team
- * members. Each member is scoring points for their team. After some period, a team will dissolve
- * and a new one will be created in its place. There is also a set of 'Robots', or spammer users.
- * They hop from team to team. The robots are set to have a higher 'click rate' (generate more
- * events) than the regular team members.
- *
- * <p> Each generated line of data has the following form:
- * username,teamname,score,timestamp_in_ms,readable_time
- * e.g.:
- * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
- *
- * <p> The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
- * specified. It takes the following arguments:
- * {@code Injector project-name (topic-name|none) (filename|none)}.
- *
- * <p> To run the Injector in the mode where it publishes to PubSub, you will need to authenticate
- * locally using project-based service account credentials to avoid running over PubSub
- * quota.
- * See https://developers.google.com/identity/protocols/application-default-credentials
- * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS
- * environment variable to point to your downloaded service account credentials before starting the
- * program, e.g.:
- * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}.
- * If you do not do this, then your injector will only run for a few minutes on your
- * 'user account' credentials before you will start to see quota error messages like:
- * "Request throttled due to user QPS limit being reached", and see this exception:
- * "com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests".
- * Once you've set up your credentials, run the Injector like this:
- * <pre>{@code
- * Injector <project-name> <topic-name> none
- * }
- * </pre>
- * The pubsub topic will be created if it does not exist.
- *
- * <p> To run the injector in write-to-file-mode, set the topic name to "none" and specify the
- * filename:
- * <pre>{@code
- * Injector <project-name> none <filename>
- * }
- * </pre>
- */
-class Injector {
-  private static Pubsub pubsub;
-  private static Random random = new Random();
-  private static String topic;
-  private static String project;
-  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
-
-  // QPS ranges from 800 to 1000.
-  private static final int MIN_QPS = 800;
-  private static final int QPS_RANGE = 200;
-  // How long to sleep, in ms, between creation of the threads that make API requests to PubSub.
-  private static final int THREAD_SLEEP_MS = 500;
-
-  // Lists used to generate random team names.
-  private static final ArrayList<String> COLORS =
-      new ArrayList<String>(Arrays.asList(
-         "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber",
-         "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen",
-         "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana",
-         "Beige", "Bisque", "BarnRed", "BattleshipGrey"));
-
-  private static final ArrayList<String> ANIMALS =
-      new ArrayList<String>(Arrays.asList(
-         "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu",
-         "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus",
-         "Bandicoot", "Cockatoo", "Antechinus"));
-
-  // The list of live teams.
-  private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>();
-
-  private static DateTimeFormatter fmt =
-    DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
-        .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
-
-
-  // The total number of robots in the system.
-  private static final int NUM_ROBOTS = 20;
-  // Determines the chance that a team will have a robot team member.
-  private static final int ROBOT_PROBABILITY = 3;
-  private static final int NUM_LIVE_TEAMS = 15;
-  private static final int BASE_MEMBERS_PER_TEAM = 5;
-  private static final int MEMBERS_PER_TEAM = 15;
-  private static final int MAX_SCORE = 20;
-  private static final int LATE_DATA_RATE = 5 * 60 * 2;       // Every 600 iterations (~5 minutes)
-  private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000;  // 5-10 minute delay
-  private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;
-
-  // The minimum time a 'team' can live.
-  private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20;
-  private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20;
-
-
-  /**
-   * A class for holding team info: the name of the team, when it started,
-   * and the current team members. Teams may but need not include one robot team member.
-   */
-  private static class TeamInfo {
-    String teamName;
-    long startTimeInMillis;
-    int expirationPeriod;
-    // The team might but need not include 1 robot. Will be non-null if so.
-    String robot;
-    int numMembers;
-
-    private TeamInfo(String teamName, long startTimeInMillis, String robot) {
-      this.teamName = teamName;
-      this.startTimeInMillis = startTimeInMillis;
-      // How long until this team is dissolved.
-      this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) +
-        BASE_TEAM_EXPIRATION_TIME_IN_MINS;
-      this.robot = robot;
-      // Determine the number of team members.
-      numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM;
-    }
-
-    String getTeamName() {
-      return teamName;
-    }
-    String getRobot() {
-      return robot;
-    }
-
-    long getStartTimeInMillis() {
-      return startTimeInMillis;
-    }
-    long getEndTimeInMillis() {
-      return startTimeInMillis + (expirationPeriod * 60 * 1000);
-    }
-    String getRandomUser() {
-      int userNum = random.nextInt(numMembers);
-      return "user" + userNum + "_" + teamName;
-    }
-
-    int numMembers() {
-      return numMembers;
-    }
-
-    @Override
-    public String toString() {
-      return "(" + teamName + ", num members: " + numMembers() + ", starting at: "
-        + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")";
-    }
-  }
-
-  /** Utility to grab a random element from a list of Strings. */
-  private static String randomElement(ArrayList<String> list) {
-    int index = random.nextInt(list.size());
-    return list.get(index);
-  }
-
-  /**
-   * Get and return a random team. If the selected team is too old w.r.t its expiration, remove
-   * it, replacing it with a new team.
-   */
-  private static TeamInfo randomTeam(ArrayList<TeamInfo> list) {
-    int index = random.nextInt(list.size());
-    TeamInfo team = list.get(index);
-    // If the selected team is expired, remove it and return a new team.
-    long currTime = System.currentTimeMillis();
-    if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) {
-      System.out.println("\nteam " + team + " is too old; replacing.");
-      System.out.println("start time: " + team.getStartTimeInMillis() +
-        ", end time: " + team.getEndTimeInMillis() +
-        ", current time:" + currTime);
-      removeTeam(index);
-      // Add a new team in its stead.
-      return (addLiveTeam());
-    } else {
-      return team;
-    }
-  }
-
-  /**
-   * Create and add a team. Possibly add a robot to the team.
-   */
-  private static synchronized TeamInfo addLiveTeam() {
-    String teamName = randomElement(COLORS) + randomElement(ANIMALS);
-    String robot = null;
-    // Decide if we want to add a robot to the team.
-    if (random.nextInt(ROBOT_PROBABILITY) == 0) {
-      robot = "Robot-" + random.nextInt(NUM_ROBOTS);
-    }
-    // Create the new team.
-    TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot);
-    liveTeams.add(newTeam);
-    System.out.println("[+" + newTeam + "]");
-    return newTeam;
-  }
-
-  /**
-   * Remove a specific team.
-   */
-  private static synchronized void removeTeam(int teamIndex) {
-    TeamInfo removedTeam = liveTeams.remove(teamIndex);
-    System.out.println("[-" + removedTeam + "]");
-  }
-
-  /** Generate a user gaming event. */
-  private static String generateEvent(Long currTime, int delayInMillis) {
-    TeamInfo team = randomTeam(liveTeams);
-    String teamName = team.getTeamName();
-    String user;
-    final int parseErrorRate = 900000;
-
-    String robot = team.getRobot();
-    // If the team has an associated robot team member...
-    if (robot != null) {
-      // Then use that robot for the message with some probability.
-      // Set this probability to higher than that used to select any of the 'regular' team
-      // members, so that if there is a robot on the team, it has a higher click rate.
-      if (random.nextInt(team.numMembers() / 2) == 0) {
-        user = robot;
-      } else {
-        user = team.getRandomUser();
-      }
-    } else { // No robot.
-      user = team.getRandomUser();
-    }
-    String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
-    // Randomly introduce occasional parse errors. You can see a custom counter tracking the number
-    // of such errors in the Dataflow Monitoring UI, as the example pipeline runs.
-    if (random.nextInt(parseErrorRate) == 0) {
-      System.out.println("Introducing a parse error.");
-      event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
-    }
-    return addTimeInfoToEvent(event, currTime, delayInMillis);
-  }
-
-  /**
-   * Add time info to a generated gaming event.
-   */
-  private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) {
-    String eventTimeString =
-        Long.toString((currTime - delayInMillis) / 1000 * 1000);
-    // Add a (redundant) 'human-readable' date string to make the data semantics more clear.
-    String dateString = fmt.print(currTime);
-    message = message + "," + eventTimeString + "," + dateString;
-    return message;
-  }
-
-  /**
-   * Publish 'numMessages' arbitrary events from live users with the provided delay, to a
-   * PubSub topic.
-   */
-  public static void publishData(int numMessages, int delayInMillis)
-      throws IOException {
-    List<PubsubMessage> pubsubMessages = new ArrayList<>();
-
-    for (int i = 0; i < Math.max(1, numMessages); i++) {
-      Long currTime = System.currentTimeMillis();
-      String message = generateEvent(currTime, delayInMillis);
-      PubsubMessage pubsubMessage = new PubsubMessage()
-              .encodeData(message.getBytes("UTF-8"));
-      pubsubMessage.setAttributes(
-          ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
-              Long.toString((currTime - delayInMillis) / 1000 * 1000)));
-      if (delayInMillis != 0) {
-        System.out.println(pubsubMessage.getAttributes());
-        System.out.println("late data for: " + message);
-      }
-      pubsubMessages.add(pubsubMessage);
-    }
-
-    PublishRequest publishRequest = new PublishRequest();
-    publishRequest.setMessages(pubsubMessages);
-    pubsub.projects().topics().publish(topic, publishRequest).execute();
-  }
-
-  /**
-   * Publish generated events to a file.
-   */
-  public static void publishDataToFile(String fileName, int numMessages, int delayInMillis)
-      throws IOException {
-    PrintWriter out = new PrintWriter(new OutputStreamWriter(
-        new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8"));
-
-    try {
-      for (int i = 0; i < Math.max(1, numMessages); i++) {
-        Long currTime = System.currentTimeMillis();
-        String message = generateEvent(currTime, delayInMillis);
-        out.println(message);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      if (out != null) {
-        out.flush();
-        out.close();
-      }
-    }
-  }
-
-
-  public static void main(String[] args) throws IOException, InterruptedException {
-    if (args.length < 3) {
-      System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
-      System.exit(1);
-    }
-    boolean writeToFile = false;
-    boolean writeToPubsub = true;
-    project = args[0];
-    String topicName = args[1];
-    String fileName = args[2];
-    // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
-    // specified; otherwise, it will try to write to a file.
-    if (topicName.equalsIgnoreCase("none")) {
-      writeToFile = true;
-      writeToPubsub = false;
-    }
-    if (writeToPubsub) {
-      // Create the PubSub client.
-      pubsub = InjectorUtils.getClient();
-      // Create the PubSub topic as necessary.
-      topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName);
-      InjectorUtils.createTopic(pubsub, topic);
-      System.out.println("Injecting to topic: " + topic);
-    } else {
-      if (fileName.equalsIgnoreCase("none")) {
-        System.out.println("Filename not specified.");
-        System.exit(1);
-      }
-      System.out.println("Writing to file: " + fileName);
-    }
-    System.out.println("Starting Injector");
-
-    // Start off with some random live teams.
-    while (liveTeams.size() < NUM_LIVE_TEAMS) {
-      addLiveTeam();
-    }
-
-    // Publish messages at a rate determined by the QPS and Thread sleep settings.
-    for (int i = 0; true; i++) {
-      if (Thread.activeCount() > 10) {
-        System.err.println("I'm falling behind!");
-      }
-
-      // Decide if this should be a batch of late data.
-      final int numMessages;
-      final int delayInMillis;
-      if (i % LATE_DATA_RATE == 0) {
-        // Insert delayed data for one user (one message only)
-        delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS);
-        numMessages = 1;
-        System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")");
-      } else {
-        System.out.print(".");
-        delayInMillis = 0;
-        numMessages = MIN_QPS + random.nextInt(QPS_RANGE);
-      }
-
-      if (writeToFile) { // Won't use threading for the file write.
-        publishDataToFile(fileName, numMessages, delayInMillis);
-      } else { // Write to PubSub.
-        // Start a thread to inject some data.
-        new Thread(){
-          @Override
-          public void run() {
-            try {
-              publishData(numMessages, delayInMillis);
-            } catch (IOException e) {
-              System.err.println(e);
-            }
-          }
-        }.start();
-      }
-
-      // Wait before creating another injector thread.
-      Thread.sleep(THREAD_SLEEP_MS);
-    }
-  }
-}
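
For readers following the event format described in the Injector Javadoc (username,teamname,score,timestamp_in_ms,readable_time), the following is a minimal sketch of how one line is assembled, including the truncation of the event time to whole seconds. It is not the deleted file's code: the class and method names are hypothetical, and java.time with the America/Los_Angeles zone is an assumed stand-in for the Joda-Time formatter configured with "PST".

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration class; not part of the commit.
final class EventLineSketch {
  // Same pattern string as the Injector; the zone ID is an assumption standing in for "PST".
  private static final DateTimeFormatter FMT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
          .withZone(ZoneId.of("America/Los_Angeles"));

  /** Event time shifted back by the delay and truncated to whole seconds. */
  static long eventTimeMillis(long currTimeMillis, int delayInMillis) {
    return (currTimeMillis - delayInMillis) / 1000 * 1000;
  }

  /** Builds one comma-separated event line with five fields. */
  static String eventLine(
      String user, String team, int score, long currTimeMillis, int delayInMillis) {
    return user + "," + team + "," + score + ","
        + eventTimeMillis(currTimeMillis, delayInMillis) + ","
        // As in the Injector, the readable string is formatted from the current time.
        + FMT.format(Instant.ofEpochMilli(currTimeMillis));
  }

  public static void main(String[] args) {
    long now = 1445230923951L;
    // An on-time event, then one delayed by five minutes.
    System.out.println(eventLine("user2_AsparagusPig", "AsparagusPig", 10, now, 0));
    System.out.println(eventLine("user2_AsparagusPig", "AsparagusPig", 10, now, 5 * 60 * 1000));
  }
}
```

The delayed line carries an event timestamp five minutes earlier than the on-time line, while the readable time is the same in both, because the readable string is derived from the current time rather than from the event time.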

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
deleted file mode 100644
index db58650..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.injector;
-
-
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.util.Utils;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpStatusCodes;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.PubsubScopes;
-import com.google.api.services.pubsub.model.Topic;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-class InjectorUtils {
-
-  private static final String APP_NAME = "injector";
-
-  /**
-   * Builds a new Pubsub client and returns it.
-   */
-  public static Pubsub getClient(final HttpTransport httpTransport,
-                                 final JsonFactory jsonFactory)
-           throws IOException {
-      Preconditions.checkNotNull(httpTransport);
-      Preconditions.checkNotNull(jsonFactory);
-      GoogleCredential credential =
-          GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
-      if (credential.createScopedRequired()) {
-          credential = credential.createScoped(PubsubScopes.all());
-      }
-      if (credential.getClientAuthentication() != null) {
-        System.out.println("\n***Warning! You are not using service account credentials to "
-          + "authenticate.\nYou need to use service account credentials for this example,"
-          + "\nsince user-level credentials do not have enough pubsub quota,\nand so you will run "
-          + "out of PubSub quota very quickly.\nSee "
-          + "https://developers.google.com/identity/protocols/application-default-credentials.");
-        System.exit(1);
-      }
-      HttpRequestInitializer initializer =
-          new RetryHttpInitializerWrapper(credential);
-      return new Pubsub.Builder(httpTransport, jsonFactory, initializer)
-              .setApplicationName(APP_NAME)
-              .build();
-  }
-
-  /**
-   * Builds a new Pubsub client with default HttpTransport and
-   * JsonFactory and returns it.
-   */
-  public static Pubsub getClient() throws IOException {
-      return getClient(Utils.getDefaultTransport(),
-                       Utils.getDefaultJsonFactory());
-  }
-
-
-  /**
-   * Returns the fully qualified topic name for Pub/Sub.
-   */
-  public static String getFullyQualifiedTopicName(
-          final String project, final String topic) {
-      return String.format("projects/%s/topics/%s", project, topic);
-  }
-
-  /**
-   * Create a topic if it doesn't exist.
-   */
-  public static void createTopic(Pubsub client, String fullTopicName)
-      throws IOException {
-    try {
-        client.projects().topics().get(fullTopicName).execute();
-    } catch (GoogleJsonResponseException e) {
-      if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
-        Topic topic = client.projects().topics()
-                .create(fullTopicName, new Topic())
-                .execute();
-        System.out.printf("Topic %s was created.\n", topic.getName());
-      }
-    }
-  }
-}
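
The topic setup in InjectorUtils follows a "look up, create only if missing" pattern. The sketch below shows that control flow without the Pub/Sub client library. The TopicAdmin interface and its in-memory implementation are hypothetical and exist only so the example is self-contained; only the projects/%s/topics/%s name format is taken from the diff above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration class; not part of the commit.
final class TopicSetupSketch {
  /** Hypothetical stand-in for a topic admin client; not the Pub/Sub API. */
  interface TopicAdmin {
    boolean exists(String fullTopicName);

    void create(String fullTopicName);
  }

  /** Same name format as getFullyQualifiedTopicName in the diff. */
  static String fullyQualifiedTopicName(String project, String topic) {
    return String.format("projects/%s/topics/%s", project, topic);
  }

  /** Creates the topic only when it is absent; returns true if it was created. */
  static boolean createIfMissing(TopicAdmin admin, String fullTopicName) {
    if (admin.exists(fullTopicName)) {
      return false;
    }
    admin.create(fullTopicName);
    return true;
  }

  /** In-memory implementation used only to exercise the sketch. */
  static final class InMemoryAdmin implements TopicAdmin {
    private final Set<String> topics = new HashSet<>();

    @Override
    public boolean exists(String fullTopicName) {
      return topics.contains(fullTopicName);
    }

    @Override
    public void create(String fullTopicName) {
      topics.add(fullTopicName);
    }
  }

  public static void main(String[] args) {
    TopicAdmin admin = new InMemoryAdmin();
    String name = fullyQualifiedTopicName("my-project", "game-events");
    System.out.println(name + " created: " + createIfMissing(admin, name));
    System.out.println(name + " created: " + createIfMissing(admin, name));
  }
}
```

Note that createTopic in the diff treats only a 404 response as "missing" and silently ignores any other GoogleJsonResponseException; a caller that needs to see other failures would have to rethrow them.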

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
deleted file mode 100644
index 9d58837..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.injector;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.http.HttpBackOffIOExceptionHandler;
-import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.util.logging.Logger;
-
-/**
- * RetryHttpInitializerWrapper will automatically retry upon RPC
- * failures, preserving the auto-refresh behavior of the Google
- * Credentials.
- */
-public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
-
-    /**
-     * A private logger.
-     */
-    private static final Logger LOG =
-            Logger.getLogger(RetryHttpInitializerWrapper.class.getName());
-
-    /**
-     * One minute in milliseconds.
-     */
-    private static final int ONEMINITUES = 60000;
-
-    /**
-     * Intercepts the request for filling in the "Authorization"
-     * header field, as well as recovering from certain unsuccessful
-     * error codes wherein the Credential must refresh its token for a
-     * retry.
-     */
-    private final Credential wrappedCredential;
-
-    /**
-     * A sleeper; you can replace it with a mock in your test.
-     */
-    private final Sleeper sleeper;
-
-    /**
-     * A constructor.
-     *
-     * @param wrappedCredential Credential which will be wrapped and
-     * used for providing auth header.
-     */
-    public RetryHttpInitializerWrapper(final Credential wrappedCredential) {
-        this(wrappedCredential, Sleeper.DEFAULT);
-    }
-
-    /**
-     * A package-private constructor, for testing only.
-     *
-     * @param wrappedCredential Credential which will be wrapped and
-     * used for providing auth header.
-     * @param sleeper Sleeper for easy testing.
-     */
-    RetryHttpInitializerWrapper(
-            final Credential wrappedCredential, final Sleeper sleeper) {
-        this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
-        this.sleeper = sleeper;
-    }
-
-    /**
-     * Initializes the given request.
-     */
-    @Override
-    public final void initialize(final HttpRequest request) {
-        request.setReadTimeout(2 * ONEMINITUES); // 2 minutes read timeout
-        final HttpUnsuccessfulResponseHandler backoffHandler =
-                new HttpBackOffUnsuccessfulResponseHandler(
-                        new ExponentialBackOff())
-                        .setSleeper(sleeper);
-        request.setInterceptor(wrappedCredential);
-        request.setUnsuccessfulResponseHandler(
-                new HttpUnsuccessfulResponseHandler() {
-                    @Override
-                    public boolean handleResponse(
-                            final HttpRequest request,
-                            final HttpResponse response,
-                            final boolean supportsRetry) throws IOException {
-                        if (wrappedCredential.handleResponse(
-                                request, response, supportsRetry)) {
-                            // If credential decides it can handle it,
-                            // the return code or message indicated
-                            // something specific to authentication,
-                            // and no backoff is desired.
-                            return true;
-                        } else if (backoffHandler.handleResponse(
-                                request, response, supportsRetry)) {
-                            // Otherwise, we defer to the judgement of
-                            // our internal backoff handler.
-                            LOG.info("Retrying "
-                                    + request.getUrl().toString());
-                            return true;
-                        } else {
-                            return false;
-                        }
-                    }
-                });
-        request.setIOExceptionHandler(
-                new HttpBackOffIOExceptionHandler(new ExponentialBackOff())
-                        .setSleeper(sleeper));
-    }
-}
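
RetryHttpInitializerWrapper combines retry with backoff and an injectable Sleeper so that tests need not actually wait. The sketch below illustrates that idea in plain Java under simplifying assumptions: the delay simply doubles after each failure, with no randomization or cap, which is not how the google-http-client ExponentialBackOff class behaves. The Sleeper interface and callWithRetry method here are hypothetical names, not the library's API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical illustration class; not part of the commit.
final class BackoffRetrySketch {
  /** Hypothetical sleeper, so tests can record delays instead of sleeping. */
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  /**
   * Calls the action until it succeeds or maxAttempts is reached, doubling the
   * delay after every failed attempt. The last failure is rethrown.
   */
  static <T> T callWithRetry(
      Callable<T> action, int maxAttempts, long initialDelayMillis, Sleeper sleeper)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    long delay = initialDelayMillis;
    for (int attempt = 1; ; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        if (attempt >= maxAttempts) {
          throw e; // Out of attempts: surface the last failure.
        }
        sleeper.sleep(delay);
        delay *= 2; // Plain doubling; a simplification chosen for this sketch.
      }
    }
  }

  public static void main(String[] args) throws Exception {
    List<Long> delays = new ArrayList<>();
    int[] calls = {0};
    // The action fails twice with a transient error, then succeeds.
    String result = callWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new IOException("transient failure");
      }
      return "ok";
    }, 5, 100, millis -> delays.add(millis));
    System.out.println(result + " after " + calls[0] + " calls, recorded delays " + delays);
  }
}
```

Passing a recording lambda as the sleeper, as main does, mirrors the comment in the diff about replacing the sleeper with a mock in tests.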

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
deleted file mode 100644
index 4bcfb72..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.utils;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.complete.game.UserScore;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Generate, format, and write BigQuery table row information. Use provided information about
- * the field names and types, as well as lambda functions that describe how to generate their
- * values.
- */
-public class WriteToBigQuery<T>
-    extends PTransform<PCollection<T>, PDone> {
-
-  protected String tableName;
-  protected Map<String, FieldInfo<T>> fieldInfo;
-
-  public WriteToBigQuery() {
-  }
-
-  public WriteToBigQuery(String tableName,
-      Map<String, FieldInfo<T>> fieldInfo) {
-    this.tableName = tableName;
-    this.fieldInfo = fieldInfo;
-  }
-
-  /** Define a class to hold information about output table field definitions. */
-  public static class FieldInfo<T> implements Serializable {
-    // The BigQuery 'type' of the field
-    private String fieldType;
-    // A lambda function to generate the field value
-    private SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn;
-
-    public FieldInfo(String fieldType,
-        SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn) {
-      this.fieldType = fieldType;
-      this.fieldFn = fieldFn;
-    }
-
-    String getFieldType() {
-      return this.fieldType;
-    }
-
-    SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> getFieldFn() {
-      return this.fieldFn;
-    }
-  }
-  /** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */
-  protected class BuildRowFn extends DoFn<T, TableRow> {
-
-    @Override
-    public void processElement(ProcessContext c) {
-
-      TableRow row = new TableRow();
-      for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
-          String key = entry.getKey();
-          FieldInfo<T> fcnInfo = entry.getValue();
-          SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
-            fcnInfo.getFieldFn();
-          row.set(key, fcn.apply(c));
-        }
-      c.output(row);
-    }
-  }
-
-  /** Build the output table schema. */
-  protected TableSchema getSchema() {
-    List<TableFieldSchema> fields = new ArrayList<>();
-    for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
-      String key = entry.getKey();
-      FieldInfo<T> fcnInfo = entry.getValue();
-      String bqType = fcnInfo.getFieldType();
-      fields.add(new TableFieldSchema().setName(key).setType(bqType));
-    }
-    return new TableSchema().setFields(fields);
-  }
-
-  @Override
-  public PDone apply(PCollection<T> teamAndScore) {
-    return teamAndScore
-      .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
-      .apply(BigQueryIO.Write
-                .to(getTable(teamAndScore.getPipeline(),
-                    tableName))
-                .withSchema(getSchema())
-                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
-  }
-
-  /** Utility to construct an output table reference. */
-  static TableReference getTable(Pipeline pipeline, String tableName) {
-    PipelineOptions options = pipeline.getOptions();
-    TableReference table = new TableReference();
-    table.setDatasetId(options.as(UserScore.Options.class).getDataset());
-    table.setProjectId(options.as(GcpOptions.class).getProject());
-    table.setTableId(tableName);
-    return table;
-  }
-}
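
Note on the file above: WriteToBigQuery drives both the row contents (BuildRowFn) and the table schema (getSchema) from a single map of field name to FieldInfo. The following is a minimal sketch of that pattern in plain Java, without the SDK or BigQuery classes. The names RowBuilderSketch, FieldSpec, buildRow and buildSchema are hypothetical and do not appear in the commit; rows and schemas are modelled as ordinary maps purely for illustration.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class RowBuilderSketch {

  /** Hypothetical stand-in for FieldInfo: a column type plus a value extractor. */
  static final class FieldSpec<T> {
    final String type;
    final Function<T, Object> valueFn;

    FieldSpec(String type, Function<T, Object> valueFn) {
      this.type = type;
      this.valueFn = valueFn;
    }
  }

  /** Like BuildRowFn: apply every extractor to one element to build a row. */
  static <T> Map<String, Object> buildRow(Map<String, FieldSpec<T>> fields, T element) {
    Map<String, Object> row = new LinkedHashMap<>();
    for (Map.Entry<String, FieldSpec<T>> entry : fields.entrySet()) {
      row.put(entry.getKey(), entry.getValue().valueFn.apply(element));
    }
    return row;
  }

  /** Like getSchema(): the same map yields the column name to type mapping. */
  static <T> Map<String, String> buildSchema(Map<String, FieldSpec<T>> fields) {
    Map<String, String> schema = new LinkedHashMap<>();
    for (Map.Entry<String, FieldSpec<T>> entry : fields.entrySet()) {
      schema.put(entry.getKey(), entry.getValue().type);
    }
    return schema;
  }

  public static void main(String[] args) {
    // A team/score pair is modelled as a Map.Entry for this sketch.
    Map<String, FieldSpec<Map.Entry<String, Integer>>> fields = new LinkedHashMap<>();
    fields.put("team",
        new FieldSpec<Map.Entry<String, Integer>>("STRING", e -> e.getKey()));
    fields.put("total_score",
        new FieldSpec<Map.Entry<String, Integer>>("INTEGER", e -> e.getValue()));

    Map.Entry<String, Integer> element = new AbstractMap.SimpleImmutableEntry<>("red", 42);
    System.out.println(buildSchema(fields)); // {team=STRING, total_score=INTEGER}
    System.out.println(buildRow(fields, element)); // {team=red, total_score=42}
  }
}
```

Keeping type and extractor together in one entry means a column cannot be added to the schema without also saying how to fill it, which is presumably why the example passes one map rather than two parallel structures.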

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
deleted file mode 100644
index 41257ca..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ /dev/null
@@ -1,77 +0,0 @@
-  /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.utils;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import java.util.Map;
-
-/**
- * Generate, format, and write BigQuery table row information. Subclasses {@link WriteToBigQuery}
- * to require windowing; so this subclass may be used for writes that require access to the
- * context's window information.
- */
-public class WriteWindowedToBigQuery<T>
-    extends WriteToBigQuery<T> {
-
-  public WriteWindowedToBigQuery(String tableName,
-      Map<String, FieldInfo<T>> fieldInfo) {
-    super(tableName, fieldInfo);
-  }
-
-  /** Convert each key/score pair into a BigQuery TableRow. */
-  protected class BuildRowFn extends DoFn<T, TableRow>
-      implements RequiresWindowAccess {
-
-    @Override
-    public void processElement(ProcessContext c) {
-
-      TableRow row = new TableRow();
-      for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
-          String key = entry.getKey();
-          FieldInfo<T> fcnInfo = entry.getValue();
-          SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
-            fcnInfo.getFieldFn();
-          row.set(key, fcn.apply(c));
-        }
-      c.output(row);
-    }
-  }
-
-  @Override
-  public PDone apply(PCollection<T> teamAndScore) {
-    return teamAndScore
-      .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
-      .apply(BigQueryIO.Write
-                .to(getTable(teamAndScore.getPipeline(),
-                    tableName))
-                .withSchema(getSchema())
-                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
new file mode 100644
index 0000000..8705ed3
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import java.util.Arrays;
+
+/**
+ * An example that counts words in Shakespeare, using Java 8 language features.
+ *
+ * <p>See {@link MinimalWordCount} for a comprehensive explanation.
+ */
+public class MinimalWordCountJava8 {
+
+  public static void main(String[] args) {
+    DataflowPipelineOptions options = PipelineOptionsFactory.create()
+        .as(DataflowPipelineOptions.class);
+
+    options.setRunner(BlockingDataflowPipelineRunner.class);
+
+    // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
+    options.setProject("SET_YOUR_PROJECT_ID_HERE");
+
+    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
+    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
+         .withOutputType(new TypeDescriptor<String>() {}))
+     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+     .apply(Count.<String>perElement())
+     .apply(MapElements
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
+         .withOutputType(new TypeDescriptor<String>() {}))
+
+     // CHANGE 3 of 3: A Google Cloud Storage path is required for writing the results.
+     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+
+    p.run();
+  }
+}
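
Note on the file above: the pipeline splits each line on the regex [^a-zA-Z']+, drops empty tokens, counts occurrences per word and formats each result as "word: count". The same steps can be tried locally with java.util.stream, as in the sketch below. This is only an in-memory analogue under that assumption, not how the SDK executes the pipeline; the class name LocalWordCountSketch and the method countWords are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LocalWordCountSketch {

  /** Splits lines into words, drops empty tokens, and counts occurrences per word. */
  static Map<String, Long> countWords(List<String> lines) {
    return lines.stream()
        // Same regex as the pipeline: split on runs of anything that is not a letter
        // or an apostrophe.
        .flatMap(line -> Arrays.stream(line.split("[^a-zA-Z']+")))
        // split() yields a leading empty token when a line starts with a separator,
        // which is why the pipeline needs its Filter step.
        .filter(word -> !word.isEmpty())
        // A TreeMap keeps the printed order deterministic for this sketch.
        .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("To be, or not to be:", "  that is the question.");
    // Format each entry the same way as the MapElements step.
    countWords(lines).forEach((word, count) -> System.out.println(word + ": " + count));
  }
}
```

The count is case-sensitive, as in the pipeline, so "To" and "to" are tallied separately.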

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
new file mode 100644
index 0000000..89832b2
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.Mean;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the fourth in a series of four pipelines that tell a story in a 'gaming'
+ * domain, following {@link UserScore}, {@link HourlyTeamScore}, and {@link LeaderBoard}.
+ * New concepts: session windows and finding session duration; use of both
+ * singleton and non-singleton side inputs.
+ *
+ * <p> This pipeline builds on the {@link LeaderBoard} functionality, and adds some "business
+ * intelligence" analysis: abuse detection and usage patterns. The pipeline derives the Mean user
+ * score sum for a window, and uses that information to identify likely spammers/robots. (The robots
+ * have a higher click rate than the human users). The 'robot' users are then filtered out when
+ * calculating the team scores.
+ *
+ * <p> Additionally, user sessions are tracked: that is, we find bursts of user activity using
+ * session windows. Then, the mean session duration information is recorded in the context of
+ * subsequent fixed windowing. (This could be used to tell us what games are giving us greater
+ * user retention).
+ *
+ * <p> Run {@code com.google.cloud.dataflow.examples.complete.game.injector.Injector} to generate
+ * pubsub data for this pipeline. The {@code Injector} documentation provides more detail.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist. The PubSub topic you specify should
+ * be the same topic to which the Injector is publishing.
+ */
+public class GameStats extends LeaderBoard {
+
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+  private static DateTimeFormatter fmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+  /**
+   * Filter out all but those users with a high clickrate, which we will consider as 'spammy' users.
+   * We do this by finding the mean total score per user, then using that information as a side
+   * input to filter out all but those user scores that are > (mean * SCORE_WEIGHT).
+   */
+  // [START DocInclude_AbuseDetect]
+  public static class CalculateSpammyUsers
+      extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
+    private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
+    private static final double SCORE_WEIGHT = 2.5;
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
+
+      // Get the sum of scores for each user.
+      PCollection<KV<String, Integer>> sumScores = userScores
+          .apply("UserSum", Sum.<String>integersPerKey());
+
+      // Extract the score from each element, and use it to find the global mean.
+      final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
+          .apply(Mean.<Integer>globally().asSingletonView());
+
+      // Filter the user sums using the global mean.
+      PCollection<KV<String, Integer>> filtered = sumScores
+          .apply(ParDo
+              .named("ProcessAndFilter")
+              // use the derived mean total score as a side input
+              .withSideInputs(globalMeanScore)
+              .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+                private final Aggregator<Long, Long> numSpammerUsers =
+                  createAggregator("SpammerUsers", new Sum.SumLongFn());
+                @Override
+                public void processElement(ProcessContext c) {
+                  Integer score = c.element().getValue();
+                  Double gmc = c.sideInput(globalMeanScore);
+                  if (score > (gmc * SCORE_WEIGHT)) {
+                    LOG.info("user " + c.element().getKey() + " spammer score " + score
+                        + " with mean " + gmc);
+                    numSpammerUsers.addValue(1L);
+                    c.output(c.element());
+                  }
+                }
+              }));
+      return filtered;
+    }
+  }
+  // [END DocInclude_AbuseDetect]
+
+  /**
+   * Calculate and output an element's session duration.
+   */
+  private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer>
+      implements RequiresWindowAccess {
+
+    @Override
+    public void processElement(ProcessContext c) {
+      IntervalWindow w = (IntervalWindow) c.window();
+      int duration = new Duration(
+          w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
+      c.output(duration);
+    }
+  }
+
+
+  /**
+   * Options supported by {@link GameStats}.
+   */
+  static interface Options extends LeaderBoard.Options {
+    @Description("Numeric value of fixed window duration for user analysis, in minutes")
+    @Default.Integer(60)
+    Integer getFixedWindowDuration();
+    void setFixedWindowDuration(Integer value);
+
+    @Description("Numeric value of gap between user sessions, in minutes")
+    @Default.Integer(5)
+    Integer getSessionGap();
+    void setSessionGap(Integer value);
+
+    @Description("Numeric value of fixed window for finding mean of user session duration, "
+        + "in minutes")
+    @Default.Integer(30)
+    Integer getUserActivityWindowDuration();
+    void setUserActivityWindowDuration(Integer value);
+
+    @Description("Prefix used for the BigQuery table names")
+    @Default.String("game_stats")
+    String getTablePrefix();
+    void setTablePrefix(String value);
+  }
+
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write information about team score sums.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureWindowedWrite() {
+    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfigure.put("team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+            c -> c.element().getKey()));
+    tableConfigure.put("total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+            c -> c.element().getValue()));
+    tableConfigure.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    tableConfigure.put("processing_time",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> fmt.print(Instant.now())));
+    return tableConfigure;
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write information about mean user session time.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<Double>>
+      configureSessionWindowWrite() {
+
+    Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<Double>>();
+    tableConfigure.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<Double>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    tableConfigure.put("mean_duration",
+        new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", c -> c.element()));
+    return tableConfigure;
+  }
+
+
+
+  public static void main(String[] args) throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    // Enforce that this pipeline is always run in streaming mode.
+    options.setStreaming(true);
+    // Allow the pipeline to be cancelled automatically.
+    options.setRunner(DataflowPipelineRunner.class);
+    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Read Events from Pub/Sub using custom timestamps
+    PCollection<GameActionInfo> rawEvents = pipeline
+        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+
+    // Extract username/score pairs from the event stream
+    PCollection<KV<String, Integer>> userEvents =
+        rawEvents.apply("ExtractUserScore",
+          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+            .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+    // Calculate the total score per user over fixed windows, and
+    // emit cumulative updates for late data.
+    final PCollectionView<Map<String, Integer>> spammersView = userEvents
+      .apply(Window.named("FixedWindowsUser")
+          .<KV<String, Integer>>into(FixedWindows.of(
+              Duration.standardMinutes(options.getFixedWindowDuration())))
+          )
+
+      // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
+      // These might be robots/spammers.
+      .apply("CalculateSpammyUsers", new CalculateSpammyUsers())
+      // Derive a view from the collection of spammer users. It will be used as a side input
+      // in calculating the team score sums, below.
+      .apply("CreateSpammersView", View.<String, Integer>asMap());
+
+    // [START DocInclude_FilterAndCalc]
+    // Calculate the total score per team over fixed windows,
+    // and emit cumulative updates for late data. Uses the side input derived above-- the set of
+    // suspected robots-- to filter out scores from those users from the sum.
+    // Write the results to BigQuery.
+    rawEvents
+      .apply(Window.named("WindowIntoFixedWindows")
+          .<GameActionInfo>into(FixedWindows.of(
+              Duration.standardMinutes(options.getFixedWindowDuration())))
+          )
+      // Filter out the detected spammer users, using the side input derived above.
+      .apply(ParDo.named("FilterOutSpammers")
+              .withSideInputs(spammersView)
+              .of(new DoFn<GameActionInfo, GameActionInfo>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  // If the user is not in the spammers Map, output the data element.
+                  if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
+                    c.output(c.element());
+                  }
+                }
+              }))
+      // Extract and sum teamname/score pairs from the event data.
+      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+      // [END DocInclude_FilterAndCalc]
+      // Write the result to BigQuery
+      .apply("WriteTeamSums",
+             new WriteWindowedToBigQuery<KV<String, Integer>>(
+                options.getTablePrefix() + "_team", configureWindowedWrite()));
+
+
+    // [START DocInclude_SessionCalc]
+    // Detect user sessions-- that is, a burst of activity separated by a gap from further
+    // activity. Find and record the mean session lengths.
+    // This information could help the game designers track the changing user engagement
+    // as their set of games changes.
+    userEvents
+      .apply(Window.named("WindowIntoSessions")
+            .<KV<String, Integer>>into(
+                  Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
+        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+      // For this use, we care only about the existence of the session, not any particular
+      // information aggregated over it, so the following is an efficient way to do that.
+      .apply(Combine.perKey(x -> 0))
+      // Get the duration per session.
+      .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
+      // [END DocInclude_SessionCalc]
+      // [START DocInclude_Rewindow]
+      // Re-window to process groups of session sums according to when the sessions complete.
+      .apply(Window.named("WindowToExtractSessionMean")
+            .<Integer>into(
+                FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
+      // Find the mean session duration in each window.
+      .apply(Mean.<Integer>globally().withoutDefaults())
+      // Write this info to a BigQuery table.
+      .apply("WriteAvgSessionLength",
+             new WriteWindowedToBigQuery<Double>(
+                options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));
+    // [END DocInclude_Rewindow]
+
+
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = pipeline.run();
+    dataflowUtils.waitToFinish(result);
+  }
+}
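
Note on the file above: CalculateSpammyUsers keeps only the users whose summed score exceeds the mean of all per-user sums multiplied by SCORE_WEIGHT (2.5). The arithmetic can be checked in isolation with the plain-Java sketch below. It assumes the per-user sums for one window are already available as a map (the role of the "UserSum" step); the class name SpammerFilterSketch and the method findSpammers are hypothetical and not part of the commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SpammerFilterSketch {

  // Same weight as CalculateSpammyUsers.SCORE_WEIGHT in the example.
  static final double SCORE_WEIGHT = 2.5;

  /** Returns the entries whose score is greater than (mean of all scores * SCORE_WEIGHT). */
  static Map<String, Integer> findSpammers(Map<String, Integer> userSums) {
    // Mean over the per-user sums; 0.0 for an empty input.
    double mean = userSums.values().stream()
        .mapToInt(Integer::intValue)
        .average()
        .orElse(0.0);

    Map<String, Integer> spammers = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> entry : userSums.entrySet()) {
      if (entry.getValue() > mean * SCORE_WEIGHT) {
        spammers.put(entry.getKey(), entry.getValue());
      }
    }
    return spammers;
  }

  public static void main(String[] args) {
    Map<String, Integer> userSums = new LinkedHashMap<>();
    userSums.put("alice", 10);
    userSums.put("bob", 12);
    userSums.put("carol", 8);
    userSums.put("dave", 11);
    userSums.put("robot", 200);

    // Mean is 241 / 5 = 48.2, so the threshold is 120.5 and only "robot" exceeds it.
    System.out.println(findSpammers(userSums)); // {robot=200}
  }
}
```

Because the outlier's own score is included in the mean, a single user can exceed 2.5 times the mean only when the window contains at least three users; with one or two users nobody is flagged.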

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
new file mode 100644
index 0000000..58944c5
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.WithTimestamps;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the second in a series of four pipelines that tell a story in a 'gaming'
+ * domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore},
+ * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}.
+ *
+ * <p> This pipeline processes data collected from gaming events in batch, building on {@link
+ * UserScore} but using fixed windows. It calculates the sum of scores per team, for each window,
+ * optionally allowing specification of two timestamps before and after which data is filtered out.
+ * This allows a model where late data collected after the intended analysis window can be included,
+ * and any late-arriving data prior to the beginning of the analysis window can be removed as well.
+ * By using windowing and adding element timestamps, we can do finer-grained analysis than with the
+ * {@link UserScore} pipeline. However, our batch processing is high-latency, in that we don't get
+ * results from plays at the beginning of the batch's time period until the batch is processed.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ *
+ * <p> Optionally include {@code --input} to specify the batch input file path.
+ * To indicate a time after which the data should be filtered out, include the
+ * {@code --stopMin} arg. E.g., {@code --stopMin=2015-10-18-23-59} indicates that any data
+ * timestamped after 23:59 PST on 2015-10-18 should not be included in the analysis.
+ * To indicate a time before which data should be filtered out, include the {@code --startMin} arg.
+ * If you're using the default input specified in {@link UserScore},
+ * "gs://dataflow-samples/game/gaming_data*.csv", then
+ * {@code --startMin=2015-11-16-16-10 --stopMin=2015-11-17-16-10} are good values.
+ */
+public class HourlyTeamScore extends UserScore {
+
+  private static DateTimeFormatter fmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+  private static DateTimeFormatter minFmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+
+  /**
+   * Options supported by {@link HourlyTeamScore}.
+   */
+  static interface Options extends UserScore.Options {
+
+    @Description("Numeric value of fixed window duration, in minutes")
+    @Default.Integer(60)
+    Integer getWindowDuration();
+    void setWindowDuration(Integer value);
+
+    @Description("String representation of the first minute after which to generate results, "
+        + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST. "
+        + "Any input data timestamped prior to that minute won't be included in the sums.")
+    @Default.String("1970-01-01-00-00")
+    String getStartMin();
+    void setStartMin(String value);
+
+    @Description("String representation of the first minute for which to not generate results, "
+        + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST. "
+        + "Any input data timestamped after that minute won't be included in the sums.")
+    @Default.String("2100-01-01-00-00")
+    String getStopMin();
+    void setStopMin(String value);
+
+    @Description("The BigQuery table name. Should not already exist.")
+    @Default.String("hourly_team_score")
+    String getTableName();
+    void setTableName(String value);
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and
+   * includes information about window start time.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureWindowedTableWrite() {
+    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfig.put("team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+            c -> c.element().getKey()));
+    tableConfig.put("total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+            c -> c.element().getValue()));
+    tableConfig.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    return tableConfig;
+  }
+
+
+  /**
+   * Run a batch pipeline to do windowed analysis of the data.
+   */
+  // [START DocInclude_HTSMain]
+  public static void main(String[] args) throws Exception {
+    // Begin constructing a pipeline configured by commandline flags.
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
+    final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
+
+    // Read 'gaming' events from a text file.
+    pipeline.apply(TextIO.Read.from(options.getInput()))
+      // Parse the incoming data.
+      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+
+      // Filter out data before and after the given times so that it is not included
+      // in the calculations. As we collect data in batches (say, by day), the batch for the day
+      // that we want to analyze could potentially include some late-arriving data from the previous
+      // day. If so, we want to weed it out. Similarly, if we include data from the following day
+      // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
+      // that fall after the time period we want to analyze.
+      // [START DocInclude_HTSFilters]
+      .apply("FilterStartTime", Filter.byPredicate(
+          (GameActionInfo gInfo)
+              -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
+      .apply("FilterEndTime", Filter.byPredicate(
+          (GameActionInfo gInfo)
+              -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
+      // [END DocInclude_HTSFilters]
+
+      // [START DocInclude_HTSAddTsAndWindow]
+      // Add an element timestamp based on the event log, and apply fixed windowing.
+      .apply("AddEventTimestamps",
+             WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
+      .apply(Window.named("FixedWindowsTeam")
+          .<GameActionInfo>into(FixedWindows.of(
+                Duration.standardMinutes(options.getWindowDuration()))))
+      // [END DocInclude_HTSAddTsAndWindow]
+
+      // Extract and sum teamname/score pairs from the event data.
+      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+      .apply("WriteTeamScoreSums",
+        new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
+            configureWindowedTableWrite()));
+
+
+    pipeline.run();
+  }
+  // [END DocInclude_HTSMain]
+
+}
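
The start/stop filtering and fixed windowing in HourlyTeamScore above can be sketched in plain Java. This is only an illustration, not the SDK's implementation: the class and method names are hypothetical, java.time stands in for Joda-Time, "PST" is assumed to follow America/Los_Angeles rules, and fixed windows are assumed to be aligned to the epoch.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Plain-Java illustration only; none of this is Dataflow SDK code. */
final class HourlyWindowSketch {

  // Assumption: the pipeline's "PST" zone follows America/Los_Angeles rules.
  static final ZoneId ZONE = ZoneId.of("America/Los_Angeles");
  static final DateTimeFormatter MIN_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm");

  /** Parses a --startMin / --stopMin style value into epoch milliseconds. */
  static long parseMinute(String value) {
    return LocalDateTime.parse(value, MIN_FMT).atZone(ZONE).toInstant().toEpochMilli();
  }

  /** Mirrors the two filters: keep events strictly after start and strictly before stop. */
  static boolean inRange(long eventMillis, long startMillis, long stopMillis) {
    return eventMillis > startMillis && eventMillis < stopMillis;
  }

  /** Returns the start of the epoch-aligned fixed window that contains the event. */
  static long windowStart(long eventMillis, Duration size) {
    return eventMillis - Math.floorMod(eventMillis, size.toMillis());
  }

  public static void main(String[] args) {
    long start = parseMinute("2015-11-16-16-10");
    long stop = parseMinute("2015-11-17-16-10");
    long event = start + Duration.ofMinutes(95).toMillis();
    if (inRange(event, start, stop)) {
      long ws = windowStart(event, Duration.ofMinutes(60));
      System.out.println("window starts at " + Instant.ofEpochMilli(ws));
    }
  }
}
```

Both comparisons are strict, matching the `>` and `<` predicates in the pipeline, so an event stamped exactly at the start or stop minute is dropped.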

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
new file mode 100644
index 0000000..cedd696
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the third in a series of four pipelines that tell a story in a 'gaming' domain,
+ * following {@link UserScore} and {@link HourlyTeamScore}. Concepts include: processing unbounded
+ * data using fixed windows; use of custom timestamps and event-time processing; generation of
+ * early/speculative results; using .accumulatingFiredPanes() to do cumulative processing of late-
+ * arriving data.
+ *
+ * <p> This pipeline processes an unbounded stream of 'game events'. The calculation of the team
+ * scores uses fixed windowing based on event time (the time of the game play event), not
+ * processing time (the time that an event is processed by the pipeline). The pipeline calculates
+ * the sum of scores per team, for each window. By default, the team scores are calculated using
+ * one-hour windows.
+ *
+ * <p> In contrast, to demo another windowing option, the user scores are calculated using a
+ * global window, which periodically (every ten minutes) emits cumulative user score sums.
+ *
+ * <p> In contrast to the previous pipelines in the series, which used static, finite input data,
+ * here we're using an unbounded data source, which lets us provide speculative results, and allows
+ * handling of late data, at much lower latency. We can use the early/speculative results to keep a
+ * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct
+ * results, e.g. for 'team prizes'. We're now outputting window results as they're
+ * calculated, giving us much lower latency than with the previous batch examples.
+ *
+ * <p> Run {@link injector.Injector} to generate pubsub data for this pipeline.  The Injector
+ * documentation provides more detail on how to do this.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ * The PubSub topic you specify should be the same topic to which the Injector is publishing.
+ */
+public class LeaderBoard extends HourlyTeamScore {
+
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+  private static DateTimeFormatter fmt =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+  static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+  static final Duration TEN_MINUTES = Duration.standardMinutes(10);
+
+
+  /**
+   * Options supported by {@link LeaderBoard}.
+   */
+  static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions {
+
+    @Description("Pub/Sub topic to read from")
+    @Validation.Required
+    String getTopic();
+    void setTopic(String value);
+
+    @Description("Numeric value of fixed window duration for team analysis, in minutes")
+    @Default.Integer(60)
+    Integer getTeamWindowDuration();
+    void setTeamWindowDuration(Integer value);
+
+    @Description("Numeric value of allowed data lateness, in minutes")
+    @Default.Integer(120)
+    Integer getAllowedLateness();
+    void setAllowedLateness(Integer value);
+
+    @Description("Prefix used for the BigQuery table names")
+    @Default.String("leaderboard")
+    String getTableName();
+    void setTableName(String value);
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write team score sums and includes event timing information.
+   */
+  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureWindowedTableWrite() {
+
+    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfigure.put("team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+            c -> c.element().getKey()));
+    tableConfigure.put("total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+            c -> c.element().getValue()));
+    tableConfigure.put("window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+          c -> { IntervalWindow w = (IntervalWindow) c.window();
+                 return fmt.print(w.start()); }));
+    tableConfigure.put("processing_time",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> fmt.print(Instant.now())));
+    tableConfigure.put("timing",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> c.pane().getTiming().toString()));
+    return tableConfigure;
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is used to write user score sums.
+   */
+  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureGlobalWindowBigQueryWrite() {
+
+    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        configureBigQueryWrite();
+    tableConfigure.put("processing_time",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", c -> fmt.print(Instant.now())));
+    return tableConfigure;
+  }
+
+
+  public static void main(String[] args) throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    // Enforce that this pipeline is always run in streaming mode.
+    options.setStreaming(true);
+    // For example purposes, allow the pipeline to be easily cancelled instead of running
+    // continuously.
+    options.setRunner(DataflowPipelineRunner.class);
+    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
+    // data elements, and parse the data.
+    PCollection<GameActionInfo> gameEvents = pipeline
+        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+
+    // [START DocInclude_WindowAndTrigger]
+    // Extract team/score pairs from the event stream, using hour-long windows by default.
+    gameEvents
+        .apply(Window.named("LeaderboardTeamFixedWindows")
+          .<GameActionInfo>into(FixedWindows.of(
+              Duration.standardMinutes(options.getTeamWindowDuration())))
+          // We will get early (speculative) results as well as cumulative
+          // processing of late data.
+          .triggering(
+            AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(FIVE_MINUTES))
+            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(TEN_MINUTES)))
+          .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
+          .accumulatingFiredPanes())
+        // Extract and sum teamname/score pairs from the event data.
+        .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+        // Write the results to BigQuery.
+        .apply("WriteTeamScoreSums",
+               new WriteWindowedToBigQuery<KV<String, Integer>>(
+                  options.getTableName() + "_team", configureWindowedTableWrite()));
+    // [END DocInclude_WindowAndTrigger]
+
+    // [START DocInclude_ProcTimeTrigger]
+    // Extract user/score pairs from the event stream using processing time, via global windowing.
+    // Get periodic updates on all users' running scores.
+    gameEvents
+        .apply(Window.named("LeaderboardUserGlobalWindow")
+          .<GameActionInfo>into(new GlobalWindows())
+          // Get periodic results every ten minutes.
+              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(TEN_MINUTES)))
+              .accumulatingFiredPanes()
+              .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())))
+        // Extract and sum username/score pairs from the event data.
+        .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+        // Write the results to BigQuery.
+        .apply("WriteUserScoreSums",
+               new WriteToBigQuery<KV<String, Integer>>(
+                  options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
+    // [END DocInclude_ProcTimeTrigger]
+
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = pipeline.run();
+    dataflowUtils.waitToFinish(result);
+  }
+}
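
LeaderBoard above relies on .accumulatingFiredPanes(), so each trigger firing reports the running total for the window rather than only what arrived since the last firing. The plain-Java sketch below contrasts the two behaviours. It is a simplified model with hypothetical names, not SDK code: each inner list stands for the scores that arrive between two consecutive firings for one team and one window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of pane accumulation; not Dataflow SDK code. */
final class PaneSketch {

  /** Accumulating mode: every firing emits the sum of everything seen so far. */
  static List<Integer> accumulating(List<List<Integer>> batches) {
    List<Integer> panes = new ArrayList<>();
    int runningTotal = 0;
    for (List<Integer> batch : batches) {
      for (int score : batch) {
        runningTotal += score;
      }
      panes.add(runningTotal);
    }
    return panes;
  }

  /** Discarding mode, for contrast: every firing emits only the new elements' sum. */
  static List<Integer> discarding(List<List<Integer>> batches) {
    List<Integer> panes = new ArrayList<>();
    for (List<Integer> batch : batches) {
      int sum = 0;
      for (int score : batch) {
        sum += score;
      }
      panes.add(sum);
    }
    return panes;
  }

  public static void main(String[] args) {
    // Early firing, on-time firing, then one late firing.
    List<List<Integer>> batches =
        Arrays.asList(Arrays.asList(5, 3), Arrays.asList(4), Arrays.asList(10));
    System.out.println("accumulating: " + accumulating(batches));
    System.out.println("discarding:   " + discarding(batches));
  }
}
```

With accumulation, the latest row written for a window is always the best current total, which suits a leaderboard table that readers query for the most recent value.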

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
new file mode 100644
index 0000000..79b55ce
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
@@ -0,0 +1,113 @@
+
+# 'Gaming' examples
+
+
+This directory holds a series of example Dataflow pipelines in a simple 'mobile
+gaming' domain. They all require Java 8.  Each pipeline successively introduces
+new concepts, and gives some examples of using Java 8 syntax in constructing
+Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts
+that are used apply equally well in Java 7.
+
+In the gaming scenario, many users play, as members of different teams, over
+the course of a day, and their actions are logged for processing. Some of the
+logged game events may be late-arriving, if users play on mobile devices and go
+transiently offline for a period.
+
+The scenario includes not only "regular" users, but "robot users", which have a
+higher click rate than the regular users, and may move from team to team.
+
+The first two pipelines in the series use pre-generated batch data samples. The
+last two pipelines read from a [PubSub](https://cloud.google.com/pubsub/)
+topic input.  For these examples, you will also need to run the
+`injector.Injector` program, which generates and publishes the gaming data to
+PubSub. The javadocs for each pipeline have more detailed information on how to
+run that pipeline.
+
+All of these pipelines write their results to BigQuery table(s).
+
+
+## The pipelines in the 'gaming' series
+
+### UserScore
+
+The first pipeline in the series is `UserScore`. This pipeline does batch
+processing of data collected from gaming events. It calculates the sum of
+scores per user, over an entire batch of gaming data (collected, say, for each
+day). The batch processing will not include any late data that arrives after
+the day's cutoff point.
+
+### HourlyTeamScore
+
+The next pipeline in the series is `HourlyTeamScore`. This pipeline also
+processes data collected from gaming events in batch. It builds on `UserScore`,
+but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by
+default an hour in duration. It calculates the sum of scores per team, for each
+window, optionally allowing specification of two timestamps before and after
+which data is filtered out. This allows a model where late data collected after
+the intended analysis window can be included in the analysis, and any late-
+arriving data prior to the beginning of the analysis window can be removed as
+well.
+
+By using windowing and adding element timestamps, we can do finer-grained
+analysis than with the `UserScore` pipeline — we're now tracking scores for
+each hour rather than over the course of a whole day. However, our batch
+processing is high-latency, in that we don't get results from plays at the
+beginning of the batch's time period until the complete batch is processed.
+
+### LeaderBoard
+
+The third pipeline in the series is `LeaderBoard`. This pipeline processes an
+unbounded stream of 'game events' from a PubSub topic. The calculation of the
+team scores uses fixed windowing based on event time (the time of the game play
+event), not processing time (the time that an event is processed by the
+pipeline). The pipeline calculates the sum of scores per team, for each window.
+By default, the team scores are calculated using one-hour windows.
+
+In contrast — to demo another windowing option — the user scores are calculated
+using a global window, which periodically (every ten minutes) emits cumulative
+user score sums.
+
+In contrast to the previous pipelines in the series, which used static, finite
+input data, here we're using an unbounded data source, which lets us provide
+_speculative_ results, and allows handling of late data, at much lower latency.
+E.g., we could use the early/speculative results to keep a 'leaderboard'
+updated in near-realtime. Our handling of late data lets us generate correct
+results, e.g. for 'team prizes'. We're now outputting window results as they're
+calculated, giving us much lower latency than with the previous batch examples.
+
+### GameStats
+
+The fourth pipeline in the series is `GameStats`. This pipeline builds
+on the `LeaderBoard` functionality (supporting output of speculative and late
+data) and adds some "business intelligence" analysis: abuse detection. The
+pipeline derives the mean user score sum for a window, and uses
+that information to identify likely spammers/robots. (The injector is designed
+so that the "robots" have a higher click rate than the "real" users). The robot
+users are then filtered out when calculating the team scores.
+
+Additionally, user sessions are tracked: that is, we find bursts of user
+activity using session windows. Then, the mean session duration information is
+recorded in the context of subsequent fixed windowing. (This could be used to
+tell us what games are giving us greater user retention).
+
+### Running the PubSub Injector
+
+The `LeaderBoard` and `GameStats` example pipelines read unbounded data
+from a PubSub topic.
+
+Use the `injector.Injector` program to generate this data and publish to a
+PubSub topic. See the `Injector` javadocs for more information on how to run the
+injector. Set up the injector before you start one of these pipelines. Then,
+when you start the pipeline, pass as an argument the name of that PubSub topic.
+See the pipeline javadocs for the details.
+
+## Viewing the results in BigQuery
+
+All of the pipelines write their results to BigQuery.  `UserScore` and
+`HourlyTeamScore` each write one table, and `LeaderBoard` and
+`GameStats` each write two. The pipelines have default table names that
+you can override when you start up the pipeline if those tables already exist.
+
+Depending on the windowing intervals defined in a given pipeline, you may have
+to wait for a while (more than an hour) before you start to see results written
+to the BigQuery tables.
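
The abuse detection that the README describes for GameStats (derive the mean user score sum for a window, then flag users well above it) could look roughly like the plain-Java sketch below. The class name, method name, and the multiplier are hypothetical; the README does not state how far above the mean a user must be to count as a likely robot.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Plain-Java sketch of mean-based outlier flagging; not Dataflow SDK code. */
final class RobotSketch {

  /**
   * Returns users whose score sum exceeds mean * weight.
   * The weight is an assumed tuning knob, not a value taken from the README.
   */
  static Set<String> likelyRobots(Map<String, Integer> userScores, double weight) {
    if (userScores.isEmpty()) {
      return Collections.emptySet();
    }
    long total = 0;
    for (int score : userScores.values()) {
      total += score;
    }
    double mean = (double) total / userScores.size();
    Set<String> robots = new TreeSet<>();
    for (Map.Entry<String, Integer> entry : userScores.entrySet()) {
      if (entry.getValue() > mean * weight) {
        robots.add(entry.getKey());
      }
    }
    return robots;
  }

  public static void main(String[] args) {
    Map<String, Integer> scores = new HashMap<>();
    scores.put("alice", 10);
    scores.put("bob", 12);
    scores.put("robot", 100);
    System.out.println("likely robots: " + likelyRobots(scores, 2.0));
  }
}
```

The flagged users would then be excluded before team scores are summed, as the README describes.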

