beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [2/6] incubator-beam git commit: Update Beam examples archetypes
Date Wed, 14 Sep 2016 01:08:58 GMT
Update Beam examples archetypes


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b9a66e4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b9a66e4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b9a66e4b

Branch: refs/heads/master
Commit: b9a66e4b50ae1fe5fa3afc33b2523e2f9d64b2c4
Parents: e3768f6
Author: Pei He <peihe@google.com>
Authored: Thu Sep 8 19:16:12 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue Sep 13 18:01:08 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/DebuggingWordCount.java       |  31 +-
 .../src/main/java/MinimalWordCount.java         |  51 ++-
 .../src/main/java/WindowedWordCount.java        | 139 +++----
 .../src/main/java/WordCount.java                |  77 ++--
 .../java/common/DataflowExampleOptions.java     |  32 --
 .../main/java/common/DataflowExampleUtils.java  | 391 -------------------
 .../common/ExampleBigQueryTableOptions.java     |  11 +-
 .../src/main/java/common/ExampleOptions.java    |  37 ++
 ...xamplePubsubTopicAndSubscriptionOptions.java |  45 +++
 .../java/common/ExamplePubsubTopicOptions.java  |  17 +-
 .../src/main/java/common/ExampleUtils.java      | 353 +++++++++++++++++
 .../main/java/common/PubsubFileInjector.java    |   8 +-
 .../src/test/java/WordCountTest.java            |   9 +-
 13 files changed, 592 insertions(+), 609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index e9f4333..e315ba9 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
@@ -36,8 +36,9 @@ import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
- * An example that verifies word counts in Shakespeare and includes Dataflow best practices.
+ * An example that verifies word counts in Shakespeare and includes Beam best practices.
  *
  * <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more
  * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount}
@@ -46,12 +47,12 @@ import org.slf4j.LoggerFactory;
  *
  * <p>Basic concepts, also in the MinimalWordCount and WordCount examples:
  * Reading text files; counting a PCollection; executing a Pipeline both locally
- * and using the Dataflow service; defining DoFns.
+ * and using a selected runner; defining DoFns.
  *
  * <p>New Concepts:
  * <pre>
  *   1. Logging to Cloud Logging
- *   2. Controlling Dataflow worker log levels
+ *   2. Controlling worker log levels
  *   3. Creating a custom aggregator
  *   4. Testing your Pipeline via PAssert
  * </pre>
@@ -62,12 +63,14 @@ import org.slf4j.LoggerFactory;
  * }
  * </pre>
  *
- * <p>To execute this pipeline using the Dataflow service and the additional logging discussed
- * below, specify pipeline configuration:
+ * <p>To change the runner, specify:
+ * <pre>{@code
+ *   --runner=YOUR_SELECTED_RUNNER
+ * }
+ * </pre>
+ *
+ * <p>To use the additional logging discussed below, specify:
  * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowRunner
  *   --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
  * }
  * </pre>
@@ -100,12 +103,12 @@ import org.slf4j.LoggerFactory;
  * that changing the default worker log level to TRACE or DEBUG will significantly increase
  * the amount of logs output.
  *
- * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
- * overridden with {@code --inputFile}.
+ * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
+ * and can be overridden with {@code --inputFile}.
  */
 public class DebuggingWordCount {
   /** A DoFn that filters for a specific key based upon a regular expression. */
-  public static class FilterTextFn extends OldDoFn<KV<String, Long>, KV<String, Long>> {
+  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
     /**
      * Concept #1: The logger below uses the fully qualified class name of FilterTextFn
      * as the logger. All log statements emitted by this logger will be referenced by this name
@@ -131,7 +134,7 @@ public class DebuggingWordCount {
     private final Aggregator<Long, Long> unmatchedWords =
         createAggregator("umatchedWords", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (filter.matcher(c.element().getKey()).matches()) {
         // Log at the "DEBUG" level each element that we match. When executing this pipeline
@@ -149,7 +152,7 @@ public class DebuggingWordCount {
       }
     }
   }
-  
+
   /**
    * Options supported by {@link DebuggingWordCount}.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index 55beb1f..f739fd8 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -17,14 +17,15 @@
  */
 package ${package};
 
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
 
 
@@ -48,26 +49,34 @@ import org.apache.beam.sdk.values.KV;
  *   4. Writing data to Cloud Storage as text files
  * </pre>
  *
- * <p>To execute this pipeline, first edit the code to set your project ID, the staging
+ * <p>To execute this pipeline, first edit the code to set your project ID, the temp
  * location, and the output location. The specified GCS bucket(s) must already exist.
  *
- * <p>Then, run the pipeline as described in the README. It will be deployed and run using the
- * Dataflow service. No args are required to run the pipeline. You can see the results in your
+ * <p>Then, run the pipeline as described in the README. It will be deployed and run with the
+ * selected runner. No args are required to run the pipeline. You can see the results in your
  * output bucket in the GCS browser.
  */
 public class MinimalWordCount {
 
   public static void main(String[] args) {
-    // Create a DataflowPipelineOptions object. This object lets us set various execution
+    // Create a PipelineOptions object. This object lets us set various execution
     // options for our pipeline, such as the associated Cloud Platform project and the location
     // in Google Cloud Storage to stage files.
-    DataflowPipelineOptions options = PipelineOptionsFactory.create()
-      .as(DataflowPipelineOptions.class);
-    options.setRunner(BlockingDataflowRunner.class);
-    // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
-    options.setProject("SET_YOUR_PROJECT_ID_HERE");
-    // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
-    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // In order to run your pipeline, you need to make the following runner-specific changes:
+    //
+    // CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner.
+    // CHANGE 2/3: Specify runner-required options.
+    // For DataflowRunner, set project and temp location as follows:
+    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    //   dataflowOptions.setRunner(DataflowRunner.class);
+    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
+    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
+    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
+    // for more details.
+    //   options.as(FlinkPipelineOptions.class)
+    //      .setRunner(FlinkRunner.class);
 
     // Create the Pipeline object with the options we defined above.
     Pipeline p = Pipeline.create(options);
@@ -77,13 +86,13 @@ public class MinimalWordCount {
     // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
     // of input text files. TextIO.Read returns a PCollection where each element is one line from
     // the input text (a set of Shakespeare's texts).
-    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
      // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
      // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
      // The ParDo returns a PCollection<String>, where each element is an individual word in
      // Shakespeare's collected texts.
-     .apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() {
-                       @Override
+     .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
+                       @ProcessElement
                        public void processElement(ProcessContext c) {
                          for (String word : c.element().split("[^a-zA-Z']+")) {
                            if (!word.isEmpty()) {
@@ -96,12 +105,12 @@ public class MinimalWordCount {
      // transform returns a new PCollection of key/value pairs, where each key represents a unique
      // word in the text. The associated value is the occurrence count for that word.
      .apply(Count.<String>perElement())
-     // Apply another ParDo transform that formats our PCollection of word counts into a printable
+     // Apply a MapElements transform that formats our PCollection of word counts into a printable
      // string, suitable for writing to an output file.
-     .apply("FormatResults", ParDo.of(new OldDoFn<KV<String, Long>, String>() {
+     .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                        @Override
-                       public void processElement(ProcessContext c) {
-                         c.output(c.element().getKey() + ": " + c.element().getValue());
+                       public String apply(KV<String, Long> input) {
+                         return input.getKey() + ": " + input.getValue();
                        }
                      }))
      // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
index 17bf7ca..787e8c9 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
@@ -17,23 +17,24 @@
  */
 package ${package};
 
+import ${package}.common.ExampleBigQueryTableOptions;
+import ${package}.common.ExampleOptions;
+import ${package}.common.ExampleUtils;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import ${package}.common.DataflowExampleUtils;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -41,8 +42,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 
 /**
  * An example that counts words in text, and can run over either unbounded or bounded input
@@ -54,58 +54,43 @@ import org.slf4j.LoggerFactory;
  *
  * <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
  * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
- * and using the Dataflow service; defining DoFns; creating a custom aggregator;
+ * and using a selected runner; defining DoFns; creating a custom aggregator;
  * user-defined PTransforms; defining PipelineOptions.
  *
  * <p>New Concepts:
  * <pre>
  *   1. Unbounded and bounded pipeline input modes
  *   2. Adding timestamps to data
- *   3. PubSub topics as sources
- *   4. Windowing
- *   5. Re-using PTransforms over windowed PCollections
- *   6. Writing to BigQuery
- * </pre>
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
+ *   3. Windowing
+ *   4. Re-using PTransforms over windowed PCollections
+ *   5. Writing to BigQuery
  * </pre>
  *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <p>By default, the examples will run with the {@code DirectRunner}.
+ * To change the runner, specify:
  * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowRunner
+ *   --runner=YOUR_SELECTED_RUNNER
  * }
  * </pre>
+ * See examples/java/README.md for instructions about how to configure different runners.
  *
  * <p>Optionally specify the input file path via:
  * {@code --inputFile=gs://INPUT_PATH},
- * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}.
+ * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}.
  *
  * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
  * specify the table, one will be created for you using the job name. If you don't specify the
- * dataset, a dataset called {@code dataflow-examples} must already exist in your project.
+ * dataset, a dataset called {@code beam_examples} must already exist in your project.
  * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}.
  *
- * <p>Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or
- * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set
- * {@code --unbounded=true}. Then, optionally specify the Google Cloud PubSub topic to read from
- * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not
- * exist, the pipeline will create one for you. It will delete this topic when it terminates.
- * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub
- * topic with the contents of the {@code --inputFile}, in order to make the example easy to run.
- * If you want to use an independently-populated PubSub topic, indicate this by setting
- * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started.
- *
  * <p>By default, the pipeline will do fixed windowing, on 1-minute windows.  You can
  * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
  * for 10-minute windows.
+ *
+ * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exit.
  */
 public class WindowedWordCount {
-    private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
     static final int WINDOW_SIZE = 1;  // Default window duration in minutes
 
   /**
@@ -116,14 +101,19 @@ public class WindowedWordCount {
    * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
    * 2-hour period.
    */
-  static class AddTimestampFn extends OldDoFn<String, String> {
-    private static final long RAND_RANGE = 7200000; // 2 hours in ms
+  static class AddTimestampFn extends DoFn<String, String> {
+    private static final Duration RAND_RANGE = Duration.standardHours(2);
+    private final Instant minTimestamp;
 
-    @Override
+    AddTimestampFn() {
+      this.minTimestamp = new Instant(System.currentTimeMillis());
+    }
+
+    @ProcessElement
     public void processElement(ProcessContext c) {
       // Generate a timestamp that falls somewhere in the past two hours.
-      long randomTimestamp = System.currentTimeMillis()
-        - (int) (Math.random() * RAND_RANGE);
+      long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
+      Instant randomTimestamp = minTimestamp.plus(randMillis);
       /**
        * Concept #2: Set the data element with that timestamp.
        */
@@ -132,8 +122,8 @@ public class WindowedWordCount {
   }
 
   /** A DoFn that converts a Word and Count into a BigQuery table row. */
-  static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> {
-    @Override
+  static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = new TableRow()
           .set("word", c.element().getKey())
@@ -157,7 +147,7 @@ public class WindowedWordCount {
   }
 
   /**
-   * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one
+   * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one
    * that supports both bounded and unbounded data. This is a helper method that creates a
    * TableReference from input options, to tell the pipeline where to write its BigQuery results.
    */
@@ -173,56 +163,39 @@ public class WindowedWordCount {
    * Options supported by {@link WindowedWordCount}.
    *
    * <p>Inherits standard example configuration options, which allow specification of the BigQuery
-   * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for
+   * table, as well as the {@link WordCount.WordCountOptions} support for
    * specification of the input file.
    */
-  public static interface Options
-        extends WordCount.WordCountOptions, DataflowExampleUtils.DataflowExampleUtilsOptions {
+  public static interface Options extends WordCount.WordCountOptions,
+      ExampleOptions, ExampleBigQueryTableOptions {
     @Description("Fixed window duration, in minutes")
     @Default.Integer(WINDOW_SIZE)
     Integer getWindowSize();
     void setWindowSize(Integer value);
-
-    @Description("Whether to run the pipeline with unbounded input")
-    boolean isUnbounded();
-    void setUnbounded(boolean value);
   }
 
   public static void main(String[] args) throws IOException {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     options.setBigQuerySchema(getSchema());
-    // DataflowExampleUtils creates the necessary input sources to simplify execution of this
-    // Pipeline.
-    DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
-      options.isUnbounded());
+    // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline.
+    ExampleUtils exampleUtils = new ExampleUtils(options);
+    exampleUtils.setup();
 
     Pipeline pipeline = Pipeline.create(options);
 
     /**
-     * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
+     * Concept #1: the Beam SDK lets us run the same pipeline with either a bounded or
      * unbounded input source.
      */
-    PCollection<String> input;
-    if (options.isUnbounded()) {
-      LOG.info("Reading from PubSub.");
-      /**
-       * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
-       * specified as an argument. The data elements' timestamps will come from the pubsub
-       * injection.
-       */
-      input = pipeline
-          .apply(PubsubIO.Read.topic(options.getPubsubTopic()));
-    } else {
-      /** Else, this is a bounded pipeline. Read from the GCS file. */
-      input = pipeline
-          .apply(TextIO.Read.from(options.getInputFile()))
-          // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
-          // See AddTimestampFn for more detail on this.
-          .apply(ParDo.of(new AddTimestampFn()));
-    }
+    PCollection<String> input = pipeline
+      /** Read from the GCS file. */
+      .apply(TextIO.Read.from(options.getInputFile()))
+      // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
+      // See AddTimestampFn for more detail on this.
+      .apply(ParDo.of(new AddTimestampFn()));
 
     /**
-     * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
+     * Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1
      * minute (you can change this with a command-line option). See the documentation for more
      * information on how fixed windows work, and for information on the other types of windowing
      * available (e.g., sliding windows).
@@ -232,29 +205,25 @@ public class WindowedWordCount {
         FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
 
     /**
-     * Concept #5: Re-use our existing CountWords transform that does not have knowledge of
+     * Concept #4: Re-use our existing CountWords transform that does not have knowledge of
      * windows over a PCollection containing windowed values.
      */
     PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
 
     /**
-     * Concept #6: Format the results for a BigQuery table, then write to BigQuery.
+     * Concept #5: Format the results for a BigQuery table, then write to BigQuery.
      * The BigQuery output source supports both bounded and unbounded data.
      */
     wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
-        .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
+        .apply(BigQueryIO.Write
+          .to(getTableReference(options))
+          .withSchema(getSchema())
+          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
 
     PipelineResult result = pipeline.run();
 
-    /**
-     * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
-     * runs for a limited time, and publishes to the input PubSub topic.
-     *
-     * With an unbounded input source, you will need to explicitly shut down this pipeline when you
-     * are done with it, so that you do not continue to be charged for the instances. You can do
-     * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
-     * pipelines. The PubSub topic will also be deleted at this time.
-     */
-    exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
+    // ExampleUtils will try to cancel the pipeline before the program exits.
+    exampleUtils.waitToFinish(result);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 5432036..b096d8d 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -17,7 +17,8 @@
  */
 package ${package};
 
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import com.google.common.base.Strings;
+import java.io.IOException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
@@ -27,17 +28,19 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
-
 /**
- * An example that counts words in Shakespeare and includes Dataflow best practices.
+ * An example that counts words in Shakespeare and includes Beam best practices.
  *
  * <p>This class, {@link WordCount}, is the second in a series of four successively more detailed
  * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}.
@@ -45,8 +48,8 @@ import org.apache.beam.sdk.values.PCollection;
  * pipeline, for introduction of additional concepts.
  *
  * <p>For a detailed walkthrough of this example, see
- *   <a href="https://cloud.google.com/dataflow/java-sdk/wordcount-example">
- *   https://cloud.google.com/dataflow/java-sdk/wordcount-example
+ *   <a href="http://beam.incubator.apache.org/use/walkthroughs/">
+ *   http://beam.incubator.apache.org/use/walkthroughs/
  *   </a>
  *
  * <p>Basic concepts, also in the MinimalWordCount example:
@@ -54,39 +57,29 @@ import org.apache.beam.sdk.values.PCollection;
  *
  * <p>New Concepts:
  * <pre>
- *   1. Executing a Pipeline both locally and using the Dataflow service
+ *   1. Executing a Pipeline both locally and using the selected runner
  *   2. Using ParDo with static DoFns defined out-of-line
  *   3. Building a composite transform
  *   4. Defining your own pipeline options
  * </pre>
  *
- * <p>Concept #1: you can execute this pipeline either locally or using the Dataflow service.
+ * <p>Concept #1: you can execute this pipeline either locally or using the selected runner.
  * These are now command-line options and not hard-coded as they were in the MinimalWordCount
  * example.
- * To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and a local output file or output prefix on GCS:
+ * To execute this pipeline locally, specify a local output file or output prefix on GCS:
  * <pre>{@code
  *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
  * }</pre>
  *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <p>To change the runner, specify:
  * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowRunner
+ *   --runner=YOUR_SELECTED_RUNNER
  * }
  * </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- *   --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
+ * See examples/java/README.md for instructions about how to configure different runners.
  *
- * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
- * overridden with {@code --inputFile}.
+ * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
+ * and can be overridden with {@code --inputFile}.
  */
 public class WordCount {
 
@@ -95,11 +88,11 @@ public class WordCount {
    * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
    * pipeline.
    */
-  static class ExtractWordsFn extends OldDoFn<String, String> {
+  static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);
@@ -117,11 +110,11 @@ public class WordCount {
     }
   }
 
-  /** A DoFn that converts a Word and Count into a printable string. */
-  public static class FormatAsTextFn extends OldDoFn<KV<String, Long>, String> {
+  /** A SimpleFunction that converts a Word and Count into a printable string. */
+  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
     @Override
-    public void processElement(ProcessContext c) {
-      c.output(c.element().getKey() + ": " + c.element().getValue());
+    public String apply(KV<String, Long> input) {
+      return input.getKey() + ": " + input.getValue();
     }
   }
 
@@ -161,7 +154,7 @@ public class WordCount {
    */
   public static interface WordCountOptions extends PipelineOptions {
     @Description("Path of the file to read from")
-    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
     String getInputFile();
     void setInputFile(String value);
 
@@ -171,21 +164,25 @@ public class WordCount {
     void setOutput(String value);
 
     /**
-     * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default destination.
+     * Returns "gs://${YOUR_TEMP_DIRECTORY}/counts.txt" as the default destination.
      */
     public static class OutputFactory implements DefaultValueFactory<String> {
       @Override
       public String create(PipelineOptions options) {
-        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-        if (dataflowOptions.getStagingLocation() != null) {
-          return GcsPath.fromUri(dataflowOptions.getStagingLocation())
-              .resolve("counts.txt").toString();
+        String tempLocation = options.getTempLocation();
+        if (!Strings.isNullOrEmpty(tempLocation)) {
+          try {
+            IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+            return factory.resolve(tempLocation, "counts.txt");
+          } catch (IOException e) {
+            throw new RuntimeException(
+                String.format("Failed to resolve temp location: %s", tempLocation), e);
+          }
         } else {
-          throw new IllegalArgumentException("Must specify --output or --stagingLocation");
+          throw new IllegalArgumentException("Must specify --output or --tempLocation");
         }
       }
     }
-
   }
 
   public static void main(String[] args) {
@@ -197,7 +194,7 @@ public class WordCount {
     // static FormatAsTextFn() to the ParDo transform.
     p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
      .apply(new CountWords())
-     .apply(ParDo.of(new FormatAsTextFn()))
+     .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
deleted file mode 100644
index e3bf7c5..0000000
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-
-/**
- * Options that can be used to configure the Dataflow examples.
- */
-public interface DataflowExampleOptions extends DataflowPipelineOptions {
-  @Description("Whether to keep jobs running on the Dataflow service after local process exit")
-  @Default.Boolean(false)
-  boolean getKeepJobsRunning();
-  void setKeepJobsRunning(boolean keepJobsRunning);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
deleted file mode 100644
index 9e6be78..0000000
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Datasets;
-import com.google.api.services.bigquery.Bigquery.Tables;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.transforms.IntraBundleParallelization;
-import org.apache.beam.sdk.util.Transport;
-
-/**
- * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub
- * injector, and cancels the streaming and the injector pipelines once the program terminates.
- *
- * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes.
- */
-public class DataflowExampleUtils {
-
-  private final DataflowPipelineOptions options;
-  private Bigquery bigQueryClient = null;
-  private Pubsub pubsubClient = null;
-  private Dataflow dataflowClient = null;
-  private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
-  private List<String> pendingMessages = Lists.newArrayList();
-
-  /**
-   * Define an interface that supports the PubSub and BigQuery example options.
-   */
-  public static interface DataflowExampleUtilsOptions
-        extends DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
-  }
-
-  public DataflowExampleUtils(DataflowPipelineOptions options) {
-    this.options = options;
-  }
-
-  /**
-   * Do resources and runner options setup.
-   */
-  public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
-      throws IOException {
-    this.options = options;
-    setupResourcesAndRunner(isUnbounded);
-  }
-
-  /**
-   * Sets up external resources that are required by the example,
-   * such as Pub/Sub topics and BigQuery tables.
-   *
-   * @throws IOException if there is a problem setting up the resources
-   */
-  public void setup() throws IOException {
-    setupPubsubTopic();
-    setupBigQueryTable();
-  }
-
-  /**
-   * Set up external resources, and configure the runner appropriately.
-   */
-  public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
-    if (isUnbounded) {
-      options.setStreaming(true);
-    }
-    setup();
-    setupRunner();
-  }
-
-  /**
-   * Sets up the Google Cloud Pub/Sub topic.
-   *
-   * <p>If the topic doesn't exist, a new topic with the given name will be created.
-   *
-   * @throws IOException if there is a problem setting up the Pub/Sub topic
-   */
-  public void setupPubsubTopic() throws IOException {
-    ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
-    if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) {
-      pendingMessages.add("*******************Set Up Pubsub Topic*********************");
-      setupPubsubTopic(pubsubTopicOptions.getPubsubTopic());
-      pendingMessages.add("The Pub/Sub topic has been set up for this example: "
-          + pubsubTopicOptions.getPubsubTopic());
-    }
-  }
-
-  /**
-   * Sets up the BigQuery table with the given schema.
-   *
-   * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
-   * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
-   * will be created.
-   *
-   * @throws IOException if there is a problem setting up the BigQuery table
-   */
-  public void setupBigQueryTable() throws IOException {
-    ExampleBigQueryTableOptions bigQueryTableOptions =
-        options.as(ExampleBigQueryTableOptions.class);
-    if (bigQueryTableOptions.getBigQueryDataset() != null
-        && bigQueryTableOptions.getBigQueryTable() != null
-        && bigQueryTableOptions.getBigQuerySchema() != null) {
-      pendingMessages.add("******************Set Up Big Query Table*******************");
-      setupBigQueryTable(bigQueryTableOptions.getProject(),
-                         bigQueryTableOptions.getBigQueryDataset(),
-                         bigQueryTableOptions.getBigQueryTable(),
-                         bigQueryTableOptions.getBigQuerySchema());
-      pendingMessages.add("The BigQuery table has been set up for this example: "
-          + bigQueryTableOptions.getProject()
-          + ":" + bigQueryTableOptions.getBigQueryDataset()
-          + "." + bigQueryTableOptions.getBigQueryTable());
-    }
-  }
-
-  /**
-   * Tears down external resources that can be deleted upon the example's completion.
-   */
-  private void tearDown() {
-    pendingMessages.add("*************************Tear Down*************************");
-    ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
-    if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) {
-      try {
-        deletePubsubTopic(pubsubTopicOptions.getPubsubTopic());
-        pendingMessages.add("The Pub/Sub topic has been deleted: "
-            + pubsubTopicOptions.getPubsubTopic());
-      } catch (IOException e) {
-        pendingMessages.add("Failed to delete the Pub/Sub topic : "
-            + pubsubTopicOptions.getPubsubTopic());
-      }
-    }
-
-    ExampleBigQueryTableOptions bigQueryTableOptions =
-        options.as(ExampleBigQueryTableOptions.class);
-    if (bigQueryTableOptions.getBigQueryDataset() != null
-        && bigQueryTableOptions.getBigQueryTable() != null
-        && bigQueryTableOptions.getBigQuerySchema() != null) {
-      pendingMessages.add("The BigQuery table might contain the example's output, "
-          + "and it is not deleted automatically: "
-          + bigQueryTableOptions.getProject()
-          + ":" + bigQueryTableOptions.getBigQueryDataset()
-          + "." + bigQueryTableOptions.getBigQueryTable());
-      pendingMessages.add("Please go to the Developers Console to delete it manually."
-          + " Otherwise, you may be charged for its usage.");
-    }
-  }
-
-  private void setupBigQueryTable(String projectId, String datasetId, String tableId,
-      TableSchema schema) throws IOException {
-    if (bigQueryClient == null) {
-      bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
-    }
-
-    Datasets datasetService = bigQueryClient.datasets();
-    if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
-      Dataset newDataset = new Dataset().setDatasetReference(
-          new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
-      datasetService.insert(projectId, newDataset).execute();
-    }
-
-    Tables tableService = bigQueryClient.tables();
-    Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
-    if (table == null) {
-      Table newTable = new Table().setSchema(schema).setTableReference(
-          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
-      tableService.insert(projectId, datasetId, newTable).execute();
-    } else if (!table.getSchema().equals(schema)) {
-      throw new RuntimeException(
-          "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
-          + ", actual: " + table.getSchema().toPrettyString());
-    }
-  }
-
-  private void setupPubsubTopic(String topic) throws IOException {
-    if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
-    }
-    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
-      pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
-    }
-  }
-
-  /**
-   * Deletes the Google Cloud Pub/Sub topic.
-   *
-   * @throws IOException if there is a problem deleting the Pub/Sub topic
-   */
-  private void deletePubsubTopic(String topic) throws IOException {
-    if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
-    }
-    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
-      pubsubClient.projects().topics().delete(topic).execute();
-    }
-  }
-
-  /**
-   * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined,
-   * start an 'injector' pipeline that publishes the contents of the file to the given topic, first
-   * creating the topic if necessary.
-   */
-  public void startInjectorIfNeeded(String inputFile) {
-    ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
-    if (pubsubTopicOptions.isStreaming()
-        && inputFile != null && !inputFile.isEmpty()
-        && pubsubTopicOptions.getPubsubTopic() != null
-        && !pubsubTopicOptions.getPubsubTopic().isEmpty()) {
-      runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic());
-    }
-  }
-
-  public void setupRunner() {
-    if (options.isStreaming() && options.getRunner().equals(BlockingDataflowRunner.class)) {
-      // In order to cancel the pipelines automatically,
-      // {@literal DataflowRunner} is forced to be used.
-      options.setRunner(DataflowRunner.class);
-    }
-  }
-
-  /**
-   * Runs the batch injector for the streaming pipeline.
-   *
-   * <p>The injector pipeline will read from the given text file, and inject data
-   * into the Google Cloud Pub/Sub topic.
-   */
-  public void runInjectorPipeline(String inputFile, String topic) {
-    DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
-    copiedOptions.setStreaming(false);
-    copiedOptions.setWorkerHarnessContainerImage(
-        DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE);
-    copiedOptions.setNumWorkers(
-        options.as(ExamplePubsubTopicOptions.class).getInjectorNumWorkers());
-    copiedOptions.setJobName(options.getJobName() + "-injector");
-    Pipeline injectorPipeline = Pipeline.create(copiedOptions);
-    injectorPipeline.apply(TextIO.Read.from(inputFile))
-                    .apply(IntraBundleParallelization
-                        .of(PubsubFileInjector.publish(topic))
-                        .withMaxParallelism(20));
-    DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run();
-    jobsToCancel.add(injectorJob);
-  }
-
-  /**
-   * Runs the provided injector pipeline for the streaming pipeline.
-   */
-  public void runInjectorPipeline(Pipeline injectorPipeline) {
-    DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run();
-    jobsToCancel.add(injectorJob);
-  }
-
-  /**
-   * Start the auxiliary injector pipeline, then wait for this pipeline to finish.
-   */
-  public void mockUnboundedSource(String inputFile, PipelineResult result) {
-    startInjectorIfNeeded(inputFile);
-    waitToFinish(result);
-  }
-
-  /**
-   * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
-   * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
-   */
-  public void waitToFinish(PipelineResult result) {
-    if (result instanceof DataflowPipelineJob) {
-      final DataflowPipelineJob job = (DataflowPipelineJob) result;
-      jobsToCancel.add(job);
-      if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) {
-        addShutdownHook(jobsToCancel);
-      }
-      try {
-        job.waitUntilFinish();
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
-      }
-    } else {
-      // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
-      // such as EvaluationResults returned by DirectRunner.
-    }
-  }
-
-  private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
-    if (dataflowClient == null) {
-      dataflowClient = options.getDataflowClient();
-    }
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        tearDown();
-        printPendingMessages();
-        for (DataflowPipelineJob job : jobs) {
-          System.out.println("Canceling example pipeline: " + job.getJobId());
-          try {
-            job.cancel();
-          } catch (IOException e) {
-            System.out.println("Failed to cancel the job,"
-                + " please go to the Developers Console to cancel it manually");
-            System.out.println(
-                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
-          }
-        }
-
-        for (DataflowPipelineJob job : jobs) {
-          boolean cancellationVerified = false;
-          for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
-            if (job.getState().isTerminal()) {
-              cancellationVerified = true;
-              System.out.println("Canceled example pipeline: " + job.getJobId());
-              break;
-            } else {
-              System.out.println(
-                  "The example pipeline is still running. Verifying the cancellation.");
-            }
-            try {
-              Thread.sleep(10000);
-            } catch (InterruptedException e) {
-              // Ignore
-            }
-          }
-          if (!cancellationVerified) {
-            System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
-            System.out.println("Please go to the Developers Console to verify manually:");
-            System.out.println(
-                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
-          }
-        }
-      }
-    });
-  }
-
-  private void printPendingMessages() {
-    System.out.println();
-    System.out.println("***********************************************************");
-    System.out.println("***********************************************************");
-    for (String message : pendingMessages) {
-      System.out.println(message);
-    }
-    System.out.println("***********************************************************");
-    System.out.println("***********************************************************");
-  }
-
-  private static <T> T executeNullIfNotFound(
-      AbstractGoogleClientRequest<T> request) throws IOException {
-    try {
-      return request.execute();
-    } catch (GoogleJsonResponseException e) {
-      if (e.getStatusCode() == HttpServletResponse.SC_NOT_FOUND) {
-        return null;
-      } else {
-        throw e;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
index 79fa865..96e8406 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
@@ -18,19 +18,19 @@
 package ${package}.common;
 
 import com.google.api.services.bigquery.model.TableSchema;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
- * Options that can be used to configure BigQuery tables in Dataflow examples.
+ * Options that can be used to configure BigQuery tables in Beam examples.
  * The project defaults to the project being used to run the example.
  */
-public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
+public interface ExampleBigQueryTableOptions extends GcpOptions {
   @Description("BigQuery dataset name")
-  @Default.String("dataflow_examples")
+  @Default.String("beam_examples")
   String getBigQueryDataset();
   void setBigQueryDataset(String dataset);
 
@@ -49,8 +49,7 @@ public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
   static class BigQueryTableFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      return options.as(DataflowPipelineOptions.class).getJobName()
-          .replace('-', '_');
+      return options.getJobName().replace('-', '_');
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
new file mode 100644
index 0000000..90f935c
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package}.common;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options that can be used to configure the Beam examples.
+ */
+public interface ExampleOptions extends PipelineOptions {
+  @Description("Whether to keep jobs running after local process exit")
+  @Default.Boolean(false)
+  boolean getKeepJobsRunning();
+  void setKeepJobsRunning(boolean keepJobsRunning);
+
+  @Description("Number of workers to use when executing the injector pipeline")
+  @Default.Integer(1)
+  int getInjectorNumWorkers();
+  void setInjectorNumWorkers(int numWorkers);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
new file mode 100644
index 0000000..e3fb132
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package}.common;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options that can be used to configure Pub/Sub topic/subscription in Beam examples.
+ */
+public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions {
+  @Description("Pub/Sub subscription")
+  @Default.InstanceFactory(PubsubSubscriptionFactory.class)
+  String getPubsubSubscription();
+  void setPubsubSubscription(String subscription);
+
+  /**
+   * Returns a default Pub/Sub subscription based on the project and the job names.
+   */
+  static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
+    @Override
+    public String create(PipelineOptions options) {
+      return "projects/" + options.as(GcpOptions.class).getProject()
+          + "/subscriptions/" + options.getJobName();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
index 8a7c9cf..1825267 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
@@ -17,36 +17,29 @@
  */
 package ${package}.common;
 
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
- * Options that can be used to configure Pub/Sub topic in Dataflow examples.
+ * Options that can be used to configure Pub/Sub topic in Beam examples.
  */
-public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
+public interface ExamplePubsubTopicOptions extends GcpOptions {
   @Description("Pub/Sub topic")
   @Default.InstanceFactory(PubsubTopicFactory.class)
   String getPubsubTopic();
   void setPubsubTopic(String topic);
 
-  @Description("Number of workers to use when executing the injector pipeline")
-  @Default.Integer(1)
-  int getInjectorNumWorkers();
-  void setInjectorNumWorkers(int numWorkers);
-
   /**
    * Returns a default Pub/Sub topic based on the project and the job names.
    */
   static class PubsubTopicFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      DataflowPipelineOptions dataflowPipelineOptions =
-          options.as(DataflowPipelineOptions.class);
-      return "projects/" + dataflowPipelineOptions.getProject()
-          + "/topics/" + dataflowPipelineOptions.getJobName();
+      return "projects/" + options.as(GcpOptions.class).getProject()
+          + "/topics/" + options.getJobName();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
new file mode 100644
index 0000000..afef188
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package}.common;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
+
+/**
+ * The utility class that sets up and tears down external resources,
+ * and cancels the streaming pipelines once the program terminates.
+ *
+ * <p>It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ */
+public class ExampleUtils {
+
+  private static final int SC_NOT_FOUND = 404;
+
+  private final PipelineOptions options;
+  private Bigquery bigQueryClient = null;
+  private Pubsub pubsubClient = null;
+  private Set<PipelineResult> pipelinesToCancel = Sets.newHashSet();
+  private List<String> pendingMessages = Lists.newArrayList();
+
+  /**
+   * Stores the given options. No external resources are touched here; see {@link #setup()}.
+   */
+  public ExampleUtils(PipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Sets up the external resources required by the example (Pub/Sub topics, BigQuery tables),
+   * retrying with backoff when a {@link GoogleJsonResponseException} is thrown.
+   * @throws IOException if there is a problem setting up the resources
+   * @throws RuntimeException wrapping the last failure once retries are exhausted or interrupted
+   */
+  public void setup() throws IOException {
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backOff =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff();
+    Throwable lastException = null;
+    try {
+      do {
+        try {
+          setupPubsub();
+          setupBigQueryTable();
+          return;
+        } catch (GoogleJsonResponseException e) {
+          lastException = e;
+        }
+      } while (BackOffUtils.next(sleeper, backOff));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // Stop retrying; the interrupt status is restored and the last failure is rethrown below.
+    }
+    throw new RuntimeException(lastException);
+  }
+
+  /**
+   * Sets up the Google Cloud Pub/Sub topic.
+   *
+   * <p>If the topic doesn't exist, a new topic with the given name will be created.
+   *
+   * @throws IOException if there is a problem setting up the Pub/Sub topic
+   */
+  public void setupPubsub() throws IOException {
+    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+      pendingMessages.add("**********************Set Up Pubsub************************");
+      setupPubsubTopic(pubsubOptions.getPubsubTopic());
+      pendingMessages.add("The Pub/Sub topic has been set up for this example: "
+          + pubsubOptions.getPubsubTopic());
+
+      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+        setupPubsubSubscription(
+            pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
+        pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
+            + pubsubOptions.getPubsubSubscription());
+      }
+    }
+  }
+
+  /**
+   * Sets up the BigQuery table with the given schema.
+   *
+   * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
+   * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
+   * will be created.
+   *
+   * @throws IOException if there is a problem setting up the BigQuery table
+   */
+  public void setupBigQueryTable() throws IOException {
+    ExampleBigQueryTableOptions bigQueryTableOptions =
+        options.as(ExampleBigQueryTableOptions.class);
+    if (bigQueryTableOptions.getBigQueryDataset() != null
+        && bigQueryTableOptions.getBigQueryTable() != null
+        && bigQueryTableOptions.getBigQuerySchema() != null) {
+      pendingMessages.add("******************Set Up Big Query Table*******************");
+      setupBigQueryTable(bigQueryTableOptions.getProject(),
+                         bigQueryTableOptions.getBigQueryDataset(),
+                         bigQueryTableOptions.getBigQueryTable(),
+                         bigQueryTableOptions.getBigQuerySchema());
+      pendingMessages.add("The BigQuery table has been set up for this example: "
+          + bigQueryTableOptions.getProject()
+          + ":" + bigQueryTableOptions.getBigQueryDataset()
+          + "." + bigQueryTableOptions.getBigQueryTable());
+    }
+  }
+
+  /**
+   * Tears down external resources that can be deleted upon the example's completion.
+   */
+  private void tearDown() {
+    pendingMessages.add("*************************Tear Down*************************");
+    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+      try {
+        deletePubsubTopic(pubsubOptions.getPubsubTopic());
+        pendingMessages.add("The Pub/Sub topic has been deleted: "
+            + pubsubOptions.getPubsubTopic());
+      } catch (IOException e) {
+        pendingMessages.add("Failed to delete the Pub/Sub topic : "
+            + pubsubOptions.getPubsubTopic());
+      }
+      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+        try {
+          deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
+          pendingMessages.add("The Pub/Sub subscription has been deleted: "
+              + pubsubOptions.getPubsubSubscription());
+        } catch (IOException e) {
+          pendingMessages.add("Failed to delete the Pub/Sub subscription : "
+              + pubsubOptions.getPubsubSubscription());
+        }
+      }
+    }
+
+    ExampleBigQueryTableOptions bigQueryTableOptions =
+        options.as(ExampleBigQueryTableOptions.class);
+    if (bigQueryTableOptions.getBigQueryDataset() != null
+        && bigQueryTableOptions.getBigQueryTable() != null
+        && bigQueryTableOptions.getBigQuerySchema() != null) {
+      pendingMessages.add("The BigQuery table might contain the example's output, "
+          + "and it is not deleted automatically: "
+          + bigQueryTableOptions.getProject()
+          + ":" + bigQueryTableOptions.getBigQueryDataset()
+          + "." + bigQueryTableOptions.getBigQueryTable());
+      pendingMessages.add("Please go to the Developers Console to delete it manually."
+          + " Otherwise, you may be charged for its usage.");
+    }
+  }
+
+  private void setupBigQueryTable(String projectId, String datasetId, String tableId,
+      TableSchema schema) throws IOException {
+    if (bigQueryClient == null) {
+      bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+    }
+
+    Datasets datasetService = bigQueryClient.datasets();
+    if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
+      Dataset newDataset = new Dataset().setDatasetReference(
+          new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
+      datasetService.insert(projectId, newDataset).execute();
+    }
+
+    Tables tableService = bigQueryClient.tables();
+    Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
+    if (table == null) {
+      Table newTable = new Table().setSchema(schema).setTableReference(
+          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
+      tableService.insert(projectId, datasetId, newTable).execute();
+    } else if (!table.getSchema().equals(schema)) {
+      throw new RuntimeException(
+          "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
+          + ", actual: " + table.getSchema().toPrettyString());
+    }
+  }
+
+  private void setupPubsubTopic(String topic) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
+      pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
+    }
+  }
+
+  private void setupPubsubSubscription(String topic, String subscription) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
+      Subscription subInfo = new Subscription()
+        .setAckDeadlineSeconds(60)
+        .setTopic(topic);
+      pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
+    }
+  }
+
+  /**
+   * Deletes the Google Cloud Pub/Sub topic.
+   *
+   * @throws IOException if there is a problem deleting the Pub/Sub topic
+   */
+  private void deletePubsubTopic(String topic) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
+      pubsubClient.projects().topics().delete(topic).execute();
+    }
+  }
+
+  /**
+   * Deletes the Google Cloud Pub/Sub subscription.
+   *
+   * @throws IOException if there is a problem deleting the Pub/Sub subscription
+   */
+  private void deletePubsubSubscription(String subscription) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
+      pubsubClient.projects().subscriptions().delete(subscription).execute();
+    }
+  }
+
+  /**
+   * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
+   * waits for the pipeline to finish and cancels it (and the injector) before the program exits.
+   */
+  public void waitToFinish(PipelineResult result) {
+    pipelinesToCancel.add(result);
+    if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
+      addShutdownHook(pipelinesToCancel);
+    }
+    try {
+      result.waitUntilFinish();
+    } catch (UnsupportedOperationException e) {
+      // The given PipelineResult doesn't support waitUntilFinish(), such as EvaluationResults
+      // returned by DirectRunner: tear down the resources and print the messages right away.
+      tearDown();
+      printPendingMessages();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to wait the pipeline until finish: " + result, e);
+    }
+  }
+
+  private void addShutdownHook(final Collection<PipelineResult> pipelineResults) {
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        tearDown();
+        printPendingMessages();
+        for (PipelineResult pipelineResult : pipelineResults) {
+          try {
+            pipelineResult.cancel();
+          } catch (IOException e) {
+            System.out.println("Failed to cancel the job.");
+            System.out.println(e.getMessage());
+          }
+        }
+
+        for (PipelineResult pipelineResult : pipelineResults) {
+          boolean cancellationVerified = false;
+          for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
+            if (pipelineResult.getState().isTerminal()) {
+              cancellationVerified = true;
+              break;
+            } else {
+              System.out.println(
+                  "The example pipeline is still running. Verifying the cancellation.");
+            }
+            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+          }
+          if (!cancellationVerified) {
+            System.out.println("Failed to verify the cancellation for job: " + pipelineResult);
+          }
+        }
+      }
+    });
+  }
+
+  private void printPendingMessages() {
+    System.out.println();
+    System.out.println("***********************************************************");
+    System.out.println("***********************************************************");
+    for (String message : pendingMessages) {
+      System.out.println(message);
+    }
+    System.out.println("***********************************************************");
+    System.out.println("***********************************************************");
+  }
+
+  private static <T> T executeNullIfNotFound(
+      AbstractGoogleClientRequest<T> request) throws IOException {
+    try {
+      return request.execute();
+    } catch (GoogleJsonResponseException e) {
+      if (e.getStatusCode() == SC_NOT_FOUND) {
+        return null;
+      } else {
+        throw e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
index 58e0821..6ca20f3 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
@@ -23,15 +23,15 @@ import com.google.api.services.pubsub.model.PubsubMessage;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Arrays;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.IntraBundleParallelization;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.Transport;
 
 /**
@@ -69,7 +69,7 @@ public class PubsubFileInjector {
     }
   }
 
-  /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */
+  /** An {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */
   public static class Bound extends OldDoFn<String, Void> {
     private final String outputTopic;
     private final String timestampLabelKey;
@@ -83,7 +83,7 @@ public class PubsubFileInjector {
     @Override
     public void startBundle(Context context) {
       this.pubsub =
-          Transport.newPubsubClient(context.getPipelineOptions().as(DataflowPipelineOptions.class))
+          Transport.newPubsubClient(context.getPipelineOptions().as(PubsubOptions.class))
               .build();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
index 875d3d7..83d0f37 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
@@ -20,6 +20,7 @@ package ${package};
 import ${package}.WordCount.CountWords;
 import ${package}.WordCount.ExtractWordsFn;
 import ${package}.WordCount.FormatAsTextFn;
+
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
@@ -28,8 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -38,14 +40,13 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-
 /**
  * Tests of WordCount.
  */
 @RunWith(JUnit4.class)
 public class WordCountTest {
 
-  /** Example test that tests a specific DoFn. */
+  /** Example test that tests a specific {@link DoFn}. */
   @Test
   public void testExtractWordsFn() throws Exception {
     DoFnTester<String, String> extractWordsFn =
@@ -77,7 +78,7 @@ public class WordCountTest {
     PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
 
     PCollection<String> output = input.apply(new CountWords())
-      .apply(ParDo.of(new FormatAsTextFn()));
+      .apply(MapElements.via(new FormatAsTextFn()));
 
     PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
     p.run();


Mime
View raw message