beam-commits mailing list archives

From dhalp...@apache.org
Subject [54/67] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:48:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
deleted file mode 100644
index e3e88c2..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
+++ /dev/null
@@ -1,459 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-import org.apache.avro.reflect.Nullable;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
- * You can configure the running mode by setting {@literal --streaming} to true or false.
- *
- * <p>Concepts: The batch and streaming runners, GroupByKey, sliding windows, and
- * Google Cloud Pub/Sub topic injection.
- *
- * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
- * it calculates the average speed over the window for some small set of predefined 'routes',
- * and looks for 'slowdowns' in those routes. It writes its results to a BigQuery table.
- *
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
- *
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example runs a separate pipeline to inject the data from the default
- * {@literal --inputFile} into the Pub/Sub {@literal --pubsubTopic}, making it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with a
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which
- * disables the automatic Pub/Sub injection and allows you to use a separate tool to control the
- * input to this example. Example code that publishes traffic sensor data to a Pub/Sub topic
- * is provided at
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p>The example will try to cancel the pipelines when it receives the signal to terminate the
- * process (CTRL-C) and then exit.
- */
-
-public class TrafficRoutes {
-
-  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-  private static final Integer VALID_INPUTS = 4999;
-
-  // Instantiate some small predefined San Diego routes to analyze
-  static Map<String, String> sdStations = buildStationInfo();
-  static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
-  static final int WINDOW_SLIDE_EVERY = 1;  // Default window 'slide every' setting in minutes
-
-  /**
-   * This class holds information about a station reading's average speed.
-   */
-  @DefaultCoder(AvroCoder.class)
-  static class StationSpeed implements Comparable<StationSpeed> {
-    @Nullable String stationId;
-    @Nullable Double avgSpeed;
-    @Nullable Long timestamp;
-
-    public StationSpeed() {}
-
-    public StationSpeed(String stationId, Double avgSpeed, Long timestamp) {
-      this.stationId = stationId;
-      this.avgSpeed = avgSpeed;
-      this.timestamp = timestamp;
-    }
-
-    public String getStationId() {
-      return this.stationId;
-    }
-    public Double getAvgSpeed() {
-      return this.avgSpeed;
-    }
-
-    @Override
-    public int compareTo(StationSpeed other) {
-      return Long.compare(this.timestamp, other.timestamp);
-    }
-  }
-
-  /**
-   * This class holds information about a route's speed/slowdown.
-   */
-  @DefaultCoder(AvroCoder.class)
-  static class RouteInfo {
-    @Nullable String route;
-    @Nullable Double avgSpeed;
-    @Nullable Boolean slowdownEvent;
-
-
-    public RouteInfo() {}
-
-    public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) {
-      this.route = route;
-      this.avgSpeed = avgSpeed;
-      this.slowdownEvent = slowdownEvent;
-    }
-
-    public String getRoute() {
-      return this.route;
-    }
-    public Double getAvgSpeed() {
-      return this.avgSpeed;
-    }
-    public Boolean getSlowdownEvent() {
-      return this.slowdownEvent;
-    }
-  }
-
-  /**
-   * Extract the timestamp field from the input string, and use it as the element timestamp.
-   */
-  static class ExtractTimestamps extends DoFn<String, String> {
-    private static final DateTimeFormatter dateTimeFormat =
-        DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
-
-    @Override
-    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
-      String[] items = c.element().split(",");
-      String timestamp = tryParseTimestamp(items);
-      if (timestamp != null) {
-        try {
-          c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
-        } catch (IllegalArgumentException e) {
-          // Skip the invalid input.
-        }
-      }
-    }
-  }
-
-  /**
-   * Filter out readings for the stations along predefined 'routes', and output
-   * (station, speed info) keyed on route.
-   */
-  static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
-
-    @Override
-    public void processElement(ProcessContext c) {
-      String[] items = c.element().split(",");
-      String stationType = tryParseStationType(items);
-      // For this analysis, use only 'main line' station types
-      if (stationType != null && stationType.equals("ML")) {
-        Double avgSpeed = tryParseAvgSpeed(items);
-        String stationId = tryParseStationId(items);
-        // For this simple example, filter out everything but some hardwired routes.
-        if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
-          StationSpeed stationSpeed =
-              new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis());
-          // The tuple key is the 'route' name stored in the 'sdStations' hash.
-          KV<String, StationSpeed> outputValue = KV.of(sdStations.get(stationId), stationSpeed);
-          c.output(outputValue);
-        }
-      }
-    }
-  }
-
-  /**
-   * For a given route, track the average speed for the window. Calculate whether
-   * traffic is currently slowing down, via a predefined threshold. If a supermajority of
-   * speeds in this sliding window are less than the previous reading for the same station,
-   * we call this a 'slowdown'. Note: these calculations are for example purposes only,
-   * and are unrealistic and oversimplified.
-   */
-  static class GatherStats
-      extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
-    @Override
-    public void processElement(ProcessContext c) throws IOException {
-      String route = c.element().getKey();
-      double speedSum = 0.0;
-      int speedCount = 0;
-      int speedups = 0;
-      int slowdowns = 0;
-      List<StationSpeed> infoList = Lists.newArrayList(c.element().getValue());
-      // StationSpeeds sort by embedded timestamp.
-      Collections.sort(infoList);
-      Map<String, Double> prevSpeeds = new HashMap<>();
-      // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
-      for (StationSpeed item : infoList) {
-        Double speed = item.getAvgSpeed();
-        if (speed != null) {
-          speedSum += speed;
-          speedCount++;
-          Double lastSpeed = prevSpeeds.get(item.getStationId());
-          if (lastSpeed != null) {
-            if (lastSpeed < speed) {
-              speedups += 1;
-            } else {
-              slowdowns += 1;
-            }
-          }
-          prevSpeeds.put(item.getStationId(), speed);
-        }
-      }
-      if (speedCount == 0) {
-        // No average to compute.
-        return;
-      }
-      double speedAvg = speedSum / speedCount;
-      boolean slowdownEvent = slowdowns >= 2 * speedups;
-      RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
-      c.output(KV.of(route, routeInfo));
-    }
-  }
-
-  /**
-   * Format the results of the slowdown calculations to a TableRow, to save to BigQuery.
-   */
-  static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-      RouteInfo routeInfo = c.element().getValue();
-      TableRow row = new TableRow()
-          .set("avg_speed", routeInfo.getAvgSpeed())
-          .set("slowdown_event", routeInfo.getSlowdownEvent())
-          .set("route", c.element().getKey())
-          .set("window_timestamp", c.timestamp().toString());
-      c.output(row);
-    }
-
-    /**
-     * Defines the BigQuery schema used for the output.
-     */
-    static TableSchema getSchema() {
-      List<TableFieldSchema> fields = new ArrayList<>();
-      fields.add(new TableFieldSchema().setName("route").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
-      fields.add(new TableFieldSchema().setName("slowdown_event").setType("BOOLEAN"));
-      fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
-      TableSchema schema = new TableSchema().setFields(fields);
-      return schema;
-    }
-  }
-
-  /**
-   * This PTransform extracts speed info from traffic station readings.
-   * It groups the readings by 'route' and analyzes traffic slowdown for that route.
-   * Lastly, it formats the results for BigQuery.
-   */
-  static class TrackSpeed extends
-      PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
-    @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, StationSpeed>> stationSpeed) {
-      // Apply a GroupByKey transform to collect a list of all station
-      // readings for a given route.
-      PCollection<KV<String, Iterable<StationSpeed>>> timeGroup = stationSpeed.apply(
-        GroupByKey.<String, StationSpeed>create());
-
-      // Analyze 'slowdown' over the route readings.
-      PCollection<KV<String, RouteInfo>> stats = timeGroup.apply(ParDo.of(new GatherStats()));
-
-      // Format the results for writing to BigQuery
-      PCollection<TableRow> results = stats.apply(
-          ParDo.of(new FormatStatsFn()));
-
-      return results;
-    }
-  }
-
-  static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
-    private final String inputFile;
-
-    public ReadFileAndExtractTimestamps(String inputFile) {
-      this.inputFile = inputFile;
-    }
-
-    @Override
-    public PCollection<String> apply(PBegin begin) {
-      return begin
-          .apply(TextIO.Read.from(inputFile))
-          .apply(ParDo.of(new ExtractTimestamps()));
-    }
-  }
-
-  /**
-   * Options supported by {@link TrafficRoutes}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private interface TrafficRoutesOptions extends DataflowExampleOptions,
-      ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
-    @Description("Input file to inject to Pub/Sub topic")
-    @Default.String("gs://dataflow-samples/traffic_sensor/"
-        + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
-    String getInputFile();
-    void setInputFile(String value);
-
-    @Description("Numeric value of sliding window duration, in minutes")
-    @Default.Integer(WINDOW_DURATION)
-    Integer getWindowDuration();
-    void setWindowDuration(Integer value);
-
-    @Description("Numeric value of window 'slide every' setting, in minutes")
-    @Default.Integer(WINDOW_SLIDE_EVERY)
-    Integer getWindowSlideEvery();
-    void setWindowSlideEvery(Integer value);
-
-    @Description("Whether to run the pipeline with unbounded input")
-    @Default.Boolean(false)
-    boolean isUnbounded();
-    void setUnbounded(boolean value);
-  }
-
-  /**
-   * Sets up and starts streaming pipeline.
-   *
-   * @throws IOException if there is a problem setting up resources
-   */
-  public static void main(String[] args) throws IOException {
-    TrafficRoutesOptions options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(TrafficRoutesOptions.class);
-
-    options.setBigQuerySchema(FormatStatsFn.getSchema());
-    // Using DataflowExampleUtils to set up required resources.
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
-
-    Pipeline pipeline = Pipeline.create(options);
-    TableReference tableRef = new TableReference();
-    tableRef.setProjectId(options.getProject());
-    tableRef.setDatasetId(options.getBigQueryDataset());
-    tableRef.setTableId(options.getBigQueryTable());
-
-    PCollection<String> input;
-    if (options.isUnbounded()) {
-      // Read unbounded PubSubIO.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()));
-    } else {
-      // Read bounded PubSubIO.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
-      // To read bounded TextIO files, use:
-      // input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
-      //    .apply(ParDo.of(new ExtractTimestamps()));
-    }
-    input
-        // row... => <station route, station speed> ...
-        .apply(ParDo.of(new ExtractStationSpeedFn()))
-        // map the incoming data stream into sliding windows.
-        // The default window duration values work well if you're running the accompanying Pub/Sub
-        // generator script without the --replay flag, so that there are no simulated pauses in
-        // the sensor data publication. You may want to adjust the values otherwise.
-        .apply(Window.<KV<String, StationSpeed>>into(SlidingWindows.of(
-            Duration.standardMinutes(options.getWindowDuration())).
-            every(Duration.standardMinutes(options.getWindowSlideEvery()))))
-        .apply(new TrackSpeed())
-        .apply(BigQueryIO.Write.to(tableRef)
-            .withSchema(FormatStatsFn.getSchema()));
-
-    // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
-    if (!Strings.isNullOrEmpty(options.getInputFile())
-        && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
-      dataflowUtils.runInjectorPipeline(
-          new ReadFileAndExtractTimestamps(options.getInputFile()),
-          options.getPubsubTopic(),
-          PUBSUB_TIMESTAMP_LABEL_KEY);
-    }
-
-    // Run the pipeline.
-    PipelineResult result = pipeline.run();
-
-    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
-    dataflowUtils.waitToFinish(result);
-  }
-
-  private static Double tryParseAvgSpeed(String[] inputItems) {
-    try {
-      return Double.parseDouble(tryParseString(inputItems, 9));
-    } catch (NumberFormatException e) {
-      return null;
-    } catch (NullPointerException e) {
-      return null;
-    }
-  }
-
-  private static String tryParseStationType(String[] inputItems) {
-    return tryParseString(inputItems, 4);
-  }
-
-  private static String tryParseStationId(String[] inputItems) {
-    return tryParseString(inputItems, 1);
-  }
-
-  private static String tryParseTimestamp(String[] inputItems) {
-    return tryParseString(inputItems, 0);
-  }
-
-  private static String tryParseString(String[] inputItems, int index) {
-    // Use '>' so that an index equal to the array length yields null rather than
-    // an ArrayIndexOutOfBoundsException.
-    return inputItems.length > index ? inputItems[index] : null;
-  }
-
-  /**
-   * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
-   */
-  private static Map<String, String> buildStationInfo() {
-    Map<String, String> stations = new Hashtable<String, String>();
-    stations.put("1108413", "SDRoute1"); // from freeway 805 S
-    stations.put("1108699", "SDRoute2"); // from freeway 78 E
-    stations.put("1108702", "SDRoute2");
-    return stations;
-  }
-}
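
The windowing step that the TrafficRoutes javadoc describes, assigning keyed station readings to
3-minute sliding windows that advance every minute and then grouping them per route, can be
summarized in a short self-contained sketch. This is a minimal illustration that assumes the same
pre-Beam com.google.cloud.dataflow.sdk API used by the file above; the class name and the use of
Double in place of the example's StationSpeed type are illustrative only.

import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.Duration;

class SlidingWindowSketch {
  // Assign each (route, speed) reading to overlapping 3-minute windows that slide every
  // minute, then collect all readings for a route within each window.
  static PCollection<KV<String, Iterable<Double>>> windowAndGroup(
      PCollection<KV<String, Double>> speedsByRoute) {
    return speedsByRoute
        .apply(Window.<KV<String, Double>>into(
            SlidingWindows.of(Duration.standardMinutes(3))
                .every(Duration.standardMinutes(1))))
        .apply(GroupByKey.<String, Double>create());
  }
}

With these settings each reading falls into three overlapping windows, so it contributes to the
per-route average of every window that covers its timestamp.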

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java b/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
deleted file mode 100644
index 503bcad..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An example that reads the public samples of weather data from BigQuery, counts the number of
- * tornadoes that occur in each month, and writes the results to BigQuery.
- *
- * <p>Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
- *
- * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
- * table.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and the BigQuery table for the output, with the form
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
- * and can be overridden with {@code --input}.
- */
-public class BigQueryTornadoes {
-  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
-  private static final String WEATHER_SAMPLES_TABLE =
-      "clouddataflow-readonly:samples.weather_stations";
-
-  /**
-   * Examines each row in the input table. If a tornado was recorded
-   * in that sample, the month in which it occurred is output.
-   */
-  static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
-    @Override
-    public void processElement(ProcessContext c){
-      TableRow row = c.element();
-      if ((Boolean) row.get("tornado")) {
-        c.output(Integer.parseInt((String) row.get("month")));
-      }
-    }
-  }
-
-  /**
-   * Prepares the data for writing to BigQuery by building a TableRow object containing an
-   * integer representation of month and the number of tornadoes that occurred in each month.
-   */
-  static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = new TableRow()
-          .set("month", c.element().getKey())
-          .set("tornado_count", c.element().getValue());
-      c.output(row);
-    }
-  }
-
-  /**
-   * Takes rows from a table and generates a table of counts.
-   *
-   * <p>The input schema is described by
-   * https://developers.google.com/bigquery/docs/dataset-gsod .
-   * The output contains the total number of tornadoes found in each month in
-   * the following schema:
-   * <ul>
-   *   <li>month: integer</li>
-   *   <li>tornado_count: integer</li>
-   * </ul>
-   */
-  static class CountTornadoes
-      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
-    @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
-
-      // row... => month...
-      PCollection<Integer> tornadoes = rows.apply(
-          ParDo.of(new ExtractTornadoesFn()));
-
-      // month... => <month,count>...
-      PCollection<KV<Integer, Long>> tornadoCounts =
-          tornadoes.apply(Count.<Integer>perElement());
-
-      // <month,count>... => row...
-      PCollection<TableRow> results = tornadoCounts.apply(
-          ParDo.of(new FormatCountsFn()));
-
-      return results;
-    }
-  }
-
-  /**
-   * Options supported by {@link BigQueryTornadoes}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Table to read from, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Default.String(WEATHER_SAMPLES_TABLE)
-    String getInput();
-    void setInput(String value);
-
-    @Description("BigQuery table to write to, specified as "
-        + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  public static void main(String[] args) {
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
-    Pipeline p = Pipeline.create(options);
-
-    // Build the table schema for the output table.
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
-    TableSchema schema = new TableSchema().setFields(fields);
-
-    p.apply(BigQueryIO.Read.from(options.getInput()))
-     .apply(new CountTornadoes())
-     .apply(BigQueryIO.Write
-        .to(options.getOutput())
-        .withSchema(schema)
-        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-
-    p.run();
-  }
-}
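
The counting pattern that BigQueryTornadoes demonstrates, mapping each record to its month and
then applying Count.perElement, can be sketched independently of BigQuery. The sketch below uses
the same SDK as the file above; the "month,tornado" CSV input layout and the class names are
assumptions made for illustration, since the real example reads TableRow objects.

import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

class TornadoCountSketch {
  // Emit the month for each record that reports a tornado.
  static class ExtractMonthFn extends DoFn<String, Integer> {
    @Override
    public void processElement(ProcessContext c) {
      String[] fields = c.element().split(",");
      if (fields.length == 2 && Boolean.parseBoolean(fields[1])) {
        c.output(Integer.parseInt(fields[0]));
      }
    }
  }

  // month... => <month, count>...
  static PCollection<KV<Integer, Long>> tornadoesPerMonth(PCollection<String> lines) {
    return lines
        .apply(ParDo.of(new ExtractMonthFn()))
        .apply(Count.<Integer>perElement());
  }
}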

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java b/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
deleted file mode 100644
index 9540dd4..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An example that reads the public 'Shakespeare' data and, for each word in
- * the dataset that exceeds a given length, generates a string containing the
- * list of play names in which that word appears, then saves this information
- * to a BigQuery table.
- *
- * <p>Concepts: the Combine.perKey transform, which lets you combine the values in a
- * key-grouped Collection, and how to use an Aggregator to track information in the
- * Monitoring UI.
- *
- * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
- * table.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://<STAGING DIRECTORY>
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>The BigQuery input table defaults to {@code publicdata:samples.shakespeare} and can
- * be overridden with {@code --input}.
- */
-public class CombinePerKeyExamples {
-  // Use the shakespeare public BigQuery sample
-  private static final String SHAKESPEARE_TABLE =
-      "publicdata:samples.shakespeare";
-  // We'll track words whose length is >= this value across all plays in the table.
-  private static final int MIN_WORD_LENGTH = 9;
-
-  /**
-   * Examines each row in the input table. If the word's length is greater than or equal to
-   * MIN_WORD_LENGTH, outputs (word, play_name).
-   */
-  static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
-    private final Aggregator<Long, Long> smallerWords =
-        createAggregator("smallerWords", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c){
-      TableRow row = c.element();
-      String playName = (String) row.get("corpus");
-      String word = (String) row.get("word");
-      if (word.length() >= MIN_WORD_LENGTH) {
-        c.output(KV.of(word, playName));
-      } else {
-        // Track how many smaller words we're not including. This information will be
-        // visible in the Monitoring UI.
-        smallerWords.addValue(1L);
-      }
-    }
-  }
-
-
-  /**
-   * Prepares the data for writing to BigQuery by building a TableRow object
-   * containing a word with a string listing the plays in which it appeared.
-   */
-  static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = new TableRow()
-          .set("word", c.element().getKey())
-          .set("all_plays", c.element().getValue());
-      c.output(row);
-    }
-  }
-
-  /**
-   * Reads the public 'Shakespeare' data, and for each word in the dataset
-   * over a given length, generates a string containing the list of play names
-   * in which that word appears. It does this via the Combine.perKey
-   * transform, with the ConcatWords combine function.
-   *
-   * <p>Combine.perKey is similar to a GroupByKey followed by a ParDo, but
-   * has more restricted semantics that allow it to be executed more
-   * efficiently. These records are then formatted as BQ table rows.
-   */
-  static class PlaysForWord
-      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
-    @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
-
-      // row... => <word, play_name> ...
-      PCollection<KV<String, String>> words = rows.apply(
-          ParDo.of(new ExtractLargeWordsFn()));
-
-      // word, play_name => word, all_plays ...
-      PCollection<KV<String, String>> wordAllPlays =
-          words.apply(Combine.<String, String>perKey(
-              new ConcatWords()));
-
-      // <word, all_plays>... => row...
-      PCollection<TableRow> results = wordAllPlays.apply(
-          ParDo.of(new FormatShakespeareOutputFn()));
-
-      return results;
-    }
-  }
-
-  /**
-   * A 'combine function' used with the Combine.perKey transform. Builds a
-   * comma-separated string of all input items.  So, it will build a string
-   * containing all the different Shakespeare plays in which the given input
-   * word has appeared.
-   */
-  public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
-    @Override
-    public String apply(Iterable<String> input) {
-      StringBuilder all = new StringBuilder();
-      for (String item : input) {
-        if (!item.isEmpty()) {
-          if (all.length() == 0) {
-            all.append(item);
-          } else {
-            all.append(",");
-            all.append(item);
-          }
-        }
-      }
-      return all.toString();
-    }
-  }
-
-  /**
-   * Options supported by {@link CombinePerKeyExamples}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Table to read from, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Default.String(SHAKESPEARE_TABLE)
-    String getInput();
-    void setInput(String value);
-
-    @Description("Table to write to, specified as "
-        + "<project_id>:<dataset_id>.<table_id>. "
-        + "The dataset_id must already exist")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  public static void main(String[] args)
-      throws Exception {
-
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-
-    // Build the table schema for the output table.
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("word").setType("STRING"));
-    fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
-    TableSchema schema = new TableSchema().setFields(fields);
-
-    p.apply(BigQueryIO.Read.from(options.getInput()))
-     .apply(new PlaysForWord())
-     .apply(BigQueryIO.Write
-        .to(options.getOutput())
-        .withSchema(schema)
-        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-
-    p.run();
-  }
-}
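
The Combine.perKey pattern that CombinePerKeyExamples explains, merging all values for a key with
a SerializableFunction rather than a GroupByKey followed by a ParDo, fits in a few lines. The
sketch assumes the same SDK as the file above; the class names are illustrative and the combine
logic mirrors the ConcatWords function shown there.

import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

class CombinePerKeySketch {
  // Concatenate every non-empty value associated with a key into one comma-separated string.
  static class ConcatFn implements SerializableFunction<Iterable<String>, String> {
    @Override
    public String apply(Iterable<String> input) {
      StringBuilder all = new StringBuilder();
      for (String item : input) {
        if (item.isEmpty()) {
          continue;
        }
        if (all.length() > 0) {
          all.append(",");
        }
        all.append(item);
      }
      return all.toString();
    }
  }

  // <word, play>... => <word, "play1,play2,...">...
  static PCollection<KV<String, String>> playsPerWord(
      PCollection<KV<String, String>> wordToPlay) {
    return wordToPlay.apply(Combine.<String, String>perKey(new ConcatFn()));
  }
}

The more restricted semantics mentioned in the javadoc are what let the runner pre-combine values
on each worker before the shuffle, which is why Combine.perKey is usually cheaper than a
GroupByKey followed by a ParDo.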

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
deleted file mode 100644
index eaf1e20..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.cookbook;
-
-import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
-import static com.google.api.services.datastore.client.DatastoreHelper.getString;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
-
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Property;
-import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.Value;
-import com.google.cloud.dataflow.examples.WordCount;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.DatastoreIO;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-
-import java.util.Map;
-import java.util.UUID;
-
-import javax.annotation.Nullable;
-
-/**
- * A WordCount example using DatastoreIO.
- *
- * <p>This example shows how to use DatastoreIO to read from Datastore and
- * write the results to Cloud Storage.  Note that this example will write
- * data to Datastore, which may incur charges for Datastore operations.
- *
- * <p>To run this example, users need to use gcloud to get credentials for Datastore:
- * <pre>{@code
- * $ gcloud auth login
- * }</pre>
- *
- * <p>To run this pipeline locally, the following options must be provided:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --dataset=YOUR_DATASET_ID
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PATH]
- * }</pre>
- *
- * <p>To run this example using the Dataflow service, you must additionally
- * provide either {@literal --stagingLocation} or {@literal --tempLocation}, and
- * select one of the Dataflow pipeline runners, e.g.
- * {@literal --runner=BlockingDataflowPipelineRunner}.
- *
- * <p><b>Note:</b> this example creates entities with <i>Ancestor keys</i> to ensure that all
- * entities created are in the same entity group. Similarly, the query used to read from the Cloud
- * Datastore uses an <i>Ancestor filter</i>. Ancestors are used to ensure strongly consistent
- * results in Cloud Datastore. For more information, see the Cloud Datastore documentation on
- * <a href="https://cloud.google.com/datastore/docs/concepts/structuring_for_strong_consistency">
- * Structuring Data for Strong Consistency</a>.
- */
-public class DatastoreWordCount {
-
-  /**
-   * A DoFn that gets the content of an entity (one line in a
-   * Shakespeare play) and converts it to a string.
-   */
-  static class GetContentFn extends DoFn<Entity, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      Map<String, Value> props = getPropertyMap(c.element());
-      Value value = props.get("content");
-      if (value != null) {
-        c.output(getString(value));
-      }
-    }
-  }
-
-  /**
-   * A helper function to create the ancestor key for all created and queried entities.
-   *
-   * <p>We use ancestor keys and ancestor queries for strong consistency. See
-   * {@link DatastoreWordCount} javadoc for more information.
-   */
-  static Key makeAncestorKey(@Nullable String namespace, String kind) {
-    Key.Builder keyBuilder = makeKey(kind, "root");
-    if (namespace != null) {
-      keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
-    }
-    return keyBuilder.build();
-  }
-
-  /**
-   * A DoFn that creates an entity for every line in Shakespeare.
-   */
-  static class CreateEntityFn extends DoFn<String, Entity> {
-    private final String namespace;
-    private final String kind;
-    private final Key ancestorKey;
-
-    CreateEntityFn(String namespace, String kind) {
-      this.namespace = namespace;
-      this.kind = kind;
-
-      // Build the ancestor key for all created entities once, including the namespace.
-      ancestorKey = makeAncestorKey(namespace, kind);
-    }
-
-    public Entity makeEntity(String content) {
-      Entity.Builder entityBuilder = Entity.newBuilder();
-
-      // All created entities have the same ancestor Key.
-      Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
-      // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
-      // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
-      // we can simplify this code.
-      if (namespace != null) {
-        keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
-      }
-
-      entityBuilder.setKey(keyBuilder.build());
-      entityBuilder.addProperty(Property.newBuilder().setName("content")
-          .setValue(Value.newBuilder().setStringValue(content)));
-      return entityBuilder.build();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(makeEntity(c.element()));
-    }
-  }
-
-  /**
-   * Options supported by {@link DatastoreWordCount}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  public static interface Options extends PipelineOptions {
-    @Description("Path of the file to read from and store to Datastore")
-    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
-    String getInput();
-    void setInput(String value);
-
-    @Description("Path of the file to write to")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-
-    @Description("Dataset ID to read from datastore")
-    @Validation.Required
-    String getDataset();
-    void setDataset(String value);
-
-    @Description("Dataset entity kind")
-    @Default.String("shakespeare-demo")
-    String getKind();
-    void setKind(String value);
-
-    @Description("Dataset namespace")
-    String getNamespace();
-    void setNamespace(@Nullable String value);
-
-    @Description("Read an existing dataset, do not write first")
-    boolean isReadOnly();
-    void setReadOnly(boolean value);
-
-    @Description("Number of output shards")
-    @Default.Integer(0) // If the system should choose automatically.
-    int getNumShards();
-    void setNumShards(int value);
-  }
-
-  /**
-   * An example that creates a pipeline to populate Datastore from a
-   * text input. It can be run locally, for example with DirectPipelineRunner.
-   */
-  public static void writeDataToDatastore(Options options) {
-    Pipeline p = Pipeline.create(options);
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
-     .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
-     .apply(DatastoreIO.writeTo(options.getDataset()));
-
-    p.run();
-  }
-
-  /**
-   * Build a Cloud Datastore ancestor query for the specified {@link Options#getNamespace} and
-   * {@link Options#getKind}.
-   *
-   * <p>We use ancestor keys and ancestor queries for strong consistency. See
-   * {@link DatastoreWordCount} javadoc for more information.
-   *
-   * @see <a href="https://cloud.google.com/datastore/docs/concepts/queries#Datastore_Ancestor_filters">Ancestor filters</a>
-   */
-  static Query makeAncestorKindQuery(Options options) {
-    Query.Builder q = Query.newBuilder();
-    q.addKindBuilder().setName(options.getKind());
-    q.setFilter(makeFilter(
-        "__key__",
-        PropertyFilter.Operator.HAS_ANCESTOR,
-        makeValue(makeAncestorKey(options.getNamespace(), options.getKind()))));
-    return q.build();
-  }
-
-  /**
-   * An example that creates a pipeline to do DatastoreIO.Read from Datastore.
-   */
-  public static void readDataFromDatastore(Options options) {
-    Query query = makeAncestorKindQuery(options);
-
-    // For Datastore sources, the read namespace can be set on the entire query.
-    DatastoreIO.Source source = DatastoreIO.source()
-        .withDataset(options.getDataset())
-        .withQuery(query)
-        .withNamespace(options.getNamespace());
-
-    Pipeline p = Pipeline.create(options);
-    p.apply("ReadShakespeareFromDatastore", Read.from(source))
-        .apply("StringifyEntity", ParDo.of(new GetContentFn()))
-        .apply("CountWords", new WordCount.CountWords())
-        .apply("PrintWordCount", MapElements.via(new WordCount.FormatAsTextFn()))
-        .apply("WriteLines", TextIO.Write.to(options.getOutput())
-            .withNumShards(options.getNumShards()));
-    p.run();
-  }
-
-  /**
-   * An example to demo how to use {@link DatastoreIO}.  The runner here is
-   * customizable, which means users could pass either {@code DirectPipelineRunner}
-   * or {@code DataflowPipelineRunner} in the pipeline options.
-   */
-  public static void main(String[] args) {
-    // The options are used in two places: for the Dataflow service, and
-    // for building the DatastoreIO.Read object.
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
-    if (!options.isReadOnly()) {
-      // First example: write data to Datastore for reading later.
-      //
-      // NOTE: this write does not delete any existing Entities in the Datastore, so if run
-      // multiple times with the same output dataset, there may be duplicate entries. The
-      // Datastore Query tool in the Google Developers Console can be used to inspect or erase all
-      // entries with a particular namespace and/or kind.
-      DatastoreWordCount.writeDataToDatastore(options);
-    }
-
-    // Second example: do parallel read from Datastore.
-    DatastoreWordCount.readDataFromDatastore(options);
-  }
-}
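
The ancestor-query construction that DatastoreWordCount uses for strongly consistent reads can be
shown on its own. The sketch assumes the same DatastoreV1 helper API imported by the file above;
the literal "root" ancestor id follows the deleted code, and the class name is illustrative.

import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;

import com.google.api.services.datastore.DatastoreV1.Key;
import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
import com.google.api.services.datastore.DatastoreV1.Query;

class AncestorQuerySketch {
  // Build a query for one entity kind, restricted to descendants of a fixed "root"
  // ancestor key, so that reads over that entity group are strongly consistent.
  static Query ancestorQuery(String kind) {
    Key ancestor = makeKey(kind, "root").build();
    Query.Builder q = Query.newBuilder();
    q.addKindBuilder().setName(kind);
    q.setFilter(makeFilter(
        "__key__", PropertyFilter.Operator.HAS_ANCESTOR, makeValue(ancestor)));
    return q.build();
  }
}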

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java b/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
deleted file mode 100644
index 9873561..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-
-/**
- * This example takes Shakespeare's plays as plaintext input files and removes any
- * duplicate lines across all the files. (The output does not preserve any input order.)
- *
- * <p>Concepts: the RemoveDuplicates transform, and how to wire transforms together.
- * Demonstrates {@link com.google.cloud.dataflow.sdk.io.TextIO.Read}/
- * {@link RemoveDuplicates}/{@link com.google.cloud.dataflow.sdk.io.TextIO.Write}.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- *   --project=YOUR_PROJECT_ID
- * and a local output file or output prefix on GCS:
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * and an output prefix on GCS:
- *   --output=gs://YOUR_OUTPUT_PREFIX
- *
- * <p>The input defaults to {@code gs://dataflow-samples/shakespeare/*} and can be
- * overridden with {@code --input}.
- */
-public class DeDupExample {
-
-  /**
-   * Options supported by {@link DeDupExample}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Path to the directory or GCS prefix containing files to read from")
-    @Default.String("gs://dataflow-samples/shakespeare/*")
-    String getInput();
-    void setInput(String value);
-
-    @Description("Path of the file to write to")
-    @Default.InstanceFactory(OutputFactory.class)
-    String getOutput();
-    void setOutput(String value);
-
-    /** Returns "gs://${STAGING_LOCATION}/deduped.txt". */
-    public static class OutputFactory implements DefaultValueFactory<String> {
-      @Override
-      public String create(PipelineOptions options) {
-        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-        if (dataflowOptions.getStagingLocation() != null) {
-          return GcsPath.fromUri(dataflowOptions.getStagingLocation())
-              .resolve("deduped.txt").toString();
-        } else {
-          throw new IllegalArgumentException("Must specify --output or --stagingLocation");
-        }
-      }
-    }
-  }
-
-
-  public static void main(String[] args)
-      throws Exception {
-
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
-     .apply(RemoveDuplicates.<String>create())
-     .apply(TextIO.Write.named("DedupedShakespeare")
-         .to(options.getOutput()));
-
-    p.run();
-  }
-}
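
The whole DeDupExample pipeline reduces to three transforms: read lines, RemoveDuplicates, write
lines. A minimal runnable sketch under that assumption, with placeholder local paths standing in
for the GCS defaults, looks like this:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;

public class DeDupSketch {
  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // Read every matching file, keep one copy of each distinct line, write the result.
    // The paths are placeholders; the real example defaults to gs://dataflow-samples/shakespeare/*.
    p.apply(TextIO.Read.from("/tmp/shakespeare/*.txt"))
     .apply(RemoveDuplicates.<String>create())
     .apply(TextIO.Write.to("/tmp/deduped"));

    p.run();
  }
}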

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java b/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
deleted file mode 100644
index 781873a..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Mean;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-/**
- * This is an example that demonstrates several approaches to filtering, and use of the Mean
- * transform. It shows how to dynamically set parameters by defining and using new pipeline options,
- * and how to use a value derived by the pipeline.
- *
- * <p>Concepts: The Mean transform; Options configuration; using pipeline-derived data as a side
- * input; approaches to filtering, selection, and projection.
- *
- * <p>The example reads public samples of weather data from BigQuery. It performs a
- * projection on the data, finds the global mean of the temperature readings, filters on readings
- * for a single given month, and then outputs only data (for that month) that has a mean temp
- * smaller than the derived global mean.
- *
- * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
- * table.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- *   [--monthFilter=<month_number>]
- * }
- * </pre>
- * where optional parameter {@code --monthFilter} is set to a number 1-12.
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- *   [--monthFilter=<month_number>]
- * }
- * </pre>
- * where optional parameter {@code --monthFilter} is set to a number 1-12.
- *
- * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
- * and can be overridden with {@code --input}.
- */
-public class FilterExamples {
-  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
-  private static final String WEATHER_SAMPLES_TABLE =
-      "clouddataflow-readonly:samples.weather_stations";
-  static final Logger LOG = Logger.getLogger(FilterExamples.class.getName());
-  static final int MONTH_TO_FILTER = 7;
-
-  /**
-   * Examines each row in the input table. Outputs only the subset of the cells this example
-   * is interested in (the mean_temp, year, month, and day) as a BigQuery table row.
-   */
-  static class ProjectionFn extends DoFn<TableRow, TableRow> {
-    @Override
-    public void processElement(ProcessContext c){
-      TableRow row = c.element();
-      // Grab year, month, day, mean_temp from the row
-      Integer year = Integer.parseInt((String) row.get("year"));
-      Integer month = Integer.parseInt((String) row.get("month"));
-      Integer day = Integer.parseInt((String) row.get("day"));
-      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
-      // Prepares the data for writing to BigQuery by building a TableRow object
-      TableRow outRow = new TableRow()
-          .set("year", year).set("month", month)
-          .set("day", day).set("mean_temp", meanTemp);
-      c.output(outRow);
-    }
-  }
-
-  /**
-   * Implements 'filter' functionality.
-   *
-   * <p>Examines each row in the input table. Outputs only rows from the month
-   * monthFilter, which is passed in as a parameter during construction of this DoFn.
-   */
-  static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> {
-    Integer monthFilter;
-
-    public FilterSingleMonthDataFn(Integer monthFilter) {
-      this.monthFilter = monthFilter;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      Integer month = (Integer) row.get("month");
-      if (month.equals(this.monthFilter)) {
-        c.output(row);
-      }
-    }
-  }
-
-  /**
-   * Examines each row (weather reading) in the input table. Outputs the temperature
-   * reading for that row ('mean_temp').
-   */
-  static class ExtractTempFn extends DoFn<TableRow, Double> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
-      c.output(meanTemp);
-    }
-  }
-
-
-
-  /**
-   * Finds the global mean of the mean_temp readings across all records, and outputs
-   * only data whose mean_temp is below that global mean.
-   */
-  static class BelowGlobalMean
-      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
-    Integer monthFilter;
-
-    public BelowGlobalMean(Integer monthFilter) {
-      this.monthFilter = monthFilter;
-    }
-
-
-    @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
-
-      // Extract the mean_temp from each row.
-      PCollection<Double> meanTemps = rows.apply(
-          ParDo.of(new ExtractTempFn()));
-
-      // Find the global mean of all the mean_temp readings in the weather data,
-      // and prepare this singleton PCollectionView for use as a side input.
-      final PCollectionView<Double> globalMeanTemp =
-          meanTemps.apply(Mean.<Double>globally())
-               .apply(View.<Double>asSingleton());
-
-      // Rows filtered to remove all but a single month
-      PCollection<TableRow> monthFilteredRows = rows
-          .apply(ParDo.of(new FilterSingleMonthDataFn(monthFilter)));
-
-      // Then, use the global mean as a side input, to further filter the weather data.
-      // By using a side input to pass in the filtering criteria, we can use a value
-      // that is computed earlier in pipeline execution.
-      // We'll only output readings with temperatures below this mean.
-      PCollection<TableRow> filteredRows = monthFilteredRows
-          .apply(ParDo
-              .named("ParseAndFilter")
-              .withSideInputs(globalMeanTemp)
-              .of(new DoFn<TableRow, TableRow>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
-                  Double gTemp = c.sideInput(globalMeanTemp);
-                  if (meanTemp < gTemp) {
-                    c.output(c.element());
-                  }
-                }
-              }));
-
-      return filteredRows;
-    }
-  }
-
-
-  /**
-   * Options supported by {@link FilterExamples}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Table to read from, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Default.String(WEATHER_SAMPLES_TABLE)
-    String getInput();
-    void setInput(String value);
-
-    @Description("Table to write to, specified as "
-        + "<project_id>:<dataset_id>.<table_id>. "
-        + "The dataset_id must already exist")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-
-    @Description("Numeric value of month to filter on")
-    @Default.Integer(MONTH_TO_FILTER)
-    Integer getMonthFilter();
-    void setMonthFilter(Integer value);
-  }
-
-  /**
-   * Helper method to build the table schema for the output table.
-   */
-  private static TableSchema buildWeatherSchemaProjection() {
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("mean_temp").setType("FLOAT"));
-    TableSchema schema = new TableSchema().setFields(fields);
-    return schema;
-  }
-
-  public static void main(String[] args)
-      throws Exception {
-
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-
-    TableSchema schema = buildWeatherSchemaProjection();
-
-    p.apply(BigQueryIO.Read.from(options.getInput()))
-     .apply(ParDo.of(new ProjectionFn()))
-     .apply(new BelowGlobalMean(options.getMonthFilter()))
-     .apply(BigQueryIO.Write
-        .to(options.getOutput())
-        .withSchema(schema)
-        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-
-    p.run();
-  }
-}
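
For reference, the derived-value-as-side-input pattern that FilterExamples demonstrated boils down to the short, self-contained sketch below. The class and method names (SideInputSketch, keepBelowMean) are illustrative placeholders; the SDK calls (Mean.globally, View.asSingleton, ParDo.withSideInputs, DoFn.processElement) are the same ones used in the deleted file.

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Mean;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;

class SideInputSketch {
  /** Keeps only the elements that fall below the global mean of the input. */
  static PCollection<Double> keepBelowMean(PCollection<Double> temps) {
    // Derive a single value from the whole collection and expose it as a singleton view.
    final PCollectionView<Double> globalMean =
        temps.apply(Mean.<Double>globally())
             .apply(View.<Double>asSingleton());

    // Read that view as a side input while filtering element by element.
    return temps.apply(ParDo
        .named("KeepBelowMean")
        .withSideInputs(globalMean)
        .of(new DoFn<Double, Double>() {
          @Override
          public void processElement(ProcessContext c) {
            if (c.element() < c.sideInput(globalMean)) {
              c.output(c.element());
            }
          }
        }));
  }
}

Computing the mean once and exposing it as a singleton PCollectionView lets the per-element filter consume a pipeline-derived value without a second pass over the data or an explicit join.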

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java b/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
deleted file mode 100644
index 745c5d6..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-/**
- * This example shows how to do a join on two collections.
- * It uses a sample of the GDELT 'world event' data (http://goo.gl/OB6oin), joining the event
- * 'action' country code against a table that maps country codes to country names.
- *
- * <p>Concepts: Join operation; multiple input sources.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and a local output file or output prefix on GCS:
- * <pre>{@code
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- *   --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- */
-public class JoinExamples {
-
-  // A 1000-row sample of the GDELT event data (full table: gdelt-bq:full.events).
-  private static final String GDELT_EVENTS_TABLE =
-      "clouddataflow-readonly:samples.gdelt_sample";
-  // A table that maps country codes to country names.
-  private static final String COUNTRY_CODES =
-      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
-
-  /**
-   * Join two collections, using country code as the key.
-   */
-  static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
-      PCollection<TableRow> countryCodes) throws Exception {
-
-    final TupleTag<String> eventInfoTag = new TupleTag<String>();
-    final TupleTag<String> countryInfoTag = new TupleTag<String>();
-
-    // transform both input collections to tuple collections, where the keys are country
-    // codes in both cases.
-    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
-        ParDo.of(new ExtractEventDataFn()));
-    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
-        ParDo.of(new ExtractCountryInfoFn()));
-
-    // country code 'key' -> CoGbkResult (<event info>, <country name>)
-    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
-        .of(eventInfoTag, eventInfo)
-        .and(countryInfoTag, countryInfo)
-        .apply(CoGroupByKey.<String>create());
-
-    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
-    // country code 'key' -> string of <event info>, <country name>
-    PCollection<KV<String, String>> finalResultCollection =
-      kvpCollection.apply(ParDo.named("Process").of(
-        new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            KV<String, CoGbkResult> e = c.element();
-            String countryCode = e.getKey();
-            // Fall back to "none" when no country name is found for this code.
-            String countryName = e.getValue().getOnly(countryInfoTag, "none");
-            for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
-              // Generate a string that combines information from both collection values
-              c.output(KV.of(countryCode, "Country name: " + countryName
-                      + ", Event info: " + eventInfo));
-            }
-          }
-      }));
-
-    // write to GCS
-    PCollection<String> formattedResults = finalResultCollection
-        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            String outputString = "Country code: " + c.element().getKey()
-                + ", " + c.element().getValue();
-            c.output(outputString);
-          }
-        }));
-    return formattedResults;
-  }
-
-  /**
-   * Examines each row (event) in the input table. Outputs a KV whose key is the event's
-   * country code and whose value is a string encoding the event information.
-   */
-  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      String countryCode = (String) row.get("ActionGeo_CountryCode");
-      String sqlDate = (String) row.get("SQLDATE");
-      String actor1Name = (String) row.get("Actor1Name");
-      String sourceUrl = (String) row.get("SOURCEURL");
-      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
-      c.output(KV.of(countryCode, eventInfo));
-    }
-  }
-
-
-  /**
-   * Examines each row (country info) in the input table. Outputs a KV whose key is the
-   * country code and whose value is the country name.
-   */
-  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      String countryCode = (String) row.get("FIPSCC");
-      String countryName = (String) row.get("HumanName");
-      c.output(KV.of(countryCode, countryName));
-    }
-  }
-
-
-  /**
-   * Options supported by {@link JoinExamples}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Path of the file to write to")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  public static void main(String[] args) throws Exception {
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-    // The following two 'apply' calls create the two inputs to the pipeline, one for
-    // each of the two input sources.
-    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
-    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
-    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
-    formattedResults.apply(TextIO.Write.to(options.getOutput()));
-    p.run();
-  }
-
-}
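
For reference, the CoGroupByKey join that JoinExamples performed reduces to the sketch below. The join() helper and its String-valued inputs are hypothetical simplifications; KeyedPCollectionTuple, CoGroupByKey, CoGbkResult, and TupleTag are used as in the deleted file, and the two-argument getOnly(tag, default) overload is assumed as a fallback when a key has no match on the right side.

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TupleTag;

class CoGbkJoinSketch {
  /** Joins two keyed collections on their common String key. */
  static PCollection<String> join(PCollection<KV<String, String>> left,
                                  PCollection<KV<String, String>> right) {
    final TupleTag<String> leftTag = new TupleTag<String>();
    final TupleTag<String> rightTag = new TupleTag<String>();

    // Group both inputs by key in a single CoGroupByKey pass.
    PCollection<KV<String, CoGbkResult>> grouped = KeyedPCollectionTuple
        .of(leftTag, left)
        .and(rightTag, right)
        .apply(CoGroupByKey.<String>create());

    // Emit one line per (key, left value), paired with the single right-side value.
    return grouped.apply(ParDo.named("FormatJoin").of(
        new DoFn<KV<String, CoGbkResult>, String>() {
          @Override
          public void processElement(ProcessContext c) {
            String rightValue = c.element().getValue().getOnly(rightTag, "unknown");
            for (String leftValue : c.element().getValue().getAll(leftTag)) {
              c.output(c.element().getKey() + ": " + leftValue + " / " + rightValue);
            }
          }
        }));
  }
}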

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java b/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
deleted file mode 100644
index 1c26d0f..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An example that reads the public samples of weather data from BigQuery, and finds
- * the maximum temperature ('mean_temp') for each month.
- *
- * <p>Concepts: The 'Max' statistical combination function, and how to find the max per
- * key group.
- *
- * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
- * table.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and the BigQuery table for the output, with the form
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
- * and can be overridden with {@code --input}.
- */
-public class MaxPerKeyExamples {
-  // Default to using a 1000-row subset of the public weather station table publicdata:samples.gsod.
-  private static final String WEATHER_SAMPLES_TABLE =
-      "clouddataflow-readonly:samples.weather_stations";
-
-  /**
-   * Examines each row (weather reading) in the input table. Outputs the month of the reading
-   * and the mean_temp.
-   */
-  static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      Integer month = Integer.parseInt((String) row.get("month"));
-      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
-      c.output(KV.of(month, meanTemp));
-    }
-  }
-
-  /**
-   * Formats the results as a TableRow, ready to write to BigQuery.
-   */
-  static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = new TableRow()
-          .set("month", c.element().getKey())
-          .set("max_mean_temp", c.element().getValue());
-      c.output(row);
-    }
-  }
-
-  /**
-   * Reads rows from a weather data table, and finds the max mean_temp for each
-   * month via the 'Max' statistical combination function.
-   */
-  static class MaxMeanTemp
-      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
-    @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
-
-      // row... => <month, mean_temp> ...
-      PCollection<KV<Integer, Double>> temps = rows.apply(
-          ParDo.of(new ExtractTempFn()));
-
-      // month, mean_temp... => <month, max mean temp>...
-      PCollection<KV<Integer, Double>> tempMaxes =
-          temps.apply(Max.<Integer>doublesPerKey());
-
-      // <month, max>... => row...
-      PCollection<TableRow> results = tempMaxes.apply(
-          ParDo.of(new FormatMaxesFn()));
-
-      return results;
-    }
-  }
-
-  /**
-   * Options supported by {@link MaxPerKeyExamples}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Table to read from, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Default.String(WEATHER_SAMPLES_TABLE)
-    String getInput();
-    void setInput(String value);
-
-    @Description("Table to write to, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  public static void main(String[] args)
-      throws Exception {
-
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-
-    // Build the table schema for the output table.
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
-    TableSchema schema = new TableSchema().setFields(fields);
-
-    p.apply(BigQueryIO.Read.from(options.getInput()))
-     .apply(new MaxMeanTemp())
-     .apply(BigQueryIO.Write
-        .to(options.getOutput())
-        .withSchema(schema)
-        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-
-    p.run();
-  }
-}
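
The per-key maximum in MaxPerKeyExamples is a single application of the Max combiner. Below is a minimal sketch; the maxPerMonth() wrapper is a hypothetical helper around the same Max.<Integer>doublesPerKey() call used in the deleted file.

import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

class MaxPerKeySketch {
  /** Reduces <month, temp> pairs to one <month, max temp> pair per month. */
  static PCollection<KV<Integer, Double>> maxPerMonth(
      PCollection<KV<Integer, Double>> monthlyTemps) {
    // Max.doublesPerKey() groups by the Integer key and keeps the largest Double value.
    return monthlyTemps.apply(Max.<Integer>doublesPerKey());
  }
}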

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md b/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
deleted file mode 100644
index 99f3080..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
+++ /dev/null
@@ -1,55 +0,0 @@
-
-# "Cookbook" Examples
-
-This directory holds simple "cookbook" examples, which show how to define
-commonly-used data analysis patterns that you would likely incorporate into a
-larger Dataflow pipeline. They include:
-
- <ul>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java">BigQueryTornadoes</a>
-  &mdash; An example that reads the public samples of weather data from Google
-  BigQuery, counts the number of tornadoes that occur in each month, and
-  writes the results to BigQuery. Demonstrates reading/writing BigQuery,
-  counting a <code>PCollection</code>, and user-defined <code>PTransforms</code>.</li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java">CombinePerKeyExamples</a>
-  &mdash; An example that reads the public &quot;Shakespeare&quot; data, and for
-  each word in the dataset that exceeds a given length, generates a string
-  containing the list of play names in which that word appears.
-  Demonstrates the <code>Combine.perKey</code>
-  transform, which lets you combine the values in a key-grouped
-  <code>PCollection</code>.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java">DatastoreWordCount</a>
-  &mdash; An example that shows you how to read from Google Cloud Datastore.</li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java">DeDupExample</a>
-  &mdash; An example that uses Shakespeare's plays as plain text files, and
-  removes duplicate lines across all the files. Demonstrates the
-  <code>RemoveDuplicates</code>, <code>TextIO.Read</code>,
-  and <code>TextIO.Write</code> transforms, and how to wire transforms together.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java">FilterExamples</a>
-  &mdash; An example that shows different approaches to filtering, including
-  selection and projection. It also shows how to dynamically set parameters
-  by defining and using new pipeline options, and how to use a value derived
-  by the pipeline. Demonstrates the <code>Mean</code> transform,
-  <code>Options</code> configuration, and using pipeline-derived data as a side
-  input.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java">JoinExamples</a>
-  &mdash; An example that shows how to join two collections. It uses a
-  sample of the <a href="http://goo.gl/OB6oin">GDELT &quot;world event&quot;
-  data</a>, joining the event <code>action</code> country code against a table
-  that maps country codes to country names. Demonstrates the <code>Join</code>
-  operation, and using multiple input sources.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java">MaxPerKeyExamples</a>
-  &mdash; An example that reads the public samples of weather data from BigQuery,
-  and finds the maximum temperature (<code>mean_temp</code>) for each month.
-  Demonstrates the <code>Max</code> statistical combination transform, and how to
-  find the maximum value per key group.
-  </li>
-  </ul>
-
-See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples
-README](../../../../../../../../../README.md) for
-information about how to run these examples.

