From: davor@apache.org
To: commits@beam.incubator.apache.org
Date: Thu, 14 Apr 2016 04:48:27 -0000
Message-Id: <609d950186294f36b8b1baa4fc77e51d@git.apache.org>
Subject: [40/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
deleted file mode 100644
index 18ff0a3..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.complete.game.injector; - -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.common.collect.ImmutableMap; - -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; - -import java.io.BufferedOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Random; -import java.util.TimeZone; - - -/** - * This is a generator that simulates usage data from a mobile game, and either publishes the data - * to a pubsub topic or writes it to a file. - * - *

The general model used by the generator is the following. There is a set of teams with team - * members. Each member is scoring points for their team. After some period, a team will dissolve - * and a new one will be created in its place. There is also a set of 'Robots', or spammer users. - * They hop from team to team. The robots are set to have a higher 'click rate' (generate more - * events) than the regular team members. - * - *
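
As a rough, self-contained sketch of that click-rate skew (the team size, seed, and event count here are illustrative assumptions; the real selection logic lives in generateEvent() further down):

import java.util.Random;

public class ClickRateSketch {
  public static void main(String[] args) {
    Random random = new Random(42);
    int teamSize = 10;      // assumed team size, for illustration only
    int robotEvents = 0;
    int total = 100_000;
    for (int i = 0; i < total; i++) {
      // Mirrors the robot-selection rule below: the robot is chosen with
      // probability ~2/teamSize, versus ~1/teamSize for a single regular member.
      if (random.nextInt(teamSize / 2) == 0) {
        robotEvents++;
      }
    }
    System.out.printf("robot share of events: %.1f%%%n", 100.0 * robotEvents / total);
  }
}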

Each generated line of data has the following form: - * username,teamname,score,timestamp_in_ms,readable_time - * e.g.: - * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224 - * - *
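
A minimal standalone sketch of producing and re-parsing one such line (the field values are invented; the real formatter and event assembly appear in this class below):

import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class EventLineSketch {
  private static final DateTimeFormatter FMT =
      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");

  public static void main(String[] args) {
    long nowMs = System.currentTimeMillis();
    // username,teamname,score,timestamp_in_ms,readable_time
    String line = String.join(",",
        "user2_AsparagusPig", "AsparagusPig", "10",
        Long.toString(nowMs), FMT.print(new Instant(nowMs)));
    System.out.println(line);

    // The readable timestamp contains no commas, so a plain split is safe here.
    String[] fields = line.split(",");
    System.out.println("user=" + fields[0] + ", score=" + fields[2]);
  }
}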

The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if - * specified. It takes the following arguments: - * {@code Injector project-name (topic-name|none) (filename|none)}. - * - *

To run the Injector in the mode where it publishes to PubSub, you will need to authenticate - * locally using project-based service account credentials to avoid running over PubSub - * quota. - * See https://developers.google.com/identity/protocols/application-default-credentials - * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS - * environment variable to point to your downloaded service account credentials before starting the - * program, e.g.: - * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}. - * If you do not do this, then your injector will only run for a few minutes on your - * 'user account' credentials before you will start to see quota error messages like: - * "Request throttled due to user QPS limit being reached", and see this exception: - * "com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests". - * Once you've set up your credentials, run the Injector like this: - *

- * <pre>{@code
- * Injector <project-name> <topic-name> none
- * }
- * </pre>
- * The pubsub topic will be created if it does not exist. - * - *
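
To check which credentials Application Default Credentials will actually resolve to, a small diagnostic along these lines can help (a sketch using the same google-api-client library this example already depends on; not part of the original example):

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import java.io.IOException;

public class CredentialCheck {
  public static void main(String[] args) throws IOException {
    // Resolves GOOGLE_APPLICATION_CREDENTIALS, then gcloud defaults, then GCE metadata.
    GoogleCredential credential = GoogleCredential.getApplicationDefault();
    // Service-account credentials report an id; plain user credentials return null here,
    // which is the situation InjectorUtils.getClient() below warns about and exits on.
    System.out.println("service account: " + credential.getServiceAccountId());
  }
}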

To run the injector in write-to-file-mode, set the topic name to "none" and specify the - * filename: - *

- * <pre>{@code
- * Injector <project-name> none <filename>
- * }
- * </pre>
- */ -class Injector { - private static Pubsub pubsub; - private static Random random = new Random(); - private static String topic; - private static String project; - private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms"; - - // QPS ranges from 800 to 1000. - private static final int MIN_QPS = 800; - private static final int QPS_RANGE = 200; - // How long to sleep, in ms, between creation of the threads that make API requests to PubSub. - private static final int THREAD_SLEEP_MS = 500; - - // Lists used to generate random team names. - private static final ArrayList COLORS = - new ArrayList(Arrays.asList( - "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber", - "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen", - "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana", - "Beige", "Bisque", "BarnRed", "BattleshipGrey")); - - private static final ArrayList ANIMALS = - new ArrayList(Arrays.asList( - "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu", - "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus", - "Bandicoot", "Cockatoo", "Antechinus")); - - // The list of live teams. - private static ArrayList liveTeams = new ArrayList(); - - private static DateTimeFormatter fmt = - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") - .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); - - - // The total number of robots in the system. - private static final int NUM_ROBOTS = 20; - // Determines the chance that a team will have a robot team member. - private static final int ROBOT_PROBABILITY = 3; - private static final int NUM_LIVE_TEAMS = 15; - private static final int BASE_MEMBERS_PER_TEAM = 5; - private static final int MEMBERS_PER_TEAM = 15; - private static final int MAX_SCORE = 20; - private static final int LATE_DATA_RATE = 5 * 60 * 2; // Every 10 minutes - private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000; // 5-10 minute delay - private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000; - - // The minimum time a 'team' can live. - private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20; - private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20; - - - /** - * A class for holding team info: the name of the team, when it started, - * and the current team members. Teams may but need not include one robot team member. - */ - private static class TeamInfo { - String teamName; - long startTimeInMillis; - int expirationPeriod; - // The team might but need not include 1 robot. Will be non-null if so. - String robot; - int numMembers; - - private TeamInfo(String teamName, long startTimeInMillis, String robot) { - this.teamName = teamName; - this.startTimeInMillis = startTimeInMillis; - // How long until this team is dissolved. - this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) + - BASE_TEAM_EXPIRATION_TIME_IN_MINS; - this.robot = robot; - // Determine the number of team members. 
- numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM; - } - - String getTeamName() { - return teamName; - } - String getRobot() { - return robot; - } - - long getStartTimeInMillis() { - return startTimeInMillis; - } - long getEndTimeInMillis() { - return startTimeInMillis + (expirationPeriod * 60 * 1000); - } - String getRandomUser() { - int userNum = random.nextInt(numMembers); - return "user" + userNum + "_" + teamName; - } - - int numMembers() { - return numMembers; - } - - @Override - public String toString() { - return "(" + teamName + ", num members: " + numMembers() + ", starting at: " - + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")"; - } - } - - /** Utility to grab a random element from an array of Strings. */ - private static String randomElement(ArrayList list) { - int index = random.nextInt(list.size()); - return list.get(index); - } - - /** - * Get and return a random team. If the selected team is too old w.r.t its expiration, remove - * it, replacing it with a new team. - */ - private static TeamInfo randomTeam(ArrayList list) { - int index = random.nextInt(list.size()); - TeamInfo team = list.get(index); - // If the selected team is expired, remove it and return a new team. - long currTime = System.currentTimeMillis(); - if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) { - System.out.println("\nteam " + team + " is too old; replacing."); - System.out.println("start time: " + team.getStartTimeInMillis() + - ", end time: " + team.getEndTimeInMillis() + - ", current time:" + currTime); - removeTeam(index); - // Add a new team in its stead. - return (addLiveTeam()); - } else { - return team; - } - } - - /** - * Create and add a team. Possibly add a robot to the team. - */ - private static synchronized TeamInfo addLiveTeam() { - String teamName = randomElement(COLORS) + randomElement(ANIMALS); - String robot = null; - // Decide if we want to add a robot to the team. - if (random.nextInt(ROBOT_PROBABILITY) == 0) { - robot = "Robot-" + random.nextInt(NUM_ROBOTS); - } - // Create the new team. - TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot); - liveTeams.add(newTeam); - System.out.println("[+" + newTeam + "]"); - return newTeam; - } - - /** - * Remove a specific team. - */ - private static synchronized void removeTeam(int teamIndex) { - TeamInfo removedTeam = liveTeams.remove(teamIndex); - System.out.println("[-" + removedTeam + "]"); - } - - /** Generate a user gaming event. */ - private static String generateEvent(Long currTime, int delayInMillis) { - TeamInfo team = randomTeam(liveTeams); - String teamName = team.getTeamName(); - String user; - final int parseErrorRate = 900000; - - String robot = team.getRobot(); - // If the team has an associated robot team member... - if (robot != null) { - // Then use that robot for the message with some probability. - // Set this probability to higher than that used to select any of the 'regular' team - // members, so that if there is a robot on the team, it has a higher click rate. - if (random.nextInt(team.numMembers() / 2) == 0) { - user = robot; - } else { - user = team.getRandomUser(); - } - } else { // No robot. - user = team.getRandomUser(); - } - String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE); - // Randomly introduce occasional parse errors. You can see a custom counter tracking the number - // of such errors in the Dataflow Monitoring UI, as the example pipeline runs. 
- if (random.nextInt(parseErrorRate) == 0) { - System.out.println("Introducing a parse error."); - event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR"; - } - return addTimeInfoToEvent(event, currTime, delayInMillis); - } - - /** - * Add time info to a generated gaming event. - */ - private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) { - String eventTimeString = - Long.toString((currTime - delayInMillis) / 1000 * 1000); - // Add a (redundant) 'human-readable' date string to make the data semantics more clear. - String dateString = fmt.print(currTime); - message = message + "," + eventTimeString + "," + dateString; - return message; - } - - /** - * Publish 'numMessages' arbitrary events from live users with the provided delay, to a - * PubSub topic. - */ - public static void publishData(int numMessages, int delayInMillis) - throws IOException { - List pubsubMessages = new ArrayList<>(); - - for (int i = 0; i < Math.max(1, numMessages); i++) { - Long currTime = System.currentTimeMillis(); - String message = generateEvent(currTime, delayInMillis); - PubsubMessage pubsubMessage = new PubsubMessage() - .encodeData(message.getBytes("UTF-8")); - pubsubMessage.setAttributes( - ImmutableMap.of(TIMESTAMP_ATTRIBUTE, - Long.toString((currTime - delayInMillis) / 1000 * 1000))); - if (delayInMillis != 0) { - System.out.println(pubsubMessage.getAttributes()); - System.out.println("late data for: " + message); - } - pubsubMessages.add(pubsubMessage); - } - - PublishRequest publishRequest = new PublishRequest(); - publishRequest.setMessages(pubsubMessages); - pubsub.projects().topics().publish(topic, publishRequest).execute(); - } - - /** - * Publish generated events to a file. - */ - public static void publishDataToFile(String fileName, int numMessages, int delayInMillis) - throws IOException { - PrintWriter out = new PrintWriter(new OutputStreamWriter( - new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8")); - - try { - for (int i = 0; i < Math.max(1, numMessages); i++) { - Long currTime = System.currentTimeMillis(); - String message = generateEvent(currTime, delayInMillis); - out.println(message); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (out != null) { - out.flush(); - out.close(); - } - } - } - - - public static void main(String[] args) throws IOException, InterruptedException { - if (args.length < 3) { - System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)"); - System.exit(1); - } - boolean writeToFile = false; - boolean writeToPubsub = true; - project = args[0]; - String topicName = args[1]; - String fileName = args[2]; - // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if - // specified; otherwise, it will try to write to a file. - if (topicName.equalsIgnoreCase("none")) { - writeToFile = true; - writeToPubsub = false; - } - if (writeToPubsub) { - // Create the PubSub client. - pubsub = InjectorUtils.getClient(); - // Create the PubSub topic as necessary. - topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName); - InjectorUtils.createTopic(pubsub, topic); - System.out.println("Injecting to topic: " + topic); - } else { - if (fileName.equalsIgnoreCase("none")) { - System.out.println("Filename not specified."); - System.exit(1); - } - System.out.println("Writing to file: " + fileName); - } - System.out.println("Starting Injector"); - - // Start off with some random live teams. 
- while (liveTeams.size() < NUM_LIVE_TEAMS) { - addLiveTeam(); - } - - // Publish messages at a rate determined by the QPS and Thread sleep settings. - for (int i = 0; true; i++) { - if (Thread.activeCount() > 10) { - System.err.println("I'm falling behind!"); - } - - // Decide if this should be a batch of late data. - final int numMessages; - final int delayInMillis; - if (i % LATE_DATA_RATE == 0) { - // Insert delayed data for one user (one message only) - delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS); - numMessages = 1; - System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")"); - } else { - System.out.print("."); - delayInMillis = 0; - numMessages = MIN_QPS + random.nextInt(QPS_RANGE); - } - - if (writeToFile) { // Won't use threading for the file write. - publishDataToFile(fileName, numMessages, delayInMillis); - } else { // Write to PubSub. - // Start a thread to inject some data. - new Thread(){ - @Override - public void run() { - try { - publishData(numMessages, delayInMillis); - } catch (IOException e) { - System.err.println(e); - } - } - }.start(); - } - - // Wait before creating another injector thread. - Thread.sleep(THREAD_SLEEP_MS); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java deleted file mode 100644 index db58650..0000000 --- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.complete.game.injector; - - -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.util.Utils; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpStatusCodes; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.PubsubScopes; -import com.google.api.services.pubsub.model.Topic; - -import com.google.common.base.Preconditions; - -import java.io.IOException; - -class InjectorUtils { - - private static final String APP_NAME = "injector"; - - /** - * Builds a new Pubsub client and returns it. 
- */ - public static Pubsub getClient(final HttpTransport httpTransport, - final JsonFactory jsonFactory) - throws IOException { - Preconditions.checkNotNull(httpTransport); - Preconditions.checkNotNull(jsonFactory); - GoogleCredential credential = - GoogleCredential.getApplicationDefault(httpTransport, jsonFactory); - if (credential.createScopedRequired()) { - credential = credential.createScoped(PubsubScopes.all()); - } - if (credential.getClientAuthentication() != null) { - System.out.println("\n***Warning! You are not using service account credentials to " - + "authenticate.\nYou need to use service account credentials for this example," - + "\nsince user-level credentials do not have enough pubsub quota,\nand so you will run " - + "out of PubSub quota very quickly.\nSee " - + "https://developers.google.com/identity/protocols/application-default-credentials."); - System.exit(1); - } - HttpRequestInitializer initializer = - new RetryHttpInitializerWrapper(credential); - return new Pubsub.Builder(httpTransport, jsonFactory, initializer) - .setApplicationName(APP_NAME) - .build(); - } - - /** - * Builds a new Pubsub client with default HttpTransport and - * JsonFactory and returns it. - */ - public static Pubsub getClient() throws IOException { - return getClient(Utils.getDefaultTransport(), - Utils.getDefaultJsonFactory()); - } - - - /** - * Returns the fully qualified topic name for Pub/Sub. - */ - public static String getFullyQualifiedTopicName( - final String project, final String topic) { - return String.format("projects/%s/topics/%s", project, topic); - } - - /** - * Create a topic if it doesn't exist. - */ - public static void createTopic(Pubsub client, String fullTopicName) - throws IOException { - try { - client.projects().topics().get(fullTopicName).execute(); - } catch (GoogleJsonResponseException e) { - if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { - Topic topic = client.projects().topics() - .create(fullTopicName, new Topic()) - .execute(); - System.out.printf("Topic %s was created.\n", topic.getName()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java deleted file mode 100644 index 9d58837..0000000 --- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.complete.game.injector; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.http.HttpBackOffIOExceptionHandler; -import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpUnsuccessfulResponseHandler; -import com.google.api.client.util.ExponentialBackOff; -import com.google.api.client.util.Sleeper; -import com.google.common.base.Preconditions; - -import java.io.IOException; -import java.util.logging.Logger; - -/** - * RetryHttpInitializerWrapper will automatically retry upon RPC - * failures, preserving the auto-refresh behavior of the Google - * Credentials. - */ -public class RetryHttpInitializerWrapper implements HttpRequestInitializer { - - /** - * A private logger. - */ - private static final Logger LOG = - Logger.getLogger(RetryHttpInitializerWrapper.class.getName()); - - /** - * One minutes in miliseconds. - */ - private static final int ONEMINITUES = 60000; - - /** - * Intercepts the request for filling in the "Authorization" - * header field, as well as recovering from certain unsuccessful - * error codes wherein the Credential must refresh its token for a - * retry. - */ - private final Credential wrappedCredential; - - /** - * A sleeper; you can replace it with a mock in your test. - */ - private final Sleeper sleeper; - - /** - * A constructor. - * - * @param wrappedCredential Credential which will be wrapped and - * used for providing auth header. - */ - public RetryHttpInitializerWrapper(final Credential wrappedCredential) { - this(wrappedCredential, Sleeper.DEFAULT); - } - - /** - * A protected constructor only for testing. - * - * @param wrappedCredential Credential which will be wrapped and - * used for providing auth header. - * @param sleeper Sleeper for easy testing. - */ - RetryHttpInitializerWrapper( - final Credential wrappedCredential, final Sleeper sleeper) { - this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential); - this.sleeper = sleeper; - } - - /** - * Initializes the given request. - */ - @Override - public final void initialize(final HttpRequest request) { - request.setReadTimeout(2 * ONEMINITUES); // 2 minutes read timeout - final HttpUnsuccessfulResponseHandler backoffHandler = - new HttpBackOffUnsuccessfulResponseHandler( - new ExponentialBackOff()) - .setSleeper(sleeper); - request.setInterceptor(wrappedCredential); - request.setUnsuccessfulResponseHandler( - new HttpUnsuccessfulResponseHandler() { - @Override - public boolean handleResponse( - final HttpRequest request, - final HttpResponse response, - final boolean supportsRetry) throws IOException { - if (wrappedCredential.handleResponse( - request, response, supportsRetry)) { - // If credential decides it can handle it, - // the return code or message indicated - // something specific to authentication, - // and no backoff is desired. - return true; - } else if (backoffHandler.handleResponse( - request, response, supportsRetry)) { - // Otherwise, we defer to the judgement of - // our internal backoff handler. 
- LOG.info("Retrying " - + request.getUrl().toString()); - return true; - } else { - return false; - } - } - }); - request.setIOExceptionHandler( - new HttpBackOffIOExceptionHandler(new ExponentialBackOff()) - .setSleeper(sleeper)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java deleted file mode 100644 index 4bcfb72..0000000 --- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java +++ /dev/null @@ -1,135 +0,0 @@ - /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.complete.game.utils; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.dataflow.examples.complete.game.UserScore; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition; -import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; -import com.google.cloud.dataflow.sdk.options.GcpOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PDone; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Generate, format, and write BigQuery table row information. Use provided information about - * the field names and types, as well as lambda functions that describe how to generate their - * values. - */ -public class WriteToBigQuery - extends PTransform, PDone> { - - protected String tableName; - protected Map> fieldInfo; - - public WriteToBigQuery() { - } - - public WriteToBigQuery(String tableName, - Map> fieldInfo) { - this.tableName = tableName; - this.fieldInfo = fieldInfo; - } - - /** Define a class to hold information about output table field definitions. 
*/ - public static class FieldInfo implements Serializable { - // The BigQuery 'type' of the field - private String fieldType; - // A lambda function to generate the field value - private SerializableFunction.ProcessContext, Object> fieldFn; - - public FieldInfo(String fieldType, - SerializableFunction.ProcessContext, Object> fieldFn) { - this.fieldType = fieldType; - this.fieldFn = fieldFn; - } - - String getFieldType() { - return this.fieldType; - } - - SerializableFunction.ProcessContext, Object> getFieldFn() { - return this.fieldFn; - } - } - /** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */ - protected class BuildRowFn extends DoFn { - - @Override - public void processElement(ProcessContext c) { - - TableRow row = new TableRow(); - for (Map.Entry> entry : fieldInfo.entrySet()) { - String key = entry.getKey(); - FieldInfo fcnInfo = entry.getValue(); - SerializableFunction.ProcessContext, Object> fcn = - fcnInfo.getFieldFn(); - row.set(key, fcn.apply(c)); - } - c.output(row); - } - } - - /** Build the output table schema. */ - protected TableSchema getSchema() { - List fields = new ArrayList<>(); - for (Map.Entry> entry : fieldInfo.entrySet()) { - String key = entry.getKey(); - FieldInfo fcnInfo = entry.getValue(); - String bqType = fcnInfo.getFieldType(); - fields.add(new TableFieldSchema().setName(key).setType(bqType)); - } - return new TableSchema().setFields(fields); - } - - @Override - public PDone apply(PCollection teamAndScore) { - return teamAndScore - .apply(ParDo.named("ConvertToRow").of(new BuildRowFn())) - .apply(BigQueryIO.Write - .to(getTable(teamAndScore.getPipeline(), - tableName)) - .withSchema(getSchema()) - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(WriteDisposition.WRITE_APPEND)); - } - - /** Utility to construct an output table reference. */ - static TableReference getTable(Pipeline pipeline, String tableName) { - PipelineOptions options = pipeline.getOptions(); - TableReference table = new TableReference(); - table.setDatasetId(options.as(UserScore.Options.class).getDataset()); - table.setProjectId(options.as(GcpOptions.class).getProject()); - table.setTableId(tableName); - return table; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java deleted file mode 100644 index 41257ca..0000000 --- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ /dev/null @@ -1,77 +0,0 @@ - /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.complete.game.utils; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition; -import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PDone; - -import java.util.Map; - -/** - * Generate, format, and write BigQuery table row information. Subclasses {@link WriteToBigQuery} - * to require windowing; so this subclass may be used for writes that require access to the - * context's window information. - */ -public class WriteWindowedToBigQuery - extends WriteToBigQuery { - - public WriteWindowedToBigQuery(String tableName, - Map> fieldInfo) { - super(tableName, fieldInfo); - } - - /** Convert each key/score pair into a BigQuery TableRow. */ - protected class BuildRowFn extends DoFn - implements RequiresWindowAccess { - - @Override - public void processElement(ProcessContext c) { - - TableRow row = new TableRow(); - for (Map.Entry> entry : fieldInfo.entrySet()) { - String key = entry.getKey(); - FieldInfo fcnInfo = entry.getValue(); - SerializableFunction.ProcessContext, Object> fcn = - fcnInfo.getFieldFn(); - row.set(key, fcn.apply(c)); - } - c.output(row); - } - } - - @Override - public PDone apply(PCollection teamAndScore) { - return teamAndScore - .apply(ParDo.named("ConvertToRow").of(new BuildRowFn())) - .apply(BigQueryIO.Write - .to(getTable(teamAndScore.getPipeline(), - tableName)) - .withSchema(getSchema()) - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(WriteDisposition.WRITE_APPEND)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java new file mode 100644 index 0000000..8705ed3 --- /dev/null +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Filter; +import com.google.cloud.dataflow.sdk.transforms.FlatMapElements; +import com.google.cloud.dataflow.sdk.transforms.MapElements; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import java.util.Arrays; + +/** + * An example that counts words in Shakespeare, using Java 8 language features. + * + *

See {@link MinimalWordCount} for a comprehensive explanation. + */ +public class MinimalWordCountJava8 { + + public static void main(String[] args) { + DataflowPipelineOptions options = PipelineOptionsFactory.create() + .as(DataflowPipelineOptions.class); + + options.setRunner(BlockingDataflowPipelineRunner.class); + + // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud. + options.setProject("SET_YOUR_PROJECT_ID_HERE"); + + // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files. + options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY"); + + Pipeline p = Pipeline.create(options); + + p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) + .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) + .withOutputType(new TypeDescriptor() {})) + .apply(Filter.byPredicate((String word) -> !word.isEmpty())) + .apply(Count.perElement()) + .apply(MapElements + .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) + .withOutputType(new TypeDescriptor() {})) + + // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to. + .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java new file mode 100644 index 0000000..89832b2 --- /dev/null +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.examples.complete.game; + +import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; +import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.io.PubsubIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess; +import com.google.cloud.dataflow.sdk.transforms.MapElements; +import com.google.cloud.dataflow.sdk.transforms.Mean; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; +import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +/** + * This class is the fourth in a series of four pipelines that tell a story in a 'gaming' + * domain, following {@link UserScore}, {@link HourlyTeamScore}, and {@link LeaderBoard}. + * New concepts: session windows and finding session duration; use of both + * singleton and non-singleton side inputs. + * + *

This pipeline builds on the {@link LeaderBoard} functionality, and adds some "business + * intelligence" analysis: abuse detection and usage patterns. The pipeline derives the Mean user + * score sum for a window, and uses that information to identify likely spammers/robots. (The robots + * have a higher click rate than the human users). The 'robot' users are then filtered out when + * calculating the team scores. + * + *
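
The filtering rule itself is simple arithmetic; a standalone sketch of it (the scores and weight are illustrative; the pipeline version with side inputs is in CalculateSpammyUsers below):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SpamRuleSketch {
  public static void main(String[] args) {
    List<Integer> userScoreSums = Arrays.asList(12, 9, 11, 10, 85); // one robot-like outlier
    double mean = userScoreSums.stream().mapToInt(Integer::intValue).average().orElse(0);
    double scoreWeight = 2.5; // mirrors SCORE_WEIGHT below
    List<Integer> flagged = userScoreSums.stream()
        .filter(s -> s > mean * scoreWeight)
        .collect(Collectors.toList());
    System.out.println("mean=" + mean + ", flagged=" + flagged); // mean=25.4, flagged=[85]
  }
}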

Additionally, user sessions are tracked: that is, we find bursts of user activity using + * session windows. Then, the mean session duration information is recorded in the context of + * subsequent fixed windowing. (This could be used to tell us what games are giving us greater + * user retention). + * + *
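
For intuition, session assignment is just a gap rule over a user's sorted event times. A plain-Java sketch (the timestamps and 5-minute gap are illustrative; note Beam's Sessions also extends each window by the gap, which this sketch omits):

import java.util.Arrays;
import java.util.List;

public class SessionSketch {
  public static void main(String[] args) {
    long gapMs = 5 * 60 * 1000L;
    // Sorted event timestamps (ms) for one user; the third event starts a new session.
    List<Long> ts = Arrays.asList(0L, 60_000L, 1_200_000L, 1_260_000L);
    long sessionStart = ts.get(0);
    long prev = ts.get(0);
    for (long t : ts.subList(1, ts.size())) {
      if (t - prev > gapMs) { // gap exceeded: close the current session
        System.out.println("session minutes: " + (prev - sessionStart) / 60_000);
        sessionStart = t;
      }
      prev = t;
    }
    System.out.println("session minutes: " + (prev - sessionStart) / 60_000);
  }
}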

Run {@code com.google.cloud.dataflow.examples.complete.game.injector.Injector} to generate + * pubsub data for this pipeline. The {@code Injector} documentation provides more detail. + * + *
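
The injector stamps each message with a timestamp_ms attribute, which this pipeline reads back via timestampLabel(TIMESTAMP_ATTRIBUTE). A minimal sketch of that round-trip using the same Pub/Sub model classes (message construction only, no publish call):

import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;

public class TimestampAttributeSketch {
  public static void main(String[] args) throws IOException {
    long eventTimeMs = System.currentTimeMillis() - 60_000L; // pretend the event is a minute old
    PubsubMessage message = new PubsubMessage()
        .encodeData("user0_RubyDingo,RubyDingo,7".getBytes("UTF-8"));
    message.setAttributes(ImmutableMap.of("timestamp_ms", Long.toString(eventTimeMs)));
    // PubsubIO.Read.timestampLabel("timestamp_ms") recovers this as the element timestamp.
    long recovered = Long.parseLong(message.getAttributes().get("timestamp_ms"));
    System.out.println("event time: " + recovered);
  }
}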

To execute this pipeline using the Dataflow service, specify the pipeline configuration + * like this: + *

+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist. The PubSub topic you specify should + * be the same topic to which the Injector is publishing. + */ +public class GameStats extends LeaderBoard { + + private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms"; + + private static DateTimeFormatter fmt = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); + + /** + * Filter out all but those users with a high clickrate, which we will consider as 'spammy' uesrs. + * We do this by finding the mean total score per user, then using that information as a side + * input to filter out all but those user scores that are > (mean * SCORE_WEIGHT) + */ + // [START DocInclude_AbuseDetect] + public static class CalculateSpammyUsers + extends PTransform>, PCollection>> { + private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class); + private static final double SCORE_WEIGHT = 2.5; + + @Override + public PCollection> apply(PCollection> userScores) { + + // Get the sum of scores for each user. + PCollection> sumScores = userScores + .apply("UserSum", Sum.integersPerKey()); + + // Extract the score from each element, and use it to find the global mean. + final PCollectionView globalMeanScore = sumScores.apply(Values.create()) + .apply(Mean.globally().asSingletonView()); + + // Filter the user sums using the global mean. + PCollection> filtered = sumScores + .apply(ParDo + .named("ProcessAndFilter") + // use the derived mean total score as a side input + .withSideInputs(globalMeanScore) + .of(new DoFn, KV>() { + private final Aggregator numSpammerUsers = + createAggregator("SpammerUsers", new Sum.SumLongFn()); + @Override + public void processElement(ProcessContext c) { + Integer score = c.element().getValue(); + Double gmc = c.sideInput(globalMeanScore); + if (score > (gmc * SCORE_WEIGHT)) { + LOG.info("user " + c.element().getKey() + " spammer score " + score + + " with mean " + gmc); + numSpammerUsers.addValue(1L); + c.output(c.element()); + } + } + })); + return filtered; + } + } + // [END DocInclude_AbuseDetect] + + /** + * Calculate and output an element's session duration. + */ + private static class UserSessionInfoFn extends DoFn, Integer> + implements RequiresWindowAccess { + + @Override + public void processElement(ProcessContext c) { + IntervalWindow w = (IntervalWindow) c.window(); + int duration = new Duration( + w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes(); + c.output(duration); + } + } + + + /** + * Options supported by {@link GameStats}. + */ + static interface Options extends LeaderBoard.Options { + @Description("Numeric value of fixed window duration for user analysis, in minutes") + @Default.Integer(60) + Integer getFixedWindowDuration(); + void setFixedWindowDuration(Integer value); + + @Description("Numeric value of gap between user sessions, in minutes") + @Default.Integer(5) + Integer getSessionGap(); + void setSessionGap(Integer value); + + @Description("Numeric value of fixed window for finding mean of user session duration, " + + "in minutes") + @Default.Integer(30) + Integer getUserActivityWindowDuration(); + void setUserActivityWindowDuration(Integer value); + + @Description("Prefix used for the BigQuery table names") + @Default.String("game_stats") + String getTablePrefix(); + void setTablePrefix(String value); + } + + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. 
This map + * is used to write information about team score sums. + */ + protected static Map>> + configureWindowedWrite() { + Map>> tableConfigure = + new HashMap>>(); + tableConfigure.put("team", + new WriteWindowedToBigQuery.FieldInfo>("STRING", + c -> c.element().getKey())); + tableConfigure.put("total_score", + new WriteWindowedToBigQuery.FieldInfo>("INTEGER", + c -> c.element().getValue())); + tableConfigure.put("window_start", + new WriteWindowedToBigQuery.FieldInfo>("STRING", + c -> { IntervalWindow w = (IntervalWindow) c.window(); + return fmt.print(w.start()); })); + tableConfigure.put("processing_time", + new WriteWindowedToBigQuery.FieldInfo>( + "STRING", c -> fmt.print(Instant.now()))); + return tableConfigure; + } + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. This map + * is used to write information about mean user session time. + */ + protected static Map> + configureSessionWindowWrite() { + + Map> tableConfigure = + new HashMap>(); + tableConfigure.put("window_start", + new WriteWindowedToBigQuery.FieldInfo("STRING", + c -> { IntervalWindow w = (IntervalWindow) c.window(); + return fmt.print(w.start()); })); + tableConfigure.put("mean_duration", + new WriteWindowedToBigQuery.FieldInfo("FLOAT", c -> c.element())); + return tableConfigure; + } + + + + public static void main(String[] args) throws Exception { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + // Enforce that this pipeline is always run in streaming mode. + options.setStreaming(true); + // Allow the pipeline to be cancelled automatically. + options.setRunner(DataflowPipelineRunner.class); + DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + Pipeline pipeline = Pipeline.create(options); + + // Read Events from Pub/Sub using custom timestamps + PCollection rawEvents = pipeline + .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())) + .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())); + + // Extract username/score pairs from the event stream + PCollection> userEvents = + rawEvents.apply("ExtractUserScore", + MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) + .withOutputType(new TypeDescriptor>() {})); + + // Calculate the total score per user over fixed windows, and + // cumulative updates for late data. + final PCollectionView> spammersView = userEvents + .apply(Window.named("FixedWindowsUser") + .>into(FixedWindows.of( + Duration.standardMinutes(options.getFixedWindowDuration()))) + ) + + // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate. + // These might be robots/spammers. + .apply("CalculateSpammyUsers", new CalculateSpammyUsers()) + // Derive a view from the collection of spammer users. It will be used as a side input + // in calculating the team score sums, below. + .apply("CreateSpammersView", View.asMap()); + + // [START DocInclude_FilterAndCalc] + // Calculate the total score per team over fixed windows, + // and emit cumulative updates for late data. Uses the side input derived above-- the set of + // suspected robots-- to filter out scores from those users from the sum. + // Write the results to BigQuery. + rawEvents + .apply(Window.named("WindowIntoFixedWindows") + .into(FixedWindows.of( + Duration.standardMinutes(options.getFixedWindowDuration()))) + ) + // Filter out the detected spammer users, using the side input derived above. 
+ .apply(ParDo.named("FilterOutSpammers") + .withSideInputs(spammersView) + .of(new DoFn() { + @Override + public void processElement(ProcessContext c) { + // If the user is not in the spammers Map, output the data element. + if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) { + c.output(c.element()); + } + } + })) + // Extract and sum teamname/score pairs from the event data. + .apply("ExtractTeamScore", new ExtractAndSumScore("team")) + // [END DocInclude_FilterAndCalc] + // Write the result to BigQuery + .apply("WriteTeamSums", + new WriteWindowedToBigQuery>( + options.getTablePrefix() + "_team", configureWindowedWrite())); + + + // [START DocInclude_SessionCalc] + // Detect user sessions-- that is, a burst of activity separated by a gap from further + // activity. Find and record the mean session lengths. + // This information could help the game designers track the changing user engagement + // as their set of games changes. + userEvents + .apply(Window.named("WindowIntoSessions") + .>into( + Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap()))) + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) + // For this use, we care only about the existence of the session, not any particular + // information aggregated over it, so the following is an efficient way to do that. + .apply(Combine.perKey(x -> 0)) + // Get the duration per session. + .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn())) + // [END DocInclude_SessionCalc] + // [START DocInclude_Rewindow] + // Re-window to process groups of session sums according to when the sessions complete. + .apply(Window.named("WindowToExtractSessionMean") + .into( + FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration())))) + // Find the mean session duration in each window. + .apply(Mean.globally().withoutDefaults()) + // Write this info to a BigQuery table. + .apply("WriteAvgSessionLength", + new WriteWindowedToBigQuery( + options.getTablePrefix() + "_sessions", configureSessionWindowWrite())); + // [END DocInclude_Rewindow] + + + // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the + // command line. + PipelineResult result = pipeline.run(); + dataflowUtils.waitToFinish(result); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java new file mode 100644 index 0000000..58944c5 --- /dev/null +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.complete.game; + +import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.Filter; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.WithTimestamps; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; + +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +/** + * This class is the second in a series of four pipelines that tell a story in a 'gaming' + * domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore}, + * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}. + * + * <p>
This pipeline processes data collected from gaming events in batch, building on {@link + * UserScore} but using fixed windows. It calculates the sum of scores per team, for each window, + * optionally allowing specification of two timestamps before and after which data is filtered out. + * This allows a model where late data collected after the intended analysis window can be included, + * and any late-arriving data prior to the beginning of the analysis window can be removed as well. + * By using windowing and adding element timestamps, we can do finer-grained analysis than with the + * {@link UserScore} pipeline. However, our batch processing is high-latency, in that we don't get + * results from plays at the beginning of the batch's time period until the batch is processed. + * + * <p>
To execute this pipeline using the Dataflow service, specify the pipeline configuration + * like this: + * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist. + * + * <p>
Optionally include {@code --input} to specify the batch input file path. + * To indicate a time after which the data should be filtered out, include the + * {@code --stopMin} arg. E.g., {@code --stopMin=2015-10-18-23-59} indicates that any data + * timestamped after 23:59 PST on 2015-10-18 should not be included in the analysis. + * To indicate a time before which data should be filtered out, include the {@code --startMin} arg. + * If you're using the default input specified in {@link UserScore}, + * "gs://dataflow-samples/game/gaming_data*.csv", then + * {@code --startMin=2015-11-16-16-10 --stopMin=2015-11-17-16-10} are good values. + */ +public class HourlyTeamScore extends UserScore { + + private static DateTimeFormatter fmt = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); + private static DateTimeFormatter minFmt = + DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm") + .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); + + + /** + * Options supported by {@link HourlyTeamScore}. + */ + static interface Options extends UserScore.Options { + + @Description("Numeric value of fixed window duration, in minutes") + @Default.Integer(60) + Integer getWindowDuration(); + void setWindowDuration(Integer value); + + @Description("String representation of the first minute after which to generate results, " + + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST. " + + "Any input data timestamped prior to that minute won't be included in the sums.") + @Default.String("1970-01-01-00-00") + String getStartMin(); + void setStartMin(String value); + + @Description("String representation of the first minute for which to not generate results, " + + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST. " + + "Any input data timestamped after that minute won't be included in the sums.") + @Default.String("2100-01-01-00-00") + String getStopMin(); + void setStopMin(String value); + + @Description("The BigQuery table name. Should not already exist.") + @Default.String("hourly_team_score") + String getTableName(); + void setTableName(String value); + } + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. This map + * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and + * includes information about window start time. + */ + protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> + configureWindowedTableWrite() { + Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig = + new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>(); + tableConfig.put("team", + new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING", + c -> c.element().getKey())); + tableConfig.put("total_score", + new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", + c -> c.element().getValue())); + tableConfig.put("window_start", + new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING", + c -> { IntervalWindow w = (IntervalWindow) c.window(); + return fmt.print(w.start()); })); + return tableConfig; + } + + + /** + * Run a batch pipeline to do windowed analysis of the data. + */ + // [START DocInclude_HTSMain] + public static void main(String[] args) throws Exception { + // Begin constructing a pipeline configured by command-line flags.
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline pipeline = Pipeline.create(options); + + final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin())); + final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin())); + + // Read 'gaming' events from a text file. + pipeline.apply(TextIO.Read.from(options.getInput())) + // Parse the incoming data. + .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) + + // Filter out data before and after the given times so that it is not included + // in the calculations. As we collect data in batches (say, by day), the batch for the day + // that we want to analyze could potentially include some late-arriving data from the previous + // day. If so, we want to weed it out. Similarly, if we include data from the following day + // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events + // that fall after the time period we want to analyze. + // [START DocInclude_HTSFilters] + .apply("FilterStartTime", Filter.byPredicate( + (GameActionInfo gInfo) + -> gInfo.getTimestamp() > startMinTimestamp.getMillis())) + .apply("FilterEndTime", Filter.byPredicate( + (GameActionInfo gInfo) + -> gInfo.getTimestamp() < stopMinTimestamp.getMillis())) + // [END DocInclude_HTSFilters] + + // [START DocInclude_HTSAddTsAndWindow] + // Add an element timestamp based on the event log, and apply fixed windowing. + .apply("AddEventTimestamps", + WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp()))) + .apply(Window.named("FixedWindowsTeam") + .into(FixedWindows.of( + Duration.standardMinutes(options.getWindowDuration())))) + // [END DocInclude_HTSAddTsAndWindow] + + // Extract and sum teamname/score pairs from the event data. + .apply("ExtractTeamScore", new ExtractAndSumScore("team")) + .apply("WriteTeamScoreSums", + new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(), + configureWindowedTableWrite())); + + + pipeline.run(); + } + // [END DocInclude_HTSMain] + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java new file mode 100644 index 0000000..cedd696 --- /dev/null +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.google.cloud.dataflow.examples.complete.game; + +import com.google.cloud.dataflow.examples.common.DataflowExampleOptions; +import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; +import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery; +import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.io.PubsubIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +/** + * This class is the third in a series of four pipelines that tell a story in a 'gaming' domain, + * following {@link UserScore} and {@link HourlyTeamScore}. Concepts include: processing unbounded + * data using fixed windows; use of custom timestamps and event-time processing; generation of + * early/speculative results; using .accumulatingFiredPanes() to do cumulative processing of late- + * arriving data. + * + * <p>
This pipeline processes an unbounded stream of 'game events'. The calculation of the team + * scores uses fixed windowing based on event time (the time of the game play event), not + * processing time (the time that an event is processed by the pipeline). The pipeline calculates + * the sum of scores per team, for each window. By default, the team scores are calculated using + * one-hour windows. + * + * <p>
In contrast, to demo another windowing option, the user scores are calculated using a + * global window, which periodically (every ten minutes) emits cumulative user score sums. + * + * <p>
In contrast to the previous pipelines in the series, which used static, finite input data, + * here we're using an unbounded data source, which lets us provide speculative results, and allows + * handling of late data, at much lower latency. We can use the early/speculative results to keep a + * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct + * results, e.g. for 'team prizes'. We're now outputting window results as they're + * calculated, giving us much lower latency than with the previous batch examples. + * + * <p>
Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector + * documentation provides more detail on how to do this. + * + * <p>
To execute this pipeline using the Dataflow service, specify the pipeline configuration + * like this: + * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist. + * The PubSub topic you specify should be the same topic to which the Injector is publishing. + */ +public class LeaderBoard extends HourlyTeamScore { + + private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms"; + + private static DateTimeFormatter fmt = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); + static final Duration FIVE_MINUTES = Duration.standardMinutes(5); + static final Duration TEN_MINUTES = Duration.standardMinutes(10); + + + /** + * Options supported by {@link LeaderBoard}. + */ + static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions { + + @Description("Pub/Sub topic to read from") + @Validation.Required + String getTopic(); + void setTopic(String value); + + @Description("Numeric value of fixed window duration for team analysis, in minutes") + @Default.Integer(60) + Integer getTeamWindowDuration(); + void setTeamWindowDuration(Integer value); + + @Description("Numeric value of allowed data lateness, in minutes") + @Default.Integer(120) + Integer getAllowedLateness(); + void setAllowedLateness(Integer value); + + @Description("Prefix used for the BigQuery table names") + @Default.String("leaderboard") + String getTableName(); + void setTableName(String value); + } + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. This map + * is used to write team score sums and includes event timing information. + */ + protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> + configureWindowedTableWrite() { + + Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = + new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>(); + tableConfigure.put("team", + new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING", + c -> c.element().getKey())); + tableConfigure.put("total_score", + new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", + c -> c.element().getValue())); + tableConfigure.put("window_start", + new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING", + c -> { IntervalWindow w = (IntervalWindow) c.window(); + return fmt.print(w.start()); })); + tableConfigure.put("processing_time", + new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>( + "STRING", c -> fmt.print(Instant.now()))); + tableConfigure.put("timing", + new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>( + "STRING", c -> c.pane().getTiming().toString())); + return tableConfigure; + } + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. This map + * is used to write user score sums. + */ + protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> + configureGlobalWindowBigQueryWrite() { + + Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = + configureBigQueryWrite(); + tableConfigure.put("processing_time", + new WriteToBigQuery.FieldInfo<KV<String, Integer>>( + "STRING", c -> fmt.print(Instant.now()))); + return tableConfigure; + } + + + public static void main(String[] args) throws Exception { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + // Enforce that this pipeline is always run in streaming mode. + options.setStreaming(true); + // For example purposes, allow the pipeline to be easily cancelled instead of running + // continuously. + options.setRunner(DataflowPipelineRunner.class); + DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + Pipeline pipeline = Pipeline.create(options); + + // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub + // data elements, and parse the data.
+ PCollection<GameActionInfo> gameEvents = pipeline + .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())) + .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())); + + // [START DocInclude_WindowAndTrigger] + // Extract team/score pairs from the event stream, using hour-long windows by default. + gameEvents + .apply(Window.named("LeaderboardTeamFixedWindows") + .into(FixedWindows.of( + Duration.standardMinutes(options.getTeamWindowDuration()))) + // We will get early (speculative) results as well as cumulative + // processing of late data. + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(FIVE_MINUTES)) + .withLateFirings(AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(TEN_MINUTES))) + .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())) + .accumulatingFiredPanes()) + // Extract and sum teamname/score pairs from the event data. + .apply("ExtractTeamScore", new ExtractAndSumScore("team")) + // Write the results to BigQuery. + .apply("WriteTeamScoreSums", + new WriteWindowedToBigQuery<KV<String, Integer>>( + options.getTableName() + "_team", configureWindowedTableWrite())); + // [END DocInclude_WindowAndTrigger] + + // [START DocInclude_ProcTimeTrigger] + // Extract user/score pairs from the event stream using processing time, via global windowing. + // Get periodic updates on all users' running scores. + gameEvents + .apply(Window.named("LeaderboardUserGlobalWindow") + .into(new GlobalWindows()) + // Get periodic results every ten minutes. + .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(TEN_MINUTES))) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))) + // Extract and sum username/score pairs from the event data. + .apply("ExtractUserScore", new ExtractAndSumScore("user")) + // Write the results to BigQuery. + .apply("WriteUserScoreSums", + new WriteToBigQuery<KV<String, Integer>>( + options.getTableName() + "_user", configureGlobalWindowBigQueryWrite())); + // [END DocInclude_ProcTimeTrigger] + + // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the + // command line. + PipelineResult result = pipeline.run(); + dataflowUtils.waitToFinish(result); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md new file mode 100644 index 0000000..79b55ce --- /dev/null +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md @@ -0,0 +1,113 @@ + +# 'Gaming' examples + + +This directory holds a series of example Dataflow pipelines in a simple 'mobile +gaming' domain. They all require Java 8. Each pipeline successively introduces +new concepts, and gives some examples of using Java 8 syntax in constructing +Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts +that are used apply equally well in Java 7. + +In the gaming scenario, many users play, as members of different teams, over +the course of a day, and their actions are logged for processing. Some of the +logged game events may be late-arriving, if users play on mobile devices and go +transiently offline for a period.
+ +The scenario includes not only "regular" users, but "robot users", which have a +higher click rate than the regular users, and may move from team to team. + +The first two pipelines in the series use pre-generated batch data samples. The +second two pipelines read from a [PubSub](https://cloud.google.com/pubsub/) +topic input. For these examples, you will also need to run the +`injector.Injector` program, which generates and publishes the gaming data to +PubSub. The javadocs for each pipeline have more detailed information on how to +run that pipeline. + +All of these pipelines write their results to BigQuery table(s). + + +## The pipelines in the 'gaming' series + +### UserScore + +The first pipeline in the series is `UserScore`. This pipeline does batch +processing of data collected from gaming events. It calculates the sum of +scores per user, over an entire batch of gaming data (collected, say, for each +day). The batch processing will not include any late data that arrives after +the day's cutoff point. + +### HourlyTeamScore + +The next pipeline in the series is `HourlyTeamScore`. This pipeline also +processes data collected from gaming events in batch. It builds on `UserScore`, +but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by +default an hour in duration. It calculates the sum of scores per team, for each +window, optionally allowing specification of two timestamps before and after +which data is filtered out. This allows a model where late data collected after +the intended analysis window can be included in the analysis, and any late- +arriving data prior to the beginning of the analysis window can be removed as +well. + +By using windowing and adding element timestamps, we can do finer-grained +analysis than with the `UserScore` pipeline — we're now tracking scores for +each hour rather than over the course of a whole day. However, our batch +processing is high-latency, in that we don't get results from plays at the +beginning of the batch's time period until the complete batch is processed. + +### LeaderBoard + +The third pipeline in the series is `LeaderBoard`. This pipeline processes an +unbounded stream of 'game events' from a PubSub topic. The calculation of the +team scores uses fixed windowing based on event time (the time of the game play +event), not processing time (the time that an event is processed by the +pipeline). The pipeline calculates the sum of scores per team, for each window. +By default, the team scores are calculated using one-hour windows. + +In contrast, to demo another windowing option, the user scores are calculated +using a global window, which periodically (every ten minutes) emits cumulative +user score sums. + +In contrast to the previous pipelines in the series, which used static, finite +input data, here we're using an unbounded data source, which lets us provide +_speculative_ results, and allows handling of late data, at much lower latency. +E.g., we could use the early/speculative results to keep a 'leaderboard' +updated in near-realtime. Our handling of late data lets us generate correct +results, e.g. for 'team prizes'. We're now outputting window results as they're +calculated, giving us much lower latency than with the previous batch examples.
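
The trigger configuration is the heart of `LeaderBoard`. As a rough, condensed
sketch (adapted from the `LeaderBoard` code earlier in this commit, with the
configurable `--teamWindowDuration` and `--allowedLateness` options replaced by
literal durations, and assuming `gameEvents` is the
`PCollection<GameActionInfo>` parsed from the PubSub topic), the team-score
branch looks like this:

```java
// Imports as in LeaderBoard above: com.google.cloud.dataflow.sdk.transforms.windowing.*
// and org.joda.time.Duration.
gameEvents
    .apply(Window.named("LeaderboardTeamFixedWindows")
        .into(FixedWindows.of(Duration.standardHours(1)))
        // Fire speculative (early) results five minutes after the first element
        // in a pane, then fire again as late data arrives, until the allowed
        // lateness bound is reached.
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(5)))
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
        .withAllowedLateness(Duration.standardHours(2))
        // Each firing re-emits everything seen so far, so updates are cumulative.
        .accumulatingFiredPanes())
    // Sum the per-team scores within each window.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
```

Because the panes accumulate, each leaderboard update reflects all of the data
received so far for the window, not just the delta since the previous firing.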
+### GameStats + +The fourth pipeline in the series is `GameStats`. This pipeline builds +on the `LeaderBoard` functionality — supporting output of speculative and late +data — and adds some "business intelligence" analysis: abuse detection. The +pipeline derives the mean user score sum for a window, and uses that +information to identify likely spammers/robots. (The injector is designed so +that the "robots" have a higher click rate than the "real" users.) The robot +users are then filtered out when calculating the team scores. + +Additionally, user sessions are tracked: that is, we find bursts of user +activity using session windows. Then, the mean session duration information is +recorded in the context of subsequent fixed windowing. (This could be used to +tell us what games are giving us greater user retention.) A condensed sketch of +this session analysis appears at the end of this README. + +### Running the PubSub Injector + +The `LeaderBoard` and `GameStats` example pipelines read unbounded data +from a PubSub topic. + +Use the `injector.Injector` program to generate this data and publish it to a +PubSub topic. See the `Injector` javadocs for more information on how to run +the injector. Set up the injector before you start one of these pipelines. +Then, when you start the pipeline, pass as an argument the name of that PubSub +topic. See the pipeline javadocs for the details. + +## Viewing the results in BigQuery + +All of the pipelines write their results to BigQuery. `UserScore` and +`HourlyTeamScore` each write one table, and `LeaderBoard` and +`GameStats` each write two. The pipelines have default table names, which you +can override at pipeline start-up if tables with those names already exist. + +Depending on the windowing intervals defined in a given pipeline, you may have +to wait for a while (more than an hour) before you start to see results written +to the BigQuery tables.
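
Finally, as a companion to the `GameStats` description above, here is a rough,
condensed sketch of its session analysis (adapted from the `GameStats` code
earlier in this commit; `userEvents` is assumed to be a
`PCollection<KV<String, Integer>>` of user/score pairs, and literal durations
stand in for the configurable `--sessionGap` and `--userActivityWindowDuration`
options):

```java
userEvents
    // Group each user's events into sessions, closing a session after five
    // idle minutes, and stamp each session's output at its end time.
    .apply(Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(Sessions.withGapDuration(Duration.standardMinutes(5)))
        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
    // Only the existence of a session matters here, so collapse each user's
    // session to a single placeholder value.
    .apply(Combine.perKey(x -> 0))
    // Convert each session window into its duration (GameStats implements
    // this in its UserSessionInfoFn DoFn).
    .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
    // Re-window into fixed windows so mean session lengths are reported
    // periodically, then take the mean duration per window.
    .apply(Window.named("WindowToExtractSessionMean")
        .into(FixedWindows.of(Duration.standardMinutes(30))))
    .apply(Mean.globally().withoutDefaults());
```

The output is one mean session duration per fixed window, which `GameStats`
then writes to BigQuery alongside the window start time.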