beam-commits mailing list archives

From dhalp...@apache.org
Subject [14/21] incubator-beam git commit: Reorganize Java packages in the sources of the Google Cloud Dataflow runner
Date Wed, 27 Apr 2016 01:08:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
new file mode 100644
index 0000000..71c8a78
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import org.apache.beam.runners.dataflow.util.DataflowPathValidator;
+import org.apache.beam.runners.dataflow.util.DataflowTransport;
+import org.apache.beam.runners.dataflow.util.GcsStager;
+import org.apache.beam.runners.dataflow.util.Stager;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
+
+import com.google.api.services.dataflow.Dataflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Internal. Options used to control execution of the Dataflow SDK for
+ * debugging and testing purposes.
+ */
+@Description("[Internal] Options used to control execution of the Dataflow SDK for "
+    + "debugging and testing purposes.")
+@Hidden
+public interface DataflowPipelineDebugOptions extends PipelineOptions {
+
+  /**
+   * The list of backend experiments to enable.
+   *
+   * <p>Dataflow provides a number of experimental features that can be enabled
+   * with this flag.
+   *
+   * <p>Please sync with the Dataflow team before enabling any experiments.
+   */
+  @Description("[Experimental] Dataflow provides a number of experimental features that can "
+      + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
+      + "experiments.")
+  @Experimental
+  List<String> getExperiments();
+  void setExperiments(List<String> value);
+
+  /**
+   * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
+   * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
+   * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
+   */
+  @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
+      + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
+      + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
+  @Default.String(Dataflow.DEFAULT_ROOT_URL)
+  String getApiRootUrl();
+  void setApiRootUrl(String value);
+
+  /**
+   * Dataflow endpoint to use.
+   *
+   * <p>Defaults to the current version of the Google Cloud Dataflow
+   * API, at the time the current SDK version was released.
+   *
+   * <p>If the string contains "://", then this is treated as a URL,
+   * otherwise {@link #getApiRootUrl()} is used as the root
+   * URL.
+   */
+  @Description("The URL for the Dataflow API. If the string contains \"://\", this"
+      + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
+  @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
+  String getDataflowEndpoint();
+  void setDataflowEndpoint(String value);
+
+  /**
+   * The path to write the translated Dataflow job specification out to
+   * at job submission time. The Dataflow job specification will be represented in JSON
+   * format.
+   */
+  @Description("The path to write the translated Dataflow job specification out to "
+      + "at job submission time. The Dataflow job specification will be represented in JSON "
+      + "format.")
+  String getDataflowJobFile();
+  void setDataflowJobFile(String value);
+
+  /**
+   * The class of the validator that should be created and used to validate paths.
+   * If pathValidator has not been set explicitly, an instance of this class will be
+   * constructed and used as the path validator.
+   */
+  @Description("The class of the validator that should be created and used to validate paths. "
+      + "If pathValidator has not been set explicitly, an instance of this class will be "
+      + "constructed and used as the path validator.")
+  @Default.Class(DataflowPathValidator.class)
+  Class<? extends PathValidator> getPathValidatorClass();
+  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+  /**
+   * The path validator instance that should be used to validate paths.
+   * If no path validator has been set explicitly, the default is to use the instance factory that
+   * constructs a path validator based upon the currently set pathValidatorClass.
+   */
+  @JsonIgnore
+  @Description("The path validator instance that should be used to validate paths. "
+      + "If no path validator has been set explicitly, the default is to use the instance factory "
+      + "that constructs a path validator based upon the currently set pathValidatorClass.")
+  @Default.InstanceFactory(PathValidatorFactory.class)
+  PathValidator getPathValidator();
+  void setPathValidator(PathValidator validator);
+
+  /**
+   * The class responsible for staging resources to be accessible by workers
+   * during job execution. If stager has not been set explicitly, an instance of this class
+   * will be created and used as the resource stager.
+   */
+  @Description("The class of the stager that should be created and used to stage resources. "
+      + "If stager has not been set explicitly, an instance of the this class will be created "
+      + "and used as the resource stager.")
+  @Default.Class(GcsStager.class)
+  Class<? extends Stager> getStagerClass();
+  void setStagerClass(Class<? extends Stager> stagerClass);
+
+  /**
+   * The resource stager instance that should be used to stage resources.
+   * If no stager has been set explicitly, the default is to use the instance factory
+   * that constructs a resource stager based upon the currently set stagerClass.
+   */
+  @JsonIgnore
+  @Description("The resource stager instance that should be used to stage resources. "
+      + "If no stager has been set explicitly, the default is to use the instance factory "
+      + "that constructs a resource stager based upon the currently set stagerClass.")
+  @Default.InstanceFactory(StagerFactory.class)
+  Stager getStager();
+  void setStager(Stager stager);
+
+  /**
+   * An instance of the Dataflow client. Defaults to creating a Dataflow client
+   * using the current set of options.
+   */
+  @JsonIgnore
+  @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
+      + "using the current set of options.")
+  @Default.InstanceFactory(DataflowClientFactory.class)
+  Dataflow getDataflowClient();
+  void setDataflowClient(Dataflow value);
+
+  /** Returns the default Dataflow client built from the passed in PipelineOptions. */
+  public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
+    @Override
+    public Dataflow create(PipelineOptions options) {
+      return DataflowTransport.newDataflowClient(
+          options.as(DataflowPipelineOptions.class)).build();
+    }
+  }
+
+  /**
+   * Mapping of old PTransform names to new ones, specified as JSON
+   * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
+   * empty string.
+   */
+  @JsonIgnore
+  @Description(
+      "Mapping of old PTranform names to new ones, specified as JSON "
+      + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
+      + "string.")
+  Map<String, String> getTransformNameMapping();
+  void setTransformNameMapping(Map<String, String> value);
+
+  /**
+   * Custom windmill_main binary to use with the streaming runner.
+   */
+  @Description("Custom windmill_main binary to use with the streaming runner")
+  String getOverrideWindmillBinary();
+  void setOverrideWindmillBinary(String value);
+
+  /**
+   * Number of threads to use on the Dataflow worker harness. If left unspecified,
+   * the Dataflow service will compute an appropriate number of threads to use.
+   */
+  @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
+      + "the Dataflow service will compute an appropriate number of threads to use.")
+  int getNumberOfWorkerHarnessThreads();
+  void setNumberOfWorkerHarnessThreads(int value);
+
+  /**
+   * If {@literal true}, save a heap dump before killing a thread or process which is GC
+   * thrashing or out of memory. The location of the heap file will either be echoed back
+   * to the user, or the user will be given the opportunity to download the heap file.
+   *
+   * <p>
+   * CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider increasing
+   * the boot disk size before setting this flag to true.
+   */
+  @Description("If {@literal true}, save a heap dump before killing a thread or process "
+      + "which is GC thrashing or out of memory.")
+  boolean getDumpHeapOnOOM();
+  void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
+
+  /**
+   * Creates a {@link PathValidator} object using the class specified in
+   * {@link #getPathValidatorClass()}.
+   */
+  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+    @Override
+    public PathValidator create(PipelineOptions options) {
+      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+      return InstanceBuilder.ofType(PathValidator.class)
+          .fromClass(debugOptions.getPathValidatorClass())
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
+
+  /**
+   * Creates a {@link Stager} object using the class specified in
+   * {@link #getStagerClass()}.
+   */
+  public static class StagerFactory implements DefaultValueFactory<Stager> {
+    @Override
+    public Stager create(PipelineOptions options) {
+      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+      return InstanceBuilder.ofType(Stager.class)
+          .fromClass(debugOptions.getStagerClass())
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
+}
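
For reference, a minimal usage sketch of the debug options above, parsed through PipelineOptionsFactory. The experiment name and file path are placeholders for illustration, not values defined by this interface.

    import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class DebugOptionsSketch {
      public static void main(String[] args) {
        // Flag names are derived from the getter names above
        // (getDataflowJobFile -> --dataflowJobFile, and so on).
        DataflowPipelineDebugOptions debug = PipelineOptionsFactory
            .fromArgs(
                "--experiments=some_experiment",          // placeholder experiment name
                "--dataflowJobFile=/tmp/job-spec.json",   // placeholder local path
                "--dumpHeapOnOOM=true")
            .as(DataflowPipelineDebugOptions.class);

        System.out.println(debug.getExperiments());      // [some_experiment]
        System.out.println(debug.getDataflowJobFile());  // /tmp/job-spec.json
      }
    }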

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
new file mode 100644
index 0000000..6e6ad96
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.Validation;
+
+import com.google.common.base.MoreObjects;
+
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Options that can be used to configure the {@link DataflowPipelineRunner}.
+ */
+@Description("Options that configure the Dataflow pipeline.")
+public interface DataflowPipelineOptions
+    extends PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
+        DataflowPipelineWorkerPoolOptions, BigQueryOptions, GcsOptions, StreamingOptions,
+        CloudDebuggerOptions, DataflowWorkerLoggingOptions, DataflowProfilingOptions,
+        PubsubOptions {
+
+  @Description("Project id. Required when running a Dataflow in the cloud. "
+      + "See https://cloud.google.com/storage/docs/projects for further details.")
+  @Override
+  @Validation.Required
+  @Default.InstanceFactory(DefaultProjectFactory.class)
+  String getProject();
+  @Override
+  void setProject(String value);
+
+  /**
+   * GCS path for staging local files, e.g. gs://bucket/object
+   *
+   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
+   *
+   * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
+   * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
+   * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
+   */
+  @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
+      + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
+      + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
+      + "defaults to using tempLocation.")
+  String getStagingLocation();
+  void setStagingLocation(String value);
+
+  /**
+   * The Dataflow job name is used as an idempotence key within the Dataflow service.
+   * While a job with a given name is active, another active job with the same name
+   * cannot be created. Defaults to ApplicationName-UserName-Date.
+   */
+  @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
+      + "For each running job in the same GCP project, jobs with the same name cannot be created "
+      + "unless the new job is an explicit update of the previous one. Defaults to using "
+      + "ApplicationName-UserName-Date. The job name must match the regular expression "
+      + "'[a-z]([-a-z0-9]{0,38}[a-z0-9])?'. The runner will automatically truncate the name of the "
+      + "job and convert to lower case.")
+  @Default.InstanceFactory(JobNameFactory.class)
+  String getJobName();
+  void setJobName(String value);
+
+  /**
+   * Whether to update the currently running pipeline with the same name as this one.
+   */
+  @Description(
+      "If set, replace the existing pipeline with the name specified by --jobName with "
+          + "this pipeline, preserving state.")
+  boolean isUpdate();
+  void setUpdate(boolean value);
+
+  /**
+   * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
+   * local system user name (if available), and the current time. The normalization makes sure that
+   * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40
+   * characters.
+   *
+   * <p>This job name factory is only able to generate one unique name per second per application
+   * and user combination.
+   */
+  public static class JobNameFactory implements DefaultValueFactory<String> {
+    private static final DateTimeFormatter FORMATTER =
+        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+    @Override
+    public String create(PipelineOptions options) {
+      String appName = options.as(ApplicationNameOptions.class).getAppName();
+      String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
+          : appName.toLowerCase()
+                   .replaceAll("[^a-z0-9]", "0")
+                   .replaceAll("^[^a-z]", "a");
+      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
+      String normalizedUserName = userName.toLowerCase()
+                                          .replaceAll("[^a-z0-9]", "0");
+      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
+    }
+  }
+}
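
A small sketch of the JobNameFactory default described above. The app name is set explicitly here for illustration; the user-name and timestamp parts come from the running JVM.

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class JobNameSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setAppName("WordCount");  // normally derived from the main class name

        // JobNameFactory lower-cases the app name and the "user.name" system property,
        // replaces characters outside [a-z0-9] with '0', and appends a UTC MMddHHmmss
        // timestamp, producing something like "wordcount-jdoe-0427010846".
        System.out.println(options.getJobName());
      }
    }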

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
new file mode 100644
index 0000000..a29b328
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.List;
+
+/**
+ * Options that are used to configure the Dataflow pipeline worker pool.
+ */
+@Description("Options that are used to configure the Dataflow pipeline worker pool.")
+public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
+  /**
+   * Number of workers to use when executing the Dataflow job. Note that selection of an
+   * autoscaling algorithm other than {@code NONE} will affect the size of the worker pool.
+   * If left unspecified, the Dataflow service will determine the number of workers.
+   */
+  @Description("Number of workers to use when executing the Dataflow job. Note that "
+      + "selection of an autoscaling algorithm other then \"NONE\" will affect the "
+      + "size of the worker pool. If left unspecified, the Dataflow service will "
+      + "determine the number of workers.")
+  int getNumWorkers();
+  void setNumWorkers(int value);
+
+  /**
+   * Type of autoscaling algorithm to use.
+   */
+  @Experimental(Experimental.Kind.AUTOSCALING)
+  public enum AutoscalingAlgorithmType {
+    /** Use numWorkers machines. Do not autoscale the worker pool. */
+    NONE("AUTOSCALING_ALGORITHM_NONE"),
+
+    @Deprecated
+    BASIC("AUTOSCALING_ALGORITHM_BASIC"),
+
+    /** Autoscale the workerpool based on throughput (up to maxNumWorkers). */
+    THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
+
+    private final String algorithm;
+
+    private AutoscalingAlgorithmType(String algorithm) {
+      this.algorithm = algorithm;
+    }
+
+    /** Returns the string representation of this type. */
+    public String getAlgorithm() {
+      return this.algorithm;
+    }
+  }
+
+  /**
+   * [Experimental] The autoscaling algorithm to use for the workerpool.
+   *
+   * <ul>
+   *   <li>NONE: does not change the size of the worker pool.</li>
+   *   <li>BASIC: autoscale the worker pool size up to maxNumWorkers until the job completes.</li>
+   *   <li>THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).
+   *   </li>
+   * </ul>
+   */
+  @Description("[Experimental] The autoscaling algorithm to use for the workerpool. "
+      + "NONE: does not change the size of the worker pool. "
+      + "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job "
+      + "completes. "
+      + "THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).")
+  @Experimental(Experimental.Kind.AUTOSCALING)
+  AutoscalingAlgorithmType getAutoscalingAlgorithm();
+  void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
+
+  /**
+   * The maximum number of workers to use for the workerpool. This option limits the size of the
+   * workerpool for the lifetime of the job, including
+   * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
+   * If left unspecified, the Dataflow service will compute a ceiling.
+   */
+  @Description("The maximum number of workers to use for the workerpool. This options limits the "
+      + "size of the workerpool for the lifetime of the job, including pipeline updates. "
+      + "If left unspecified, the Dataflow service will compute a ceiling.")
+  int getMaxNumWorkers();
+  void setMaxNumWorkers(int value);
+
+  /**
+   * Remote worker disk size, in gigabytes, or 0 to use the default size.
+   */
+  @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
+  int getDiskSizeGb();
+  void setDiskSizeGb(int value);
+
+  /**
+   * Docker container image that executes Dataflow worker harness, residing in Google Container
+   * Registry.
+   */
+  @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
+  @Description("Docker container image that executes Dataflow worker harness, residing in Google "
+      + " Container Registry.")
+  @Hidden
+  String getWorkerHarnessContainerImage();
+  void setWorkerHarnessContainerImage(String value);
+
+  /**
+   * Returns the default Docker container image that executes Dataflow worker harness, residing in
+   * Google Container Registry.
+   */
+  public static class WorkerHarnessContainerImageFactory
+      implements DefaultValueFactory<String> {
+    @Override
+    public String create(PipelineOptions options) {
+      DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+      if (dataflowOptions.isStreaming()) {
+        return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
+      } else {
+        return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
+      }
+    }
+  }
+
+  /**
+   * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
+   * workers.
+   *
+   * <p>Default is up to the Dataflow service.
+   */
+  @Description("GCE network for launching workers. For more information, see the reference "
+      + "documentation https://cloud.google.com/compute/docs/networking. "
+      + "Default is up to the Dataflow service.")
+  String getNetwork();
+  void setNetwork(String value);
+
+  /**
+   * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
+   * workers.
+   *
+   * <p>Default is up to the Dataflow service. Expected format is
+   * regions/REGION/subnetworks/SUBNETWORK.
+   *
+   * <p>You may also need to specify network option.
+   */
+  @Description("GCE subnetwork for launching workers. For more information, see the reference "
+      + "documentation https://cloud.google.com/compute/docs/networking. "
+      + "Default is up to the Dataflow service.")
+  String getSubnetwork();
+  void setSubnetwork(String value);
+
+  /**
+   * GCE <a href="https://developers.google.com/compute/docs/zones"
+   * >availability zone</a> for launching workers.
+   *
+   * <p>Default is up to the Dataflow service.
+   */
+  @Description("GCE availability zone for launching workers. See "
+      + "https://developers.google.com/compute/docs/zones for a list of valid options. "
+      + "Default is up to the Dataflow service.")
+  String getZone();
+  void setZone(String value);
+
+  /**
+   * Machine type to create Dataflow worker VMs as.
+   *
+   * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a>
+   * for a list of valid options.
+   *
+   * <p>If unset, the Dataflow service will choose a reasonable default.
+   */
+  @Description("Machine type to create Dataflow worker VMs as. See "
+      + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. "
+      + "If unset, the Dataflow service will choose a reasonable default.")
+  String getWorkerMachineType();
+  void setWorkerMachineType(String value);
+
+  /**
+   * The policy for tearing down the workers spun up by the service.
+   */
+  public enum TeardownPolicy {
+    /**
+     * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether
+     * it fails or succeeds.
+     */
+    TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"),
+    /**
+     * All VMs created for a Dataflow job are left running when the job finishes, regardless of
+     * whether it fails or succeeds.
+     */
+    TEARDOWN_NEVER("TEARDOWN_NEVER"),
+    /**
+     * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running
+     * when it fails. (This is typically used for debugging failing jobs by SSHing into the
+     * workers.)
+     */
+    TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS");
+
+    private final String teardownPolicy;
+
+    private TeardownPolicy(String teardownPolicy) {
+      this.teardownPolicy = teardownPolicy;
+    }
+
+    public String getTeardownPolicyName() {
+      return this.teardownPolicy;
+    }
+  }
+
+  /**
+   * The teardown policy for the VMs.
+   *
+   * <p>If unset, the Dataflow service will choose a reasonable default.
+   */
+  @Description("The teardown policy for the VMs. If unset, the Dataflow service will "
+      + "choose a reasonable default.")
+  TeardownPolicy getTeardownPolicy();
+  void setTeardownPolicy(TeardownPolicy value);
+
+  /**
+   * List of local files to make available to workers.
+   *
+   * <p>Files are placed on the worker's classpath.
+   *
+   * <p>The default value is the list of jars from the main program's classpath.
+   */
+  @Description("Files to stage on GCS and make available to workers. "
+      + "Files are placed on the worker's classpath. "
+      + "The default value is all files from the classpath.")
+  @JsonIgnore
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
+
+  /**
+   * Specifies what type of persistent disk should be used. The value should be a full or partial
+   * URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For
+   * more information, see the
+   * <a href="https://cloud.google.com/compute/docs/reference/latest/diskTypes">API reference
+   * documentation for DiskTypes</a>.
+   */
+  @Description("Specifies what type of persistent disk should be used. The value should be a full "
+      + "or partial URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For "
+      + "more information, see the API reference documentation for DiskTypes: "
+      + "https://cloud.google.com/compute/docs/reference/latest/diskTypes")
+  String getWorkerDiskType();
+  void setWorkerDiskType(String value);
+}
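
For illustration, a sketch of setting the worker pool options above from command-line style flags. The zone, machine type, and worker counts are placeholders rather than recommended values.

    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class WorkerPoolSketch {
      public static void main(String[] args) {
        DataflowPipelineWorkerPoolOptions options = PipelineOptionsFactory
            .fromArgs(
                "--numWorkers=3",
                "--maxNumWorkers=10",
                "--autoscalingAlgorithm=THROUGHPUT_BASED",
                "--zone=us-central1-f",              // placeholder zone
                "--workerMachineType=n1-standard-4") // placeholder machine type
            .as(DataflowPipelineWorkerPoolOptions.class);

        AutoscalingAlgorithmType algorithm = options.getAutoscalingAlgorithm();
        // THROUGHPUT_BASED maps onto the service-side name AUTOSCALING_ALGORITHM_BASIC.
        System.out.println(algorithm + " -> " + algorithm.getAlgorithm());
      }
    }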

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
new file mode 100644
index 0000000..c7b4c91
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+
+import java.util.HashMap;
+
+/**
+ * Options for controlling profiling of pipeline execution.
+ */
+@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
+@Experimental
+@Hidden
+public interface DataflowProfilingOptions {
+
+  @Description("Whether to periodically dump profiling information to local disk.\n"
+      + "WARNING: Enabling this option may fill local disk with profiling information.")
+  boolean getEnableProfilingAgent();
+  void setEnableProfilingAgent(boolean enabled);
+
+  @Description(
+      "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
+  @Hidden
+  DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
+  void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration configuration);
+
+  /**
+   * Configuration for the profiling agent.
+   */
+  public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {
+  }
+}
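
Note that DataflowProfilingOptions does not extend PipelineOptions itself; it is reached through DataflowPipelineOptions, which mixes it in. A minimal sketch:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ProfilingSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        // Equivalent to passing --enableProfilingAgent=true on the command line.
        options.setEnableProfilingAgent(true);
        System.out.println(options.getEnableProfilingAgent());  // true
      }
    }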

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerHarnessOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerHarnessOptions.java
new file mode 100644
index 0000000..e946e6b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerHarnessOptions.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+
+/**
+ * Options that are used exclusively within the Dataflow worker harness.
+ * These options have no effect at pipeline creation time.
+ */
+@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. "
+    + "These options have no effect at pipeline creation time.")
+@Hidden
+public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
+  /**
+   * The identity of the worker running this pipeline.
+   */
+  @Description("The identity of the worker running this pipeline.")
+  String getWorkerId();
+  void setWorkerId(String value);
+
+  /**
+   * The identity of the Dataflow job.
+   */
+  @Description("The identity of the Dataflow job.")
+  String getJobId();
+  void setJobId(String value);
+
+  /**
+   * The size of the worker's in-memory cache, in megabytes.
+   *
+   * <p>Currently, this cache is used for storing read values of side inputs.
+   */
+  @Description("The size of the worker's in-memory cache, in megabytes.")
+  @Default.Integer(100)
+  Integer getWorkerCacheMb();
+  void setWorkerCacheMb(Integer value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
new file mode 100644
index 0000000..74ab59a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.google.common.base.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Options that are used to control logging configuration on the Dataflow worker.
+ */
+@Description("Options that are used to control logging configuration on the Dataflow worker.")
+public interface DataflowWorkerLoggingOptions extends PipelineOptions {
+  /**
+   * The set of log levels that can be used on the Dataflow worker.
+   */
+  public enum Level {
+    DEBUG, ERROR, INFO, TRACE, WARN
+  }
+
+  /**
+   * This option controls the default log level of all loggers without a log level override.
+   */
+  @Description("Controls the default log level of all loggers without a log level override.")
+  @Default.Enum("INFO")
+  Level getDefaultWorkerLogLevel();
+  void setDefaultWorkerLogLevel(Level level);
+
+  /**
+   * This option controls the log levels for specifically named loggers.
+   *
+   * <p>Later options with equivalent names override earlier options.
+   *
+   * <p>See {@link WorkerLogLevelOverrides} for more information on how to configure logging
+   * on a per {@link Class}, {@link Package}, or name basis. If used from the command line,
+   * the expected format is {"Name":"Level",...}, further details on
+   * {@link WorkerLogLevelOverrides#from}.
+   */
+  @Description("This option controls the log levels for specifically named loggers. "
+      + "The expected format is {\"Name\":\"Level\",...}. The Dataflow worker uses "
+      + "java.util.logging, which supports a logging hierarchy based off of names that are '.' "
+      + "separated. For example, by specifying the value {\"a.b.c.Foo\":\"DEBUG\"}, the logger "
+      + "for the class 'a.b.c.Foo' will be configured to output logs at the DEBUG level. "
+      + "Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, all loggers underneath the "
+      + "'a.b.c' package will be configured to output logs at the WARN level. Also, note that "
+      + "when multiple overrides are specified, the exact name followed by the closest parent "
+      + "takes precedence.")
+  WorkerLogLevelOverrides getWorkerLogLevelOverrides();
+  void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
+
+  /**
+   * Defines a log level override for a specific class, package, or name.
+   *
+   * <p>{@code java.util.logging} is used on the Dataflow worker harness and supports
+   * a logging hierarchy based off of names that are "." separated. It is a common
+   * pattern to have the logger for a given class share the same name as the class itself.
+   * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar}, with
+   * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"} respectively,
+   * we can override the log levels:
+   * <ul>
+   *    <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
+   *    representing {@code a.b.c.Foo}.
+   *    <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
+   *    the {@link Package} representing {@code a.b}.
+   *    <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
+   * </ul>
+   * Note that by specifying multiple overrides, the exact name followed by the closest parent
+   * takes precedence.
+   */
+  public static class WorkerLogLevelOverrides extends HashMap<String, Level> {
+    /**
+     * Overrides the default log level for the passed in class.
+     *
+     * <p>This is equivalent to calling
+     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
+     * and passing in the {@link Class#getName() class name}.
+     */
+    public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level level) {
+      Preconditions.checkNotNull(klass, "Expected class to be not null.");
+      addOverrideForName(klass.getName(), level);
+      return this;
+    }
+
+    /**
+     * Overrides the default log level for the passed in package.
+     *
+     * <p>This is equivalent to calling
+     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
+     * and passing in the {@link Package#getName() package name}.
+     */
+    public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level level) {
+      Preconditions.checkNotNull(pkg, "Expected package to be not null.");
+      addOverrideForName(pkg.getName(), level);
+      return this;
+    }
+
+    /**
+     * Overrides the default log level for the passed in name.
+     *
+     * <p>Note that because of the hierarchical nature of logger names, this will
+     * override the log level of all loggers that have the passed in name or
+     * a parent logger that has the passed in name.
+     */
+    public WorkerLogLevelOverrides addOverrideForName(String name, Level level) {
+      Preconditions.checkNotNull(name, "Expected name to be not null.");
+      Preconditions.checkNotNull(level,
+          "Expected level to be one of %s.", Arrays.toString(Level.values()));
+      put(name, level);
+      return this;
+    }
+
+    /**
+     * Expects a map keyed by logger {@code Name}s with values representing {@code Level}s.
+     * The {@code Name} generally represents the fully qualified Java
+     * {@link Class#getName() class name}, or fully qualified Java
+     * {@link Package#getName() package name}, or custom logger name. The {@code Level}
+     * represents the log level and must be one of {@link Level}.
+     */
+    @JsonCreator
+    public static WorkerLogLevelOverrides from(Map<String, String> values) {
+      Preconditions.checkNotNull(values, "Expected values to be not null.");
+      WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides();
+      for (Map.Entry<String, String> entry : values.entrySet()) {
+        try {
+          overrides.addOverrideForName(entry.getKey(), Level.valueOf(entry.getValue()));
+        } catch (IllegalArgumentException e) {
+          throw new IllegalArgumentException(String.format(
+              "Unsupported log level '%s' requested for %s. Must be one of %s.",
+              entry.getValue(), entry.getKey(), Arrays.toString(Level.values())));
+        }
+
+      }
+      return overrides;
+    }
+  }
+}
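
A sketch of configuring the worker logging options above, both programmatically and in the JSON flag form handled by WorkerLogLevelOverrides.from. The com.example.MyDoFn logger name is hypothetical.

    import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
    import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level;
    import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class WorkerLoggingSketch {
      public static void main(String[] args) {
        DataflowWorkerLoggingOptions options =
            PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
        options.setDefaultWorkerLogLevel(Level.WARN);

        // When multiple overrides apply, the exact name wins over parent packages.
        options.setWorkerLogLevelOverrides(
            new WorkerLogLevelOverrides()
                .addOverrideForName("com.example.MyDoFn", Level.DEBUG)  // hypothetical logger
                .addOverrideForName("org.apache.beam", Level.INFO));

        // Command-line equivalent:
        //   --workerLogLevelOverrides={"com.example.MyDoFn":"DEBUG","org.apache.beam":"INFO"}
      }
    }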

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
new file mode 100644
index 0000000..e66ffc9
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.testing;
+
+import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * A set of options used to configure the {@link TestPipeline}.
+ */
+public interface TestDataflowPipelineOptions extends TestPipelineOptions,
+       BlockingDataflowPipelineOptions {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
new file mode 100644
index 0000000..3ab91f5
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.testing;
+
+import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
+ * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}.
+ *
+ * @see TestPipeline
+ */
+public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
+  private static final String TENTATIVE_COUNTER = "tentative";
+  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
+  private static final Map<String, PipelineResult> EXECUTION_RESULTS =
+      new ConcurrentHashMap<String, PipelineResult>();
+
+  private final TestDataflowPipelineOptions options;
+  private final DataflowPipelineRunner runner;
+  private int expectedNumberOfAssertions = 0;
+
+  TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
+    this.options = options;
+    this.runner = DataflowPipelineRunner.fromOptions(options);
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   */
+  public static TestDataflowPipelineRunner fromOptions(
+      PipelineOptions options) {
+    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+    dataflowOptions.setStagingLocation(Joiner.on("/").join(
+        new String[]{dataflowOptions.getTempRoot(),
+          dataflowOptions.getJobName(), "output", "results"}));
+
+    return new TestDataflowPipelineRunner(dataflowOptions);
+  }
+
+  public static PipelineResult getPipelineResultByJobName(String jobName) {
+    return EXECUTION_RESULTS.get(jobName);
+  }
+
+  @Override
+  public DataflowPipelineJob run(Pipeline pipeline) {
+    return run(pipeline, runner);
+  }
+
+  DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
+
+    final DataflowPipelineJob job;
+    try {
+      job = runner.run(pipeline);
+    } catch (DataflowJobExecutionException ex) {
+      throw new IllegalStateException("The dataflow failed.");
+    }
+
+    LOG.info("Running Dataflow job {} with {} expected assertions.",
+        job.getJobId(), expectedNumberOfAssertions);
+
+    CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
+        job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+
+    try {
+      final Optional<Boolean> result;
+
+      if (options.isStreaming()) {
+        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
+            new Callable<Optional<Boolean>>() {
+          @Override
+          public Optional<Boolean> call() throws Exception {
+            try {
+              for (;;) {
+                Optional<Boolean> result = checkForSuccess(job);
+                if (result.isPresent()) {
+                  return result;
+                }
+                Thread.sleep(10000L);
+              }
+            } finally {
+              LOG.info("Cancelling Dataflow job {}", job.getJobId());
+              job.cancel();
+            }
+          }
+        });
+        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
+        if (finalState == null || finalState == State.RUNNING) {
+          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
+              job.getJobId());
+          job.cancel();
+        }
+        result = resultFuture.get();
+      } else {
+        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
+        result = checkForSuccess(job);
+      }
+      if (!result.isPresent()) {
+        throw new IllegalStateException(
+            "The dataflow did not output a success or failure metric.");
+      } else if (!result.get()) {
+        throw new AssertionError(messageHandler.getErrorMessage() == null ?
+            "The dataflow did not return a failure reason."
+            : messageHandler.getErrorMessage());
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause());
+      throw new RuntimeException(e.getCause());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    EXECUTION_RESULTS.put(options.getJobName(), job);
+    return job;
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    if (transform instanceof PAssert.OneSideInputAssert
+        || transform instanceof PAssert.TwoSideInputAssert) {
+      expectedNumberOfAssertions += 1;
+    }
+
+    return runner.apply(transform, input);
+  }
+
+  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
+      throws IOException {
+    State state = job.getState();
+    if (state == State.FAILED || state == State.CANCELLED) {
+      LOG.info("The pipeline failed");
+      return Optional.of(false);
+    }
+
+    JobMetrics metrics = job.getDataflowClient().projects().jobs()
+        .getMetrics(job.getProjectId(), job.getJobId()).execute();
+
+    if (metrics == null || metrics.getMetrics() == null) {
+      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
+    } else {
+      int successes = 0;
+      int failures = 0;
+      for (MetricUpdate metric : metrics.getMetrics()) {
+        if (metric.getName() == null || metric.getName().getContext() == null
+            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
+          // Don't double count using the non-tentative version of the metric.
+          continue;
+        }
+        if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
+          successes += ((BigDecimal) metric.getScalar()).intValue();
+        } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
+          failures += ((BigDecimal) metric.getScalar()).intValue();
+        }
+      }
+
+      if (failures > 0) {
+        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+            + "{} expected assertions.", job.getJobId(), successes, failures,
+            expectedNumberOfAssertions);
+        return Optional.of(false);
+      } else if (successes >= expectedNumberOfAssertions) {
+        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+            + "{} expected assertions.", job.getJobId(), successes, failures,
+            expectedNumberOfAssertions);
+        return Optional.of(true);
+      }
+
+      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
+          + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
+    }
+
+    return Optional.<Boolean>absent();
+  }
+
+  @Override
+  public String toString() {
+    return "TestDataflowPipelineRunner#" + options.getAppName();
+  }
+
+  /**
+   * Cancels the workflow on the first error message it sees.
+   *
+   * <p>Creates an error message representing the concatenation of all error messages seen.
+   */
+  private static class CancelWorkflowOnError implements JobMessagesHandler {
+    private final DataflowPipelineJob job;
+    private final JobMessagesHandler messageHandler;
+    private final StringBuffer errorMessage;
+    private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+      this.job = job;
+      this.messageHandler = messageHandler;
+      this.errorMessage = new StringBuffer();
+    }
+
+    @Override
+    public void process(List<JobMessage> messages) {
+      messageHandler.process(messages);
+      for (JobMessage message : messages) {
+        if (message.getMessageImportance() != null
+            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+              job.getJobId(), message.getMessageText());
+          errorMessage.append(message.getMessageText());
+        }
+      }
+      if (errorMessage.length() > 0) {
+        LOG.info("Cancelling Dataflow job {}", job.getJobId());
+        try {
+          job.cancel();
+        } catch (Exception ignore) {
+          // The TestDataflowPipelineRunner will throw an AssertionError with the job failure
+          // messages.
+        }
+      }
+    }
+
+    private String getErrorMessage() {
+      return errorMessage.toString();
+    }
+  }
+}
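
For context on the snippet above: checkForSuccess(job) only yields a definite result once a PAssert failure counter is observed or the expected number of assertion successes has been reached; until then it returns Optional.absent() and the caller has to keep polling. A minimal polling sketch, not part of this commit (it assumes a class sitting next to TestDataflowPipelineRunner, since checkForSuccess is package-private, and an arbitrary 10-second poll interval):

    import org.apache.beam.runners.dataflow.DataflowPipelineJob;  // package assumed per this commit's layout

    import com.google.common.base.Optional;

    import java.io.IOException;

    // Illustrative helper only; not part of the commit above.
    final class AssertionPoller {
      static void awaitAssertions(TestDataflowPipelineRunner runner, DataflowPipelineJob job)
          throws IOException, InterruptedException {
        Optional<Boolean> result = runner.checkForSuccess(job);
        while (!result.isPresent()) {
          Thread.sleep(10000L);  // arbitrary interval between metric polls
          result = runner.checkForSuccess(job);
        }
        if (!result.get()) {
          throw new AssertionError("Dataflow job " + job.getJobId() + " failed its PAsserts");
        }
      }
    }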

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
new file mode 100644
index 0000000..ddc5d6f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.PathValidator;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class DataflowPathValidator implements PathValidator {
+
+  private DataflowPipelineOptions dataflowOptions;
+
+  DataflowPathValidator(DataflowPipelineOptions options) {
+    this.dataflowOptions = options;
+  }
+
+  public static DataflowPathValidator fromOptions(PipelineOptions options) {
+    return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
+  }
+
+  /**
+   * Validates that the input GCS path is accessible and well formed.
+   */
+  @Override
+  public String validateInputFilePatternSupported(String filepattern) {
+    GcsPath gcsPath = getGcsPath(filepattern);
+    Preconditions.checkArgument(
+        dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()),
+        "Unsupported GCS file pattern: %s", filepattern);
+    String returnValue = verifyPath(filepattern);
+    verifyPathIsAccessible(filepattern, "Could not find file %s");
+    return returnValue;
+  }
+
+  /**
+   * Validates that the output GCS path is accessible and well formed.
+   */
+  @Override
+  public String validateOutputFilePrefixSupported(String filePrefix) {
+    String returnValue = verifyPath(filePrefix);
+    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writable: %s");
+    return returnValue;
+  }
+
+  @Override
+  public String verifyPath(String path) {
+    GcsPath gcsPath = getGcsPath(path);
+    Preconditions.checkArgument(gcsPath.isAbsolute(),
+        "Must provide absolute paths for Dataflow");
+    Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
+        "Dataflow Service does not allow objects with consecutive slashes");
+    return gcsPath.toResourceName();
+  }
+
+  private void verifyPathIsAccessible(String path, String errorMessage) {
+    GcsPath gcsPath = getGcsPath(path);
+    try {
+      Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
+        errorMessage, path);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
+          e);
+    }
+  }
+
+  private GcsPath getGcsPath(String path) {
+    try {
+      return GcsPath.fromUri(path);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "%s expected a valid 'gs://' path but was given '%s'",
+          dataflowOptions.getRunner().getSimpleName(), path), e);
+    }
+  }
+}
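
A rough usage sketch for the validator above, assuming default option wiring via PipelineOptionsFactory; the project id, bucket, and object names are placeholders, and the accessibility checks make real GCS calls, so valid credentials are needed:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.runners.dataflow.util.DataflowPathValidator;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ValidatePathsExample {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setProject("my-project");  // placeholder project id

        DataflowPathValidator validator = DataflowPathValidator.fromOptions(options);
        // Each call throws if the path is malformed or the bucket cannot be verified.
        validator.validateInputFilePatternSupported("gs://my-bucket/input/*.txt");
        validator.validateOutputFilePrefixSupported("gs://my-bucket/output/results");
        System.out.println(validator.verifyPath("gs://my-bucket/staging/"));
      }
    }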

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
new file mode 100644
index 0000000..0199657
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.apache.beam.sdk.util.Transport.getJsonFactory;
+import static org.apache.beam.sdk.util.Transport.getTransport;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class DataflowTransport {
+
+
+  private static class ApiComponents {
+    public String rootUrl;
+    public String servicePath;
+
+    public ApiComponents(String root, String path) {
+      this.rootUrl = root;
+      this.servicePath = path;
+    }
+  }
+
+  private static ApiComponents apiComponentsFromUrl(String urlString) {
+    try {
+      URL url = new URL(urlString);
+      String rootUrl = url.getProtocol() + "://" + url.getHost() +
+          (url.getPort() > 0 ? ":" + url.getPort() : "");
+      return new ApiComponents(rootUrl, url.getPath());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid URL: " + urlString);
+    }
+  }
+
+  /**
+   * Returns a Google Cloud Dataflow client builder.
+   */
+  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
+    String servicePath = options.getDataflowEndpoint();
+    ApiComponents components;
+    if (servicePath.contains("://")) {
+      components = apiComponentsFromUrl(servicePath);
+    } else {
+      components = new ApiComponents(options.getApiRootUrl(), servicePath);
+    }
+
+    return new Dataflow.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setRootUrl(components.rootUrl)
+        .setServicePath(components.servicePath)
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
+    return new Clouddebugger.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a Dataflow client that does not automatically retry failed
+   * requests.
+   */
+  public static Dataflow.Builder newRawDataflowClient(DataflowPipelineOptions options) {
+    return newDataflowClient(options)
+        .setHttpRequestInitializer(options.getGcpCredential())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credential credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return httpRequestInitializer;
+    } else {
+      return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
+    }
+  }
+}
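
As an illustration of newDataflowClient above, the sketch below builds a client from default options; the project id is a placeholder, and the jobs().list(...).execute() chain is the standard generated Dataflow client API rather than anything added by this commit:

    import com.google.api.services.dataflow.Dataflow;

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.runners.dataflow.util.DataflowTransport;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class DataflowClientExample {
      public static void main(String[] args) throws Exception {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setProject("my-project");  // placeholder; getGcpCredential() is resolved by the SDK

        Dataflow dataflow = DataflowTransport.newDataflowClient(options).build();
        // Requires valid credentials; lists jobs in the placeholder project.
        System.out.println(dataflow.projects().jobs().list("my-project").execute());
      }
    }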

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
new file mode 100644
index 0000000..8e7cbbe
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+
+/**
+ * Utility class for staging files to GCS.
+ */
+public class GcsStager implements Stager {
+  private DataflowPipelineOptions options;
+
+  private GcsStager(DataflowPipelineOptions options) {
+    this.options = options;
+  }
+
+  public static GcsStager fromOptions(PipelineOptions options) {
+    return new GcsStager(options.as(DataflowPipelineOptions.class));
+  }
+
+  @Override
+  public List<DataflowPackage> stageFiles() {
+    Preconditions.checkNotNull(options.getStagingLocation(), "stagingLocation must be set");
+    List<String> filesToStage = options.getFilesToStage();
+    String windmillBinary =
+        options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
+    if (windmillBinary != null) {
+      filesToStage.add("windmill_main=" + windmillBinary);
+    }
+    return PackageUtil.stageClasspathElements(filesToStage, options.getStagingLocation());
+  }
+}
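
A hedged usage sketch for GcsStager; the staging location and jar path are placeholders, and running it uploads the listed files to GCS, so it needs credentials and an existing bucket:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.runners.dataflow.util.GcsStager;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    import com.google.api.services.dataflow.model.DataflowPackage;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class StageFilesExample {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setStagingLocation("gs://my-bucket/staging");  // placeholder bucket
        // Use a mutable list: stageFiles() may append a windmill override entry.
        options.setFilesToStage(new ArrayList<>(Arrays.asList("/path/to/pipeline-bundle.jar")));

        List<DataflowPackage> staged = GcsStager.fromOptions(options).stageFiles();
        System.out.println("Staged " + staged.size() + " packages");
      }
    }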

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
new file mode 100644
index 0000000..2eec9cc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.util.TimeUtil;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for monitoring jobs submitted to the service.
+ */
+public final class MonitoringUtil {
+
+  private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow";
+  private static final String ENDPOINT_OVERRIDE_ENV_VAR =
+      "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";
+
+  private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE =
+      ImmutableMap
+          .<String, State>builder()
+          .put("JOB_STATE_UNKNOWN", State.UNKNOWN)
+          .put("JOB_STATE_STOPPED", State.STOPPED)
+          .put("JOB_STATE_RUNNING", State.RUNNING)
+          .put("JOB_STATE_DONE", State.DONE)
+          .put("JOB_STATE_FAILED", State.FAILED)
+          .put("JOB_STATE_CANCELLED", State.CANCELLED)
+          .put("JOB_STATE_UPDATED", State.UPDATED)
+          .build();
+
+  private String projectId;
+  private Messages messagesClient;
+
+  /**
+   * An interface that can be used for defining callbacks to receive a list
+   * of JobMessages containing monitoring information.
+   */
+  public interface JobMessagesHandler {
+    /** Process the messages. */
+    void process(List<JobMessage> messages);
+  }
+
+  /** A handler that prints monitoring messages to a stream. */
+  public static class PrintHandler implements JobMessagesHandler {
+    private PrintStream out;
+
+    /**
+     * Construct the handler.
+     *
+     * @param stream The stream to write the messages to.
+     */
+    public PrintHandler(PrintStream stream) {
+      out = stream;
+    }
+
+    @Override
+    public void process(List<JobMessage> messages) {
+      for (JobMessage message : messages) {
+        if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
+          continue;
+        }
+        String importanceString = null;
+        if (message.getMessageImportance() == null) {
+          continue;
+        } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+          importanceString = "Error:   ";
+        } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
+          importanceString = "Warning: ";
+        } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
+          importanceString = "Basic:  ";
+        } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
+          importanceString = "Detail:  ";
+        } else {
+          // TODO: Remove filtering here once getJobMessages supports minimum
+          // importance.
+          continue;
+        }
+        @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
+        if (time == null) {
+          out.print("UNKNOWN TIMESTAMP: ");
+        } else {
+          out.print(time + ": ");
+        }
+        if (importanceString != null) {
+          out.print(importanceString);
+        }
+        out.println(message.getMessageText());
+      }
+      out.flush();
+    }
+  }
+
+  /** Construct a helper for monitoring. */
+  public MonitoringUtil(String projectId, Dataflow dataflow) {
+    this(projectId, dataflow.projects().jobs().messages());
+  }
+
+  // @VisibleForTesting
+  MonitoringUtil(String projectId, Messages messagesClient) {
+    this.projectId = projectId;
+    this.messagesClient = messagesClient;
+  }
+
+  /**
+   * Comparator for sorting job messages in increasing order of timestamp.
+   */
+  public static class TimeStampComparator implements Comparator<JobMessage> {
+    @Override
+    public int compare(JobMessage o1, JobMessage o2) {
+      @Nullable Instant t1 = fromCloudTime(o1.getTime());
+      if (t1 == null) {
+        return -1;
+      }
+      @Nullable Instant t2 = fromCloudTime(o2.getTime());
+      if (t2 == null) {
+        return 1;
+      }
+      return t1.compareTo(t2);
+    }
+  }
+
+  /**
+   * Return job messages sorted in ascending order by timestamp.
+   * @param jobId The id of the job to get the messages for.
+   * @param startTimestampMs Return only those messages with a
+   *   timestamp greater than this value.
+   * @return collection of messages
+   * @throws IOException if the job messages could not be retrieved from the service
+   */
+  public ArrayList<JobMessage> getJobMessages(
+      String jobId, long startTimestampMs) throws IOException {
+    // TODO: Allow filtering messages by importance
+    Instant startTimestamp = new Instant(startTimestampMs);
+    ArrayList<JobMessage> allMessages = new ArrayList<>();
+    String pageToken = null;
+    while (true) {
+      Messages.List listRequest = messagesClient.list(projectId, jobId);
+      if (pageToken != null) {
+        listRequest.setPageToken(pageToken);
+      }
+      ListJobMessagesResponse response = listRequest.execute();
+
+      if (response == null || response.getJobMessages() == null) {
+        return allMessages;
+      }
+
+      for (JobMessage m : response.getJobMessages()) {
+        @Nullable Instant timestamp = fromCloudTime(m.getTime());
+        if (timestamp == null) {
+          continue;
+        }
+        if (timestamp.isAfter(startTimestamp)) {
+          allMessages.add(m);
+        }
+      }
+
+      if (response.getNextPageToken() == null) {
+        break;
+      } else {
+        pageToken = response.getNextPageToken();
+      }
+    }
+
+    Collections.sort(allMessages, new TimeStampComparator());
+    return allMessages;
+  }
+
+  public static String getJobMonitoringPageURL(String projectName, String jobId) {
+    try {
+      // Project name is allowed in place of the project id: the user will be redirected to a URL
+      // that has the project name replaced with project id.
+      return String.format(
+          "https://console.developers.google.com/project/%s/dataflow/job/%s",
+          URLEncoder.encode(projectName, "UTF-8"),
+          URLEncoder.encode(jobId, "UTF-8"));
+    } catch (UnsupportedEncodingException e) {
+      // Should never happen.
+      throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
+    }
+  }
+
+  public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
+
+    // If using a different Dataflow API than default, prefix command with an API override.
+    String dataflowApiOverridePrefix = "";
+    String apiUrl = options.getDataflowClient().getBaseUrl();
+    if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) {
+      dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl);
+    }
+
+    // Assemble cancel command from optional prefix and project/job parameters.
+    return String.format("%s%s jobs --project=%s cancel %s",
+        dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId);
+  }
+
+  public static State toState(String stateName) {
+    return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName),
+        State.UNKNOWN);
+  }
+}
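
Finally, a sketch showing MonitoringUtil together with the transport helper above; every id is a placeholder and the calls need valid credentials against a real job:

    import com.google.api.services.dataflow.Dataflow;
    import com.google.api.services.dataflow.model.JobMessage;

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.runners.dataflow.util.DataflowTransport;
    import org.apache.beam.runners.dataflow.util.MonitoringUtil;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    import java.util.List;

    public class JobMessagesExample {
      public static void main(String[] args) throws Exception {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setProject("my-project");  // placeholder ids throughout

        Dataflow dataflow = DataflowTransport.newDataflowClient(options).build();
        MonitoringUtil monitor = new MonitoringUtil("my-project", dataflow);

        // Fetch every message since the epoch and print them with the built-in handler.
        List<JobMessage> messages = monitor.getJobMessages("my-job-id", 0L);
        new MonitoringUtil.PrintHandler(System.out).process(messages);

        System.out.println(MonitoringUtil.getJobMonitoringPageURL("my-project", "my-job-id"));
      }
    }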

