beam-commits mailing list archives

From da...@apache.org
Subject [46/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
Date Thu, 14 Apr 2016 04:48:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
deleted file mode 100644
index 269ee6a..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Mean;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-/**
- * This is an example that demonstrates several approaches to filtering, and use of the Mean
- * transform. It shows how to dynamically set parameters by defining and using new pipeline options,
- * and how to use a value derived by the pipeline.
- *
- * <p>Concepts: The Mean transform; Options configuration; using pipeline-derived data as a side
- * input; approaches to filtering, selection, and projection.
- *
- * <p>The example reads public samples of weather data from BigQuery. It performs a
- * projection on the data, finds the global mean of the temperature readings, filters on readings
- * for a single given month, and then outputs only data (for that month) that has a mean temp
- * smaller than the derived global mean.
- *
- * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
- * table.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- *   [--monthFilter=<month_number>]
- * }
- * </pre>
- * where optional parameter {@code --monthFilter} is set to a number 1-12.
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- *   [--monthFilter=<month_number>]
- * }
- * </pre>
- * where optional parameter {@code --monthFilter} is set to a number 1-12.
- *
- * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
- * and can be overridden with {@code --input}.
- */
-public class FilterExamples {
-  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
-  private static final String WEATHER_SAMPLES_TABLE =
-      "clouddataflow-readonly:samples.weather_stations";
-  static final Logger LOG = Logger.getLogger(FilterExamples.class.getName());
-  static final int MONTH_TO_FILTER = 7;
-
-  /**
-   * Examines each row in the input table. Outputs only the subset of the cells this example
-   * is interested in -- the mean_temp and year, month, and day -- as a BigQuery table row.
-   */
-  static class ProjectionFn extends DoFn<TableRow, TableRow> {
-    @Override
-    public void processElement(ProcessContext c){
-      TableRow row = c.element();
-      // Grab year, month, day, mean_temp from the row
-      Integer year = Integer.parseInt((String) row.get("year"));
-      Integer month = Integer.parseInt((String) row.get("month"));
-      Integer day = Integer.parseInt((String) row.get("day"));
-      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
-      // Prepares the data for writing to BigQuery by building a TableRow object
-      TableRow outRow = new TableRow()
-          .set("year", year).set("month", month)
-          .set("day", day).set("mean_temp", meanTemp);
-      c.output(outRow);
-    }
-  }
-
-  /**
-   * Implements 'filter' functionality.
-   *
-   * <p>Examines each row in the input table. Outputs only rows from the month
-   * monthFilter, which is passed in as a parameter during construction of this DoFn.
-   */
-  static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> {
-    Integer monthFilter;
-
-    public FilterSingleMonthDataFn(Integer monthFilter) {
-      this.monthFilter = monthFilter;
-    }
-
-    @Override
-    public void processElement(ProcessContext c){
-      TableRow row = c.element();
-      Integer month;
-      month = (Integer) row.get("month");
-      if (month.equals(this.monthFilter)) {
-        c.output(row);
-      }
-    }
-  }
-
-  /**
-   * Examines each row (weather reading) in the input table. Outputs the temperature
-   * reading for that row ('mean_temp').
-   */
-  static class ExtractTempFn extends DoFn<TableRow, Double> {
-    @Override
-    public void processElement(ProcessContext c){
-      TableRow row = c.element();
-      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
-      c.output(meanTemp);
-    }
-  }
-
-
-
-  /**
-   * Finds the global mean of the mean_temp for each day/record, and outputs
-   * only data that has a mean temp below this global mean.
-   */
-  static class BelowGlobalMean
-      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
-    Integer monthFilter;
-
-    public BelowGlobalMean(Integer monthFilter) {
-      this.monthFilter = monthFilter;
-    }
-
-
-    @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
-
-      // Extract the mean_temp from each row.
-      PCollection<Double> meanTemps = rows.apply(
-          ParDo.of(new ExtractTempFn()));
-
-      // Find the global mean, of all the mean_temp readings in the weather data,
-      // and prepare this singleton PCollectionView for use as a side input.
-      final PCollectionView<Double> globalMeanTemp =
-          meanTemps.apply(Mean.<Double>globally())
-               .apply(View.<Double>asSingleton());
-
-      // Rows filtered to remove all but a single month
-      PCollection<TableRow> monthFilteredRows = rows
-          .apply(ParDo.of(new FilterSingleMonthDataFn(monthFilter)));
-
-      // Then, use the global mean as a side input, to further filter the weather data.
-      // By using a side input to pass in the filtering criteria, we can use a value
-      // that is computed earlier in pipeline execution.
-      // We'll only output readings with temperatures below this mean.
-      PCollection<TableRow> filteredRows = monthFilteredRows
-          .apply(ParDo
-              .named("ParseAndFilter")
-              .withSideInputs(globalMeanTemp)
-              .of(new DoFn<TableRow, TableRow>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
-                  Double gTemp = c.sideInput(globalMeanTemp);
-                  if (meanTemp < gTemp) {
-                    c.output(c.element());
-                  }
-                }
-              }));
-
-      return filteredRows;
-    }
-  }
-
-
-  /**
-   * Options supported by {@link FilterExamples}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Table to read from, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Default.String(WEATHER_SAMPLES_TABLE)
-    String getInput();
-    void setInput(String value);
-
-    @Description("Table to write to, specified as "
-        + "<project_id>:<dataset_id>.<table_id>. "
-        + "The dataset_id must already exist")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-
-    @Description("Numeric value of month to filter on")
-    @Default.Integer(MONTH_TO_FILTER)
-    Integer getMonthFilter();
-    void setMonthFilter(Integer value);
-  }
-
-  /**
-   * Helper method to build the table schema for the output table.
-   */
-  private static TableSchema buildWeatherSchemaProjection() {
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("mean_temp").setType("FLOAT"));
-    TableSchema schema = new TableSchema().setFields(fields);
-    return schema;
-  }
-
-  public static void main(String[] args)
-      throws Exception {
-
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-
-    TableSchema schema = buildWeatherSchemaProjection();
-
-    p.apply(BigQueryIO.Read.from(options.getInput()))
-     .apply(ParDo.of(new ProjectionFn()))
-     .apply(new BelowGlobalMean(options.getMonthFilter()))
-     .apply(BigQueryIO.Write
-        .to(options.getOutput())
-        .withSchema(schema)
-        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-
-    p.run();
-  }
-}
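
The file above centers on one pattern: compute a single value with Mean.globally() and View.asSingleton(), then consume it as a side input while filtering. Below is a minimal sketch of that pattern, reduced to a PCollection<Double> and using only SDK calls that already appear in the file; the BelowMeanSketch class and its method are illustrative names, not part of this commit or the SDK.

    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.Mean;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.transforms.View;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.cloud.dataflow.sdk.values.PCollectionView;

    class BelowMeanSketch {
      // Keeps only the readings that fall below the global mean of the input collection.
      static PCollection<Double> keepBelowGlobalMean(PCollection<Double> readings) {
        // One global mean, materialized as a singleton side input.
        final PCollectionView<Double> meanView =
            readings.apply(Mean.<Double>globally()).apply(View.<Double>asSingleton());
        // Each element is compared against the side-input value and emitted only if smaller.
        return readings.apply(ParDo
            .named("FilterBelowMean")
            .withSideInputs(meanView)
            .of(new DoFn<Double, Double>() {
              @Override
              public void processElement(ProcessContext c) {
                if (c.element() < c.sideInput(meanView)) {
                  c.output(c.element());
                }
              }
            }));
      }
    }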

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
deleted file mode 100644
index 765420e..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-/**
- * This example shows how to do a join on two collections.
- * It uses a sample of the GDELT 'world event' data (http://goo.gl/OB6oin), joining the event
- * 'action' country code against a table that maps country codes to country names.
- *
- * <p>Concepts: Join operation; multiple input sources.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and a local output file or output prefix on GCS:
- * <pre>{@code
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- *   --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- */
-public class JoinExamples {
-
-  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
-  private static final String GDELT_EVENTS_TABLE =
-      "clouddataflow-readonly:samples.gdelt_sample";
-  // A table that maps country codes to country names.
-  private static final String COUNTRY_CODES =
-      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
-
-  /**
-   * Join two collections, using country code as the key.
-   */
-  static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
-      PCollection<TableRow> countryCodes) throws Exception {
-
-    final TupleTag<String> eventInfoTag = new TupleTag<String>();
-    final TupleTag<String> countryInfoTag = new TupleTag<String>();
-
-    // transform both input collections to tuple collections, where the keys are country
-    // codes in both cases.
-    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
-        ParDo.of(new ExtractEventDataFn()));
-    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
-        ParDo.of(new ExtractCountryInfoFn()));
-
-    // country code 'key' -> CGBKR (<event info>, <country name>)
-    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
-        .of(eventInfoTag, eventInfo)
-        .and(countryInfoTag, countryInfo)
-        .apply(CoGroupByKey.<String>create());
-
-    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
-    // country code 'key' -> string of <event info>, <country name>
-    PCollection<KV<String, String>> finalResultCollection =
-      kvpCollection.apply(ParDo.named("Process").of(
-        new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            KV<String, CoGbkResult> e = c.element();
-            String countryCode = e.getKey();
-            String countryName = "none";
-            countryName = e.getValue().getOnly(countryInfoTag);
-            for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
-              // Generate a string that combines information from both collection values
-              c.output(KV.of(countryCode, "Country name: " + countryName
-                      + ", Event info: " + eventInfo));
-            }
-          }
-      }));
-
-    // write to GCS
-    PCollection<String> formattedResults = finalResultCollection
-        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            String outputstring = "Country code: " + c.element().getKey()
-                + ", " + c.element().getValue();
-            c.output(outputstring);
-          }
-        }));
-    return formattedResults;
-  }
-
-  /**
-   * Examines each row (event) in the input table. Outputs a KV whose key is the country
-   * code of the event and whose value is a string encoding the event information.
-   */
-  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      String countryCode = (String) row.get("ActionGeo_CountryCode");
-      String sqlDate = (String) row.get("SQLDATE");
-      String actor1Name = (String) row.get("Actor1Name");
-      String sourceUrl = (String) row.get("SOURCEURL");
-      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
-      c.output(KV.of(countryCode, eventInfo));
-    }
-  }
-
-
-  /**
-   * Examines each row (country info) in the input table. Outputs a KV whose key is the country
-   * code and whose value is the country name.
-   */
-  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      String countryCode = (String) row.get("FIPSCC");
-      String countryName = (String) row.get("HumanName");
-      c.output(KV.of(countryCode, countryName));
-    }
-  }
-
-
-  /**
-   * Options supported by {@link JoinExamples}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Path of the file to write to")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  public static void main(String[] args) throws Exception {
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-    // the following two 'applys' create multiple inputs to our pipeline, one for each
-    // of our two input sources.
-    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
-    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
-    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
-    formattedResults.apply(TextIO.Write.to(options.getOutput()));
-    p.run();
-  }
-
-}
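
The join above reduces to the CoGroupByKey pattern: tag each keyed input, group by key, then read the per-tag values out of each CoGbkResult (via getAll/getOnly, as joinEvents does). Below is a minimal sketch against the same SDK; the JoinSketch class and its method signature are illustrative and not part of this commit.

    import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
    import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
    import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.cloud.dataflow.sdk.values.TupleTag;

    class JoinSketch {
      // Groups two keyed collections by their common String key.
      // A downstream DoFn reads e.getValue().getAll(leftTag) / getOnly(rightTag) per key.
      static PCollection<KV<String, CoGbkResult>> joinByKey(
          PCollection<KV<String, String>> left, TupleTag<String> leftTag,
          PCollection<KV<String, String>> right, TupleTag<String> rightTag) {
        return KeyedPCollectionTuple
            .of(leftTag, left)       // tag each input so its values can be pulled out later
            .and(rightTag, right)
            .apply(CoGroupByKey.<String>create());
      }
    }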

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
deleted file mode 100644
index b35a57f..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An example that reads the public samples of weather data from BigQuery, and finds
- * the maximum temperature ('mean_temp') for each month.
- *
- * <p>Concepts: The 'Max' statistical combination function, and how to find the max per
- * key group.
- *
- * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
- * table.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and the BigQuery table for the output, with the form
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations }
- * and can be overridden with {@code --input}.
- */
-public class MaxPerKeyExamples {
-  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
-  private static final String WEATHER_SAMPLES_TABLE =
-      "clouddataflow-readonly:samples.weather_stations";
-
-  /**
-   * Examines each row (weather reading) in the input table. Outputs the month of the reading,
-   * and the mean_temp.
-   */
-  static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      Integer month = Integer.parseInt((String) row.get("month"));
-      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
-      c.output(KV.of(month, meanTemp));
-    }
-  }
-
-  /**
-   * Format the results to a TableRow, to save to BigQuery.
-   *
-   */
-  static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = new TableRow()
-          .set("month", c.element().getKey())
-          .set("max_mean_temp", c.element().getValue());
-      c.output(row);
-    }
-  }
-
-  /**
-   * Reads rows from a weather data table, and finds the max mean_temp for each
-   * month via the 'Max' statistical combination function.
-   */
-  static class MaxMeanTemp
-      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
-    @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
-
-      // row... => <month, mean_temp> ...
-      PCollection<KV<Integer, Double>> temps = rows.apply(
-          ParDo.of(new ExtractTempFn()));
-
-      // month, mean_temp... => <month, max mean temp>...
-      PCollection<KV<Integer, Double>> tempMaxes =
-          temps.apply(Max.<Integer>doublesPerKey());
-
-      // <month, max>... => row...
-      PCollection<TableRow> results = tempMaxes.apply(
-          ParDo.of(new FormatMaxesFn()));
-
-      return results;
-    }
-  }
-
-  /**
-   * Options supported by {@link MaxPerKeyExamples}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Table to read from, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Default.String(WEATHER_SAMPLES_TABLE)
-    String getInput();
-    void setInput(String value);
-
-    @Description("Table to write to, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  public static void main(String[] args)
-      throws Exception {
-
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-
-    // Build the table schema for the output table.
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
-    TableSchema schema = new TableSchema().setFields(fields);
-
-    p.apply(BigQueryIO.Read.from(options.getInput()))
-     .apply(new MaxMeanTemp())
-     .apply(BigQueryIO.Write
-        .to(options.getOutput())
-        .withSchema(schema)
-        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-
-    p.run();
-  }
-}
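
The core of MaxPerKeyExamples is a single combine: Max.doublesPerKey() collapses every <month, mean_temp> pair sharing a key into one pair holding the maximum. Below is a minimal sketch against the same SDK; the MaxPerKeySketch class is an illustrative name and not part of this commit.

    import com.google.cloud.dataflow.sdk.transforms.Max;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    class MaxPerKeySketch {
      // <month, mean_temp>... => one <month, max mean_temp> per month.
      static PCollection<KV<Integer, Double>> maxPerMonth(PCollection<KV<Integer, Double>> temps) {
        return temps.apply(Max.<Integer>doublesPerKey());
      }
    }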

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
deleted file mode 100644
index 99f3080..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
+++ /dev/null
@@ -1,55 +0,0 @@
-
-# "Cookbook" Examples
-
-This directory holds simple "cookbook" examples, which show how to define
-commonly-used data analysis patterns that you would likely incorporate into a
-larger Dataflow pipeline. They include:
-
- <ul>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java">BigQueryTornadoes</a>
-  &mdash; An example that reads the public samples of weather data from Google
-  BigQuery, counts the number of tornadoes that occur in each month, and
-  writes the results to BigQuery. Demonstrates reading/writing BigQuery,
-  counting a <code>PCollection</code>, and user-defined <code>PTransforms</code>.</li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java">CombinePerKeyExamples</a>
-  &mdash; An example that reads the public &quot;Shakespeare&quot; data, and for
-  each word in the dataset that exceeds a given length, generates a string
-  containing the list of play names in which that word appears.
-  Demonstrates the <code>Combine.perKey</code>
-  transform, which lets you combine the values in a key-grouped
-  <code>PCollection</code>.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java">DatastoreWordCount</a>
-  &mdash; An example that shows you how to read from Google Cloud Datastore.</li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java">DeDupExample</a>
-  &mdash; An example that uses Shakespeare's plays as plain text files, and
-  removes duplicate lines across all the files. Demonstrates the
-  <code>RemoveDuplicates</code>, <code>TextIO.Read</code>,
-  and <code>TextIO.Write</code> transforms, and how to wire transforms together.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java">FilterExamples</a>
-  &mdash; An example that shows different approaches to filtering, including
-  selection and projection. It also shows how to dynamically set parameters
-  by defining and using new pipeline options, and how to use a value derived
-  by a pipeline. Demonstrates the <code>Mean</code> transform,
-  <code>Options</code> configuration, and using pipeline-derived data as a side
-  input.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java">JoinExamples</a>
-  &mdash; An example that shows how to join two collections. It uses a
-  sample of the <a href="http://goo.gl/OB6oin">GDELT &quot;world event&quot;
-  data</a>, joining the event <code>action</code> country code against a table
-  that maps country codes to country names. Demonstrates the <code>Join</code>
-  operation, and using multiple input sources.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java">MaxPerKeyExamples</a>
-  &mdash; An example that reads the public samples of weather data from BigQuery,
-  and finds the maximum temperature (<code>mean_temp</code>) for each month.
-  Demonstrates the <code>Max</code> statistical combination transform, and how to
-  find the max-per-key group.
-  </li>
-  </ul>
-
-See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples
-README](../../../../../../../../../README.md) for
-information about how to run these examples.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/TriggerExample.java
deleted file mode 100644
index e32596d..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/TriggerExample.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
-import com.google.cloud.dataflow.examples.common.PubsubFileInjector;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterEach;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This example illustrates the basic concepts behind triggering. It shows how to use different
- * trigger definitions to produce partial (speculative) results before all the data is processed and
- * to control when updated results are produced for late data. The example performs a streaming
- * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
- * data into {@link Window windows} to be processed, and demonstrates using various kinds of {@link
- * Trigger triggers} to control when the results for each window are emitted.
- *
- * <p> This example uses a portion of real traffic data from San Diego freeways. It contains
- * readings from sensor stations set up along each freeway. Each sensor reading includes a
- * calculation of the 'total flow' across all lanes in that freeway direction.
- *
- * <p> Concepts:
- * <pre>
- *   1. The default triggering behavior
- *   2. Late data with the default trigger
- *   3. How to get speculative estimates
- *   4. Combining late data and speculative estimates
- * </pre>
- *
- * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
- * and understand the concept of 'late data'.
- * See:  <a href="https://cloud.google.com/dataflow/model/triggers">
- * https://cloud.google.com/dataflow/model/triggers </a> and
- * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
- * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
- *
- * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
- * also run an auxiliary pipeline to inject data from the default {@code --input} file to the
- * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
- * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
- * pipeline also randomly simulates late data, by setting the timestamps of some of the data
- * elements to be in the past. You may override the default {@code --input} with the file of your
- * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
- * you to use a separate tool to publish to the given topic.
- *
- * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
- * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p> The pipeline outputs its results to a BigQuery table.
- * Here are some queries you can use to see interesting results:
- * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
- * Replace {@code <enter_window_interval>} in the query below with the window interval.
- *
- * <p> To see the results of the default trigger:
- * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait
- * until after the window duration, when the first pane of non-late data has been emitted, to see
- * more interesting results.
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC}
- *
- * <p> To see the late data, i.e. the data dropped by the default trigger,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and
- * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time}
- *
- * <p>To see the difference between accumulation mode and discarding mode,
- * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
- * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY
- * window DESC, processing_time}
- *
- * <p> To see speculative results every minute,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5"
- * ORDER BY window DESC, processing_time}
- *
- * <p> To see speculative results every five minutes after the end of the window
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY"
- * and freeway = "5" ORDER BY window DESC, processing_time}
- *
- * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
- * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
- *
- * <p> To reduce the number of results for each query we can add additional where clauses.
- * For example, to see the results of the default trigger,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND
- * window = "<enter_window_interval>"}
- *
- * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exit.
- */
-
-public class TriggerExample {
-  //Numeric value of fixed window duration, in minutes
-  public static final int WINDOW_DURATION = 30;
-  // Constants used in triggers.
-  // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
-  // ONE_MINUTE is used only with processing time before the end of the window
-  public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
-  // FIVE_MINUTES is used only with processing time after the end of the window
-  public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
-  // ONE_DAY is used to specify the amount of lateness allowed for the data elements.
-  public static final Duration ONE_DAY = Duration.standardDays(1);
-
-  /**
-   * This transform demonstrates using triggers to control when data is produced for each window
-   * Consider an example to understand the results generated by each type of trigger.
-   * The example uses "freeway" as the key. Event time is the timestamp associated with the data
-   * element and processing time is the time when the data element gets processed in the pipeline.
-   * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
-   * Key (freeway) | Value (total_flow) | event time | processing time
-   * 5             | 50                 | 10:00:03   | 10:00:47
-   * 5             | 30                 | 10:01:00   | 10:01:03
-   * 5             | 30                 | 10:02:00   | 11:07:00
-   * 5             | 20                 | 10:04:10   | 10:05:15
-   * 5             | 60                 | 10:05:00   | 11:03:00
-   * 5             | 20                 | 10:05:01   | 11:07:30
-   * 5             | 60                 | 10:15:00   | 10:27:15
-   * 5             | 40                 | 10:26:40   | 10:26:43
-   * 5             | 60                 | 10:27:20   | 10:27:25
-   * 5             | 60                 | 10:29:00   | 11:11:00
-   *
-   * <p> Dataflow tracks a watermark which records up to what point in event time the data is
-   * complete. For the purposes of the example, we'll assume the watermark is approximately 15m
-   * behind the current processing time. In practice, the actual value would vary over time based
-   * on the system's knowledge of the current PubSub delay and contents of the backlog (data
-   * that has not yet been processed).
-   *
-   * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
-   * close at 10:44:59, when the watermark passes 10:30:00.
-   */
-  static class CalculateTotalFlow
-  extends PTransform <PCollection<KV<String, Integer>>, PCollectionList<TableRow>> {
-    private int windowDuration;
-
-    CalculateTotalFlow(int windowDuration) {
-      this.windowDuration = windowDuration;
-    }
-
-    @Override
-    public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
-
-      // Concept #1: The default triggering behavior
-      // By default Dataflow uses a trigger which fires when the watermark has passed the end of the
-      // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
-
-      // The system also defaults to dropping late data -- data which arrives after the watermark
-      // has passed the event timestamp of the arriving element. This means that the default trigger
-      // will only fire once.
-
-      // Each pane produced by the default trigger with no allowed lateness will be the first and
-      // last pane in the window, and will be ON_TIME.
-
-      // The results for the example above with the default trigger and zero allowed lateness
-      // would be:
-      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
-      // 5             | 260                | 6                 | true    | true   | ON_TIME
-
-      // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
-      // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
-      // late, and dropped.
-
-      PCollection<TableRow> defaultTriggerResults = flowInfo
-          .apply("Default", Window
-              // The default window duration values work well if you're running the default input
-              // file. You may want to adjust the window duration otherwise.
-              .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
-              // The default trigger first emits output when the system's watermark passes the end
-              // of the window.
-              .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
-              // Late data is dropped
-              .withAllowedLateness(Duration.ZERO)
-              // Discard elements after emitting each pane.
-              // With no allowed lateness and the specified trigger there will only be a single
-              // pane, so this doesn't have a noticeable effect. See concept 2 for more details.
-              .discardingFiredPanes())
-          .apply(new TotalFlow("default"));
-
-      // Concept #2: Late data with the default trigger
-      // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
-      // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
-      // window. Any late data will result in an additional pane being fired for that same window.
-
-      // The first pane produced will be ON_TIME and the remaining panes will be LATE.
-      // To definitely get the last pane when the window closes, use
-      // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
-
-      // The results for the example above with the default trigger and ONE_DAY allowed lateness
-      // would be:
-      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
-      // 5             | 260                | 6                 | true    | false  | ON_TIME
-      // 5             | 60                 | 1                 | false   | false  | LATE
-      // 5             | 30                 | 1                 | false   | false  | LATE
-      // 5             | 20                 | 1                 | false   | false  | LATE
-      // 5             | 60                 | 1                 | false   | false  | LATE
-      PCollection<TableRow> withAllowedLatenessResults = flowInfo
-          .apply("WithLateData", Window
-              .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
-              // Late data is emitted as it arrives
-              .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
-              // Once the output is produced, the pane is dropped and we start preparing the next
-              // pane for the window
-              .discardingFiredPanes()
-              // Late data is handled up to one day
-              .withAllowedLateness(ONE_DAY))
-          .apply(new TotalFlow("withAllowedLateness"));
-
-      // Concept #3: How to get speculative estimates
-      // We can specify a trigger that fires independent of the watermark, for instance after
-      // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
-      // all the data is available. Since we don't have any triggers that depend on the watermark
-      // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
-
-      // We also use accumulatingFiredPanes to build up the results across each pane firing.
-
-      // The results for the example above for this trigger would be:
-      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
-      // 5             | 80                 | 2                 | true    | false  | EARLY
-      // 5             | 100                | 3                 | false   | false  | EARLY
-      // 5             | 260                | 6                 | false   | false  | EARLY
-      // 5             | 320                | 7                 | false   | false  | LATE
-      // 5             | 370                | 9                 | false   | false  | LATE
-      // 5             | 430                | 10                | false   | false  | LATE
-      PCollection<TableRow> speculativeResults = flowInfo
-          .apply("Speculative" , Window
-              .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
-              // Trigger fires every minute.
-              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
-                  // Speculative every ONE_MINUTE
-                  .plusDelayOf(ONE_MINUTE)))
-              // After emitting each pane, it will continue accumulating the elements so that each
-              // approximation includes all of the previous data in addition to the newly arrived
-              // data.
-              .accumulatingFiredPanes()
-              .withAllowedLateness(ONE_DAY))
-          .apply(new TotalFlow("speculative"));
-
-      // Concept #4: Combining late data and speculative estimates
-      // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
-      // and LATE updates based on late data.
-
-      // Each time a triggering condition is satisfied it advances to the next trigger.
-      // If there are new elements, this trigger emits output under the following conditions:
-      // > Early approximations every minute until the end of the window.
-      // > An on-time firing when the watermark has passed the end of the window.
-      // > Late data every five minutes after that.
-
-      // Every pane produced will either be EARLY, ON_TIME or LATE.
-
-      // The results for the example above for this trigger would be:
-      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
-      // 5             | 80                 | 2                 | true    | false  | EARLY
-      // 5             | 100                | 3                 | false   | false  | EARLY
-      // 5             | 260                | 6                 | false   | false  | EARLY
-      // [First pane fired after the end of the window]
-      // 5             | 320                | 7                 | false   | false  | ON_TIME
-      // 5             | 430                | 10                | false   | false  | LATE
-
-      // For more possibilities of how to build advanced triggers, see {@link Trigger}.
-      PCollection<TableRow> sequentialResults = flowInfo
-          .apply("Sequential", Window
-              .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
-              .triggering(AfterEach.inOrder(
-                  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
-                      // Speculative every ONE_MINUTE
-                      .plusDelayOf(ONE_MINUTE)).orFinally(AfterWatermark.pastEndOfWindow()),
-                  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
-                      // Late data every FIVE_MINUTES
-                      .plusDelayOf(FIVE_MINUTES))))
-              .accumulatingFiredPanes()
-              // For up to ONE_DAY
-              .withAllowedLateness(ONE_DAY))
-          .apply(new TotalFlow("sequential"));
-
-      // Adds the results generated by each trigger type to a PCollectionList.
-      PCollectionList<TableRow> resultsList = PCollectionList.of(defaultTriggerResults)
-          .and(withAllowedLatenessResults)
-          .and(speculativeResults)
-          .and(sequentialResults);
-
-      return resultsList;
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-  // The remaining parts of the pipeline are needed to produce the output for each
-  // concept above. Not directly relevant to understanding the trigger examples.
-
-  /**
-   * Calculate total flow and number of records for each freeway and format the results to TableRow
-   * objects, to save to BigQuery.
-   */
-  static class TotalFlow extends
-  PTransform <PCollection<KV<String, Integer>>, PCollection<TableRow>> {
-    private String triggerType;
-
-    public TotalFlow(String triggerType) {
-      this.triggerType = triggerType;
-    }
-
-    @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
-      PCollection<KV<String, Iterable<Integer>>> flowPerFreeway = flowInfo
-          .apply(GroupByKey.<String, Integer>create());
-
-      PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of(
-          new DoFn <KV<String, Iterable<Integer>>, KV<String, String>>() {
-
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              Iterable<Integer> flows = c.element().getValue();
-              Integer sum = 0;
-              Long numberOfRecords = 0L;
-              for (Integer value : flows) {
-                sum += value;
-                numberOfRecords++;
-              }
-              c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords));
-            }
-          }));
-      PCollection<TableRow> output = results.apply(ParDo.of(new FormatTotalFlow(triggerType)));
-      return output;
-    }
-  }
-
-  /**
-   * Format the results of the Total flow calculation to a TableRow, to save to BigQuery.
-   * Adds the triggerType, pane information, processing time and the window timestamp.
-   */
-  static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow>
-  implements  RequiresWindowAccess {
-    private String triggerType;
-
-    public FormatTotalFlow(String triggerType) {
-      this.triggerType = triggerType;
-    }
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      String[] values = c.element().getValue().split(",");
-      TableRow row = new TableRow()
-          .set("trigger_type", triggerType)
-          .set("freeway", c.element().getKey())
-          .set("total_flow", Integer.parseInt(values[0]))
-          .set("number_of_records", Long.parseLong(values[1]))
-          .set("window", c.window().toString())
-          .set("isFirst", c.pane().isFirst())
-          .set("isLast", c.pane().isLast())
-          .set("timing", c.pane().getTiming().toString())
-          .set("event_time", c.timestamp().toString())
-          .set("processing_time", Instant.now().toString());
-      c.output(row);
-    }
-  }
-
-  /**
-   * Extract the freeway and total flow in a reading.
-   * Freeway is used as key since we are calculating the total flow for each freeway.
-   */
-  static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      String[] laneInfo = c.element().split(",");
-      if (laneInfo[0].equals("timestamp")) {
-        // Header row
-        return;
-      }
-      if (laneInfo.length < 48) {
-        //Skip the invalid input.
-        return;
-      }
-      String freeway = laneInfo[2];
-      Integer totalFlow = tryIntegerParse(laneInfo[7]);
-      // Ignore the records with total flow 0 to easily understand the working of triggers.
-      // Skip the records with total flow -1 since they are invalid input.
-      if (totalFlow == null || totalFlow <= 0) {
-        return;
-      }
-      c.output(KV.of(freeway,  totalFlow));
-    }
-  }
-
-  /**
-   * Inherits standard configuration options.
-   */
-  public interface TrafficFlowOptions
-      extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions, DataflowExampleOptions {
-
-    @Description("Input file to inject to Pub/Sub topic")
-    @Default.String("gs://dataflow-samples/traffic_sensor/"
-        + "Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
-    String getInput();
-    void setInput(String value);
-
-    @Description("Numeric value of window duration for fixed windows, in minutes")
-    @Default.Integer(WINDOW_DURATION)
-    Integer getWindowDuration();
-    void setWindowDuration(Integer value);
-  }
-
-  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-
-  public static void main(String[] args) throws Exception {
-    TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(TrafficFlowOptions.class);
-    options.setStreaming(true);
-
-    // The DataflowPipelineRunner is required so that the pipeline
-    // (and the injector) can be cancelled automatically.
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setBigQuerySchema(getSchema());
-
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
-    dataflowUtils.setup();
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    TableReference tableRef = getTableReference(options.getProject(),
-        options.getBigQueryDataset(), options.getBigQueryTable());
-
-    PCollectionList<TableRow> resultList = pipeline.apply(PubsubIO.Read.named("ReadPubsubInput")
-        .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-        .topic(options.getPubsubTopic()))
-        .apply(ParDo.of(new ExtractFlowInfo()))
-        .apply(new CalculateTotalFlow(options.getWindowDuration()));
-
-    for (int i = 0; i < resultList.size(); i++) {
-      resultList.get(i).apply(BigQueryIO.Write.to(tableRef).withSchema(getSchema()));
-    }
-
-    PipelineResult result = pipeline.run();
-    if (!options.getInput().isEmpty()) {
-      // Inject the data into the Pub/Sub topic.
-      dataflowUtils.runInjectorPipeline(runInjector(options));
-    }
-    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
-    dataflowUtils.waitToFinish(result);
-  }
-
-  private static Pipeline runInjector(TrafficFlowOptions options) {
-    DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
-    copiedOptions.setStreaming(false);
-    copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers());
-    copiedOptions.setJobName(options.getJobName() + "-injector");
-    Pipeline injectorPipeline = Pipeline.create(copiedOptions);
-    injectorPipeline
-        .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
-        .apply(ParDo.named("InsertRandomDelays").of(new InsertDelays()))
-        .apply(IntraBundleParallelization.of(PubsubFileInjector
-            .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)
-            .publish(options.getPubsubTopic()))
-            .withMaxParallelism(20));
-
-    return injectorPipeline;
-  }
-
-  /**
-   * Adds the current time to each record.
-   * Also occasionally inserts a random delay, to demonstrate the triggers.
-   */
-  public static class InsertDelays extends DoFn<String, String> {
-    private static final double THRESHOLD = 0.001;
-    // MIN_DELAY and MAX_DELAY in minutes.
-    private static final int MIN_DELAY = 1;
-    private static final int MAX_DELAY = 100;
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      Instant timestamp = Instant.now();
-      if (Math.random() < THRESHOLD) {
-        int range = MAX_DELAY - MIN_DELAY;
-        int delayInMinutes = (int) (Math.random() * range) + MIN_DELAY;
-        long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
-        timestamp = new Instant(timestamp.getMillis() - delayInMillis);
-      }
-      c.outputWithTimestamp(c.element(), timestamp);
-    }
-  }
-
-
-  /** Builds the table reference for the BigQuery output table. */
-  private static TableReference getTableReference(String project, String dataset, String table){
-    TableReference tableRef = new TableReference();
-    tableRef.setProjectId(project);
-    tableRef.setDatasetId(dataset);
-    tableRef.setTableId(table);
-    return tableRef;
-  }
-
-  /** Defines the BigQuery schema used for the output. */
-  private static TableSchema getSchema() {
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("trigger_type").setType("STRING"));
-    fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
-    fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("number_of_records").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("window").setType("STRING"));
-    fields.add(new TableFieldSchema().setName("isFirst").setType("BOOLEAN"));
-    fields.add(new TableFieldSchema().setName("isLast").setType("BOOLEAN"));
-    fields.add(new TableFieldSchema().setName("timing").setType("STRING"));
-    fields.add(new TableFieldSchema().setName("event_time").setType("TIMESTAMP"));
-    fields.add(new TableFieldSchema().setName("processing_time").setType("TIMESTAMP"));
-    TableSchema schema = new TableSchema().setFields(fields);
-    return schema;
-  }
-
-  private static Integer tryIntegerParse(String number) {
-    try {
-      return Integer.parseInt(number);
-    } catch (NumberFormatException e) {
-      return null;
-    }
-  }
-}
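
[Editor's note, not part of the diff above: the CalculateTotalFlow transform in the removed file
groups readings by freeway and then sums flows in a hand-written loop, emitting a "sum,count"
string for FormatTotalFlow to parse. A minimal sketch, assuming the SDK's built-in Sum and Count
combiners; the class and method names below are illustrative only. The original emits both values
in one string, so a single custom CombineFn producing both at once would be another option.]

import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class TotalFlowCombinerSketch {

  // Sums the flow readings per freeway; input elements are KV of (freeway, totalFlow).
  public static PCollection<KV<String, Integer>> totalFlowPerFreeway(
      PCollection<KV<String, Integer>> flowInfo) {
    return flowInfo.apply(Sum.<String>integersPerKey());
  }

  // Counts the number of readings per freeway.
  public static PCollection<KV<String, Long>> recordsPerFreeway(
      PCollection<KV<String, Integer>> flowInfo) {
    return flowInfo.apply(Count.<String, Integer>perKey());
  }
}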

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
new file mode 100644
index 0000000..7134bca
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.PAssert;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+
+/**
+ * An example that verifies word counts in Shakespeare and includes Dataflow best practices.
+ *
+ * <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more
+ * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount}
+ * and {@link WordCount}. After you've looked at this example, see the
+ * {@link WindowedWordCount} pipeline for an introduction to additional concepts.
+ *
+ * <p>Basic concepts, also in the MinimalWordCount and WordCount examples:
+ * Reading text files; counting a PCollection; executing a Pipeline both locally
+ * and using the Dataflow service; defining DoFns.
+ *
+ * <p>New Concepts:
+ * <pre>
+ *   1. Logging to Cloud Logging
+ *   2. Controlling Dataflow worker log levels
+ *   3. Creating a custom aggregator
+ *   4. Testing your Pipeline via PAssert
+ * </pre>
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service and the additional logging discussed
+ * below, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
+ * }
+ * </pre>
+ *
+ * <p>Note that when you run via <code>mvn exec</code>, you may need to escape
+ * the quotations as appropriate for your shell. For example, in <code>bash</code>:
+ * <pre>
+ * mvn compile exec:java ... \
+ *   -Dexec.args="... \
+ *     --workerLogLevelOverrides={\\\"com.google.cloud.dataflow.examples\\\":\\\"DEBUG\\\"}"
+ * </pre>
+ *
+ * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud
+ * Logging by default at "INFO" log level and higher. One may override log levels for specific
+ * logging namespaces by specifying:
+ * <pre><code>
+ *   --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
+ * </code></pre>
+ * For example, by specifying:
+ * <pre><code>
+ *   --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
+ * </code></pre>
+ * when executing this pipeline using the Dataflow service, Cloud Logging would contain only
+ * "DEBUG" or higher level logs for the {@code com.google.cloud.dataflow.examples} package in
+ * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker
+ * logging configuration can be overridden by specifying
+ * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example,
+ * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with
+ * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note
+ * that changing the default worker log level to TRACE or DEBUG will significantly increase
+ * the amount of logs output.
+ *
+ * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
+ * overridden with {@code --inputFile}.
+ */
+public class DebuggingWordCount {
+  /** A DoFn that filters for a specific key based upon a regular expression. */
+  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
+    /**
+     * Concept #1: The logger below uses the fully qualified class name of FilterTextFn
+     * as its name. All log statements emitted by this logger will be referenced by this name
+     * and will be visible in the Cloud Logging UI. Learn more about the Cloud Logging UI at
+     * https://cloud.google.com/logging.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
+
+    private final Pattern filter;
+    public FilterTextFn(String pattern) {
+      filter = Pattern.compile(pattern);
+    }
+
+    /**
+     * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those
+     * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the
+     * Dataflow service. The aggregators below track the number of matched and unmatched words.
+     * Learn more about the Dataflow Monitoring UI at
+     * https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf.
+     */
+    private final Aggregator<Long, Long> matchedWords =
+        createAggregator("matchedWords", new Sum.SumLongFn());
+    private final Aggregator<Long, Long> unmatchedWords =
+        createAggregator("umatchedWords", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (filter.matcher(c.element().getKey()).matches()) {
+        // Log at the "DEBUG" level each element that we match. When executing this pipeline
+        // using the Dataflow service, these log lines will appear in the Cloud Logging UI
+        // only if the log level is set to "DEBUG" or lower.
+        LOG.debug("Matched: " + c.element().getKey());
+        matchedWords.addValue(1L);
+        c.output(c.element());
+      } else {
+        // Log at the "TRACE" level each element that is not matched. Different log levels
+        // can be used to control the verbosity of logging, providing an effective mechanism
+        // for filtering out less important information.
+        LOG.trace("Did not match: " + c.element().getKey());
+        unmatchedWords.addValue(1L);
+      }
+    }
+  }
+
+  /**
+   * Options supported by {@link DebuggingWordCount}.
+   *
+   * <p>Inherits standard configuration options and all options defined in
+   * {@link WordCount.WordCountOptions}.
+   */
+  public static interface WordCountOptions extends WordCount.WordCountOptions {
+
+    @Description("Regex filter pattern to use in DebuggingWordCount. "
+        + "Only words matching this pattern will be counted.")
+    @Default.String("Flourish|stomach")
+    String getFilterPattern();
+    void setFilterPattern(String value);
+  }
+
+  public static void main(String[] args) {
+    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+      .as(WordCountOptions.class);
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<KV<String, Long>> filteredWords =
+        p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+         .apply(new WordCount.CountWords())
+         .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
+
+    /**
+     * Concept #4: PAssert is a set of convenient PTransforms in the style of
+     * Hamcrest's collection matchers that can be used when writing Pipeline level tests
+     * to validate the contents of PCollections. PAssert is best used in unit tests
+     * with small data sets but is demonstrated here as a teaching tool.
+     *
+     * <p>Below we verify that the set of filtered words matches our expected counts. Note
+     * that PAssert does not provide any output and that successful completion of the
+     * Pipeline implies that the expectations were met. Learn more at
+     * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test
+     * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test.
+     */
+    List<KV<String, Long>> expectedResults = Arrays.asList(
+        KV.of("Flourish", 3L),
+        KV.of("stomach", 1L));
+    PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
+
+    p.run();
+  }
+}
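
[Editor's note, not part of the diff above: a minimal unit-test-style sketch of Concept #4,
assuming the SDK's Create transform, standard coders, and the default local runner. The class
name FilterTextFnSketch and the in-memory data are illustrative only.]

import com.google.cloud.dataflow.examples.DebuggingWordCount;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.PAssert;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import java.util.Arrays;

public class FilterTextFnSketch {
  public static void main(String[] args) {
    // The default runner is the local DirectPipelineRunner, so no project ID is needed.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // A tiny in-memory "word count" result to filter.
    PCollection<KV<String, Long>> counts = p.apply(
        Create.of(Arrays.asList(
            KV.of("Flourish", 3L), KV.of("stomach", 1L), KV.of("king", 12L)))
        .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));

    PCollection<KV<String, Long>> filtered = counts.apply(
        ParDo.of(new DebuggingWordCount.FilterTextFn("Flourish|stomach")));

    // Successful completion of p.run() implies the expectation was met.
    PAssert.that(filtered).containsInAnyOrder(KV.of("Flourish", 3L), KV.of("stomach", 1L));

    p.run();
  }
}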

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
new file mode 100644
index 0000000..f3be5a3
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
+import com.google.cloud.dataflow.sdk.values.KV;
+
+
+/**
+ * An example that counts words in Shakespeare.
+ *
+ * <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more
+ * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or
+ * argument processing, and focus on construction of the pipeline, which chains together the
+ * application of core transforms.
+ *
+ * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally
+ * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
+ * concepts.
+ *
+ * <p>Concepts:
+ * <pre>
+ *   1. Reading data from text files
+ *   2. Specifying 'inline' transforms
+ *   3. Counting a PCollection
+ *   4. Writing data to Cloud Storage as text files
+ * </pre>
+ *
+ * <p>To execute this pipeline, first edit the code to set your project ID, the staging
+ * location, and the output location. The specified GCS bucket(s) must already exist.
+ *
+ * <p>Then, run the pipeline as described in the README. It will be deployed and run using the
+ * Dataflow service. No args are required to run the pipeline. You can see the results in your
+ * output bucket in the GCS browser.
+ */
+public class MinimalWordCount {
+
+  public static void main(String[] args) {
+    // Create a DataflowPipelineOptions object. This object lets us set various execution
+    // options for our pipeline, such as the associated Cloud Platform project and the location
+    // in Google Cloud Storage to stage files.
+    DataflowPipelineOptions options = PipelineOptionsFactory.create()
+      .as(DataflowPipelineOptions.class);
+    options.setRunner(BlockingDataflowPipelineRunner.class);
+    // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
+    options.setProject("SET_YOUR_PROJECT_ID_HERE");
+    // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
+    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
+
+    // Create the Pipeline object with the options we defined above.
+    Pipeline p = Pipeline.create(options);
+
+    // Apply the pipeline's transforms.
+
+    // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
+    // of input text files. TextIO.Read returns a PCollection where each element is one line from
+    // the input text (a set of Shakespeare's texts).
+    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+     // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
+     // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
+     // The ParDo returns a PCollection<String>, where each element is an individual word in
+     // Shakespeare's collected texts.
+     .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
+                       @Override
+                       public void processElement(ProcessContext c) {
+                         for (String word : c.element().split("[^a-zA-Z']+")) {
+                           if (!word.isEmpty()) {
+                             c.output(word);
+                           }
+                         }
+                       }
+                     }))
+     // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
+     // transform returns a new PCollection of key/value pairs, where each key represents a unique
+     // word in the text. The associated value is the occurrence count for that word.
+     .apply(Count.<String>perElement())
+     // Apply a MapElements transform that formats our PCollection of word counts into a printable
+     // string, suitable for writing to an output file.
+     .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+                       @Override
+                       public String apply(KV<String, Long> input) {
+                         return input.getKey() + ": " + input.getValue();
+                       }
+                     }))
+     // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
+     // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
+     // formatted strings) to a series of text files in Google Cloud Storage.
+     // CHANGE 3/3: A Google Cloud Storage path is required for writing the output.
+     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+
+    // Run the pipeline.
+    p.run();
+  }
+}
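
[Editor's note, not part of the diff above: a minimal sketch of the same word-count core run
locally with the DirectPipelineRunner, so no project ID or GCS bucket is needed. The local file
paths are placeholders; everything else mirrors MinimalWordCount.]

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.values.KV;

public class MinimalWordCountLocalSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectPipelineRunner.class);  // run in-process, no Cloud project needed
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("/tmp/kinglear.txt"))  // placeholder input path
     .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
       @Override
       public void processElement(ProcessContext c) {
         for (String word : c.element().split("[^a-zA-Z']+")) {
           if (!word.isEmpty()) {
             c.output(word);
           }
         }
       }
     }))
     .apply(Count.<String>perElement())
     .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
       @Override
       public String apply(KV<String, Long> input) {
         return input.getKey() + ": " + input.getValue();
       }
     }))
     .apply(TextIO.Write.to("/tmp/wordcounts"));  // placeholder output prefix

    p.run();
  }
}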

