beam-commits mailing list archives

From da...@apache.org
Subject [30/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
Date Thu, 14 Apr 2016 04:48:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
new file mode 100644
index 0000000..e961066
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.testing;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
+ * {@link DataflowPipelineRunner} when running tests written with a {@link TestPipeline}.
+ *
+ * @see TestPipeline
+ */
+public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
+  private static final String TENTATIVE_COUNTER = "tentative";
+  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
+
+  private final TestDataflowPipelineOptions options;
+  private final DataflowPipelineRunner runner;
+  private int expectedNumberOfAssertions = 0;
+
+  TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
+    this.options = options;
+    this.runner = DataflowPipelineRunner.fromOptions(options);
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   */
+  public static TestDataflowPipelineRunner fromOptions(
+      PipelineOptions options) {
+    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+
+    return new TestDataflowPipelineRunner(dataflowOptions);
+  }
+
+  @Override
+  public DataflowPipelineJob run(Pipeline pipeline) {
+    return run(pipeline, runner);
+  }
+
+  DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
+
+    final DataflowPipelineJob job;
+    try {
+      job = runner.run(pipeline);
+    } catch (DataflowJobExecutionException ex) {
+      throw new IllegalStateException("The dataflow failed.", ex);
+    }
+
+    LOG.info("Running Dataflow job {} with {} expected assertions.",
+        job.getJobId(), expectedNumberOfAssertions);
+
+    CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
+        job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+
+    try {
+      final Optional<Boolean> result;
+
+      if (options.isStreaming()) {
+        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
+            new Callable<Optional<Boolean>>() {
+          @Override
+          public Optional<Boolean> call() throws Exception {
+            try {
+              for (;;) {
+                Optional<Boolean> result = checkForSuccess(job);
+                if (result.isPresent()) {
+                  return result;
+                }
+                Thread.sleep(10000L);
+              }
+            } finally {
+              LOG.info("Cancelling Dataflow job {}", job.getJobId());
+              job.cancel();
+            }
+          }
+        });
+        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
+        if (finalState == null || finalState == State.RUNNING) {
+          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
+              job.getJobId());
+          job.cancel();
+        }
+        result = resultFuture.get();
+      } else {
+        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
+        result = checkForSuccess(job);
+      }
+      if (!result.isPresent()) {
+        throw new IllegalStateException(
+            "The dataflow did not output a success or failure metric.");
+      } else if (!result.get()) {
+        throw new AssertionError(messageHandler.getErrorMessage().isEmpty()
+            ? "The dataflow did not return a failure reason."
+            : messageHandler.getErrorMessage());
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause());
+      throw new RuntimeException(e.getCause());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return job;
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    if (transform instanceof PAssert.OneSideInputAssert
+        || transform instanceof PAssert.TwoSideInputAssert) {
+      expectedNumberOfAssertions += 1;
+    }
+
+    return runner.apply(transform, input);
+  }
+
+  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
+      throws IOException {
+    State state = job.getState();
+    if (state == State.FAILED || state == State.CANCELLED) {
+      LOG.info("The pipeline failed");
+      return Optional.of(false);
+    }
+
+    JobMetrics metrics = job.getDataflowClient().projects().jobs()
+        .getMetrics(job.getProjectId(), job.getJobId()).execute();
+
+    if (metrics == null || metrics.getMetrics() == null) {
+      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
+    } else {
+      int successes = 0;
+      int failures = 0;
+      for (MetricUpdate metric : metrics.getMetrics()) {
+        if (metric.getName() == null || metric.getName().getContext() == null
+            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
+          // Skip the non-tentative version of the metric so results are not double counted.
+          continue;
+        }
+        if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
+          successes += ((BigDecimal) metric.getScalar()).intValue();
+        } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
+          failures += ((BigDecimal) metric.getScalar()).intValue();
+        }
+      }
+
+      if (failures > 0) {
+        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+            + "{} expected assertions.", job.getJobId(), successes, failures,
+            expectedNumberOfAssertions);
+        return Optional.of(false);
+      } else if (successes >= expectedNumberOfAssertions) {
+        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+            + "{} expected assertions.", job.getJobId(), successes, failures,
+            expectedNumberOfAssertions);
+        return Optional.of(true);
+      }
+
+      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
+          + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
+    }
+
+    return Optional.<Boolean>absent();
+  }
+
+  @Override
+  public String toString() {
+    return "TestDataflowPipelineRunner#" + options.getAppName();
+  }
+
+  /**
+   * Cancels the workflow on the first error message it sees.
+   *
+   * <p>Creates an error message representing the concatenation of all error messages seen.
+   */
+  private static class CancelWorkflowOnError implements JobMessagesHandler {
+    private final DataflowPipelineJob job;
+    private final JobMessagesHandler messageHandler;
+    private final StringBuffer errorMessage;
+    private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+      this.job = job;
+      this.messageHandler = messageHandler;
+      this.errorMessage = new StringBuffer();
+    }
+
+    @Override
+    public void process(List<JobMessage> messages) {
+      messageHandler.process(messages);
+      for (JobMessage message : messages) {
+        if (message.getMessageImportance() != null
+            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+              job.getJobId(), message.getMessageText());
+          errorMessage.append(message.getMessageText());
+        }
+      }
+      if (errorMessage.length() > 0) {
+        LOG.info("Cancelling Dataflow job {}", job.getJobId());
+        try {
+          job.cancel();
+        } catch (Exception ignore) {
+          // The TestDataflowPipelineRunner will throw an AssertionError with the job failure
+          // messages.
+        }
+      }
+    }
+
+    private String getErrorMessage() {
+      return errorMessage.toString();
+    }
+  }
+}

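For reference, a minimal sketch of how this runner might be wired into a test;
the project id and staging path below are placeholders, not values from this
commit, and the flow follows the run() implementation above:

    TestDataflowPipelineOptions options =
        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
    options.setProject("my-project");                     // placeholder project id
    options.setStagingLocation("gs://my-bucket/staging"); // placeholder GCS path
    options.setRunner(TestDataflowPipelineRunner.class);
    Pipeline p = Pipeline.create(options);
    // ... apply transforms and PAssert assertions here ...
    p.run(); // per run() above: waits for the job, then checks the PAssert counters
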
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowPathValidator.java
new file mode 100644
index 0000000..aeb864a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowPathValidator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class DataflowPathValidator implements PathValidator {
+
+  private DataflowPipelineOptions dataflowOptions;
+
+  DataflowPathValidator(DataflowPipelineOptions options) {
+    this.dataflowOptions = options;
+  }
+
+  public static DataflowPathValidator fromOptions(PipelineOptions options) {
+    return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
+  }
+
+  /**
+   * Validates that the input GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public String validateInputFilePatternSupported(String filepattern) {
+    GcsPath gcsPath = getGcsPath(filepattern);
+    Preconditions.checkArgument(
+        dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
+    String returnValue = verifyPath(filepattern);
+    verifyPathIsAccessible(filepattern, "Could not find file %s");
+    return returnValue;
+  }
+
+  /**
+   * Validates that the output GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public String validateOutputFilePrefixSupported(String filePrefix) {
+    String returnValue = verifyPath(filePrefix);
+    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+    return returnValue;
+  }
+
+  @Override
+  public String verifyPath(String path) {
+    GcsPath gcsPath = getGcsPath(path);
+    Preconditions.checkArgument(gcsPath.isAbsolute(),
+        "Must provide absolute paths for Dataflow");
+    Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
+        "Dataflow Service does not allow objects with consecutive slashes");
+    return gcsPath.toResourceName();
+  }
+
+  private void verifyPathIsAccessible(String path, String errorMessage) {
+    GcsPath gcsPath = getGcsPath(path);
+    try {
+      Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
+        errorMessage, path);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
+          e);
+    }
+  }
+
+  private GcsPath getGcsPath(String path) {
+    try {
+      return GcsPath.fromUri(path);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "%s expected a valid 'gs://' path but was given '%s'",
+          dataflowOptions.getRunner().getSimpleName(), path), e);
+    }
+  }
+}

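A minimal usage sketch for the validator, assuming a populated
DataflowPipelineOptions instance named options (the gs:// path is
illustrative):

    DataflowPathValidator validator = DataflowPathValidator.fromOptions(options);
    // verifyPath rejects relative paths and objects with consecutive slashes.
    String resource = validator.verifyPath("gs://my-bucket/output/results");
    // validateInputFilePatternSupported and validateOutputFilePrefixSupported
    // additionally verify that the bucket is accessible.
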
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowTransport.java
new file mode 100644
index 0000000..18e6654
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowTransport.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory;
+import static com.google.cloud.dataflow.sdk.util.Transport.getTransport;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class DataflowTransport {
+
+
+  private static class ApiComponents {
+    public String rootUrl;
+    public String servicePath;
+
+    public ApiComponents(String root, String path) {
+      this.rootUrl = root;
+      this.servicePath = path;
+    }
+  }
+
+  private static ApiComponents apiComponentsFromUrl(String urlString) {
+    try {
+      URL url = new URL(urlString);
+      String rootUrl = url.getProtocol() + "://" + url.getHost() +
+          (url.getPort() > 0 ? ":" + url.getPort() : "");
+      return new ApiComponents(rootUrl, url.getPath());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid URL: " + urlString);
+    }
+  }
+
+  /**
+   * Returns a Google Cloud Dataflow client builder.
+   */
+  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
+    String servicePath = options.getDataflowEndpoint();
+    ApiComponents components;
+    if (servicePath.contains("://")) {
+      components = apiComponentsFromUrl(servicePath);
+    } else {
+      components = new ApiComponents(options.getApiRootUrl(), servicePath);
+    }
+
+    return new Dataflow.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404s. They clutter the output and may even be expected by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setRootUrl(components.rootUrl)
+        .setServicePath(components.servicePath)
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
+    return new Clouddebugger.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a Dataflow client that does not automatically retry failed
+   * requests.
+   */
+  public static Dataflow.Builder
+      newRawDataflowClient(DataflowPipelineOptions options) {
+    return newDataflowClient(options)
+        .setHttpRequestInitializer(options.getGcpCredential())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credential credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return httpRequestInitializer;
+    } else {
+      return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
+    }
+  }
+}

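A minimal sketch of building a client with these helpers, again assuming a
populated DataflowPipelineOptions instance named options:

    // newDataflowClient honors a full URL in dataflowEndpoint, or joins a bare
    // service path onto apiRootUrl, as apiComponentsFromUrl shows above.
    Dataflow dataflow = DataflowTransport.newDataflowClient(options).build();
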
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GcsStager.java
new file mode 100644
index 0000000..7307e83
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GcsStager.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+
+/**
+ * Utility class for staging files to GCS.
+ */
+public class GcsStager implements Stager {
+  private DataflowPipelineOptions options;
+
+  private GcsStager(DataflowPipelineOptions options) {
+    this.options = options;
+  }
+
+  public static GcsStager fromOptions(PipelineOptions options) {
+    return new GcsStager(options.as(DataflowPipelineOptions.class));
+  }
+
+  @Override
+  public List<DataflowPackage> stageFiles() {
+    Preconditions.checkNotNull(options.getStagingLocation());
+    List<String> filesToStage = options.getFilesToStage();
+    String windmillBinary =
+        options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
+    if (windmillBinary != null) {
+      filesToStage.add("windmill_main=" + windmillBinary);
+    }
+    return PackageUtil.stageClasspathElements(
+        filesToStage, options.getStagingLocation());
+  }
+}

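A minimal sketch of the staging flow, assuming options already carries a
staging location and a list of files to stage:

    Stager stager = GcsStager.fromOptions(options);
    // Entries in filesToStage may take the form "name=path" to override the
    // package name; an overrideWindmillBinary debug option, when set, is
    // appended as "windmill_main=<path>".
    List<DataflowPackage> staged = stager.stageFiles();
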
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/MonitoringUtil.java
new file mode 100644
index 0000000..2c06a92
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/MonitoringUtil.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for monitoring jobs submitted to the service.
+ */
+public final class MonitoringUtil {
+
+  private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow";
+  private static final String ENDPOINT_OVERRIDE_ENV_VAR =
+      "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";
+
+  private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE =
+      ImmutableMap
+          .<String, State>builder()
+          .put("JOB_STATE_UNKNOWN", State.UNKNOWN)
+          .put("JOB_STATE_STOPPED", State.STOPPED)
+          .put("JOB_STATE_RUNNING", State.RUNNING)
+          .put("JOB_STATE_DONE", State.DONE)
+          .put("JOB_STATE_FAILED", State.FAILED)
+          .put("JOB_STATE_CANCELLED", State.CANCELLED)
+          .put("JOB_STATE_UPDATED", State.UPDATED)
+          .build();
+
+  private String projectId;
+  private Messages messagesClient;
+
+  /**
+   * An interface that can be used for defining callbacks to receive a list
+   * of JobMessages containing monitoring information.
+   */
+  public interface JobMessagesHandler {
+    /** Processes the received job messages. */
+    void process(List<JobMessage> messages);
+  }
+
+  /** A handler that prints monitoring messages to a stream. */
+  public static class PrintHandler implements JobMessagesHandler {
+    private PrintStream out;
+
+    /**
+     * Construct the handler.
+     *
+     * @param stream The stream to write the messages to.
+     */
+    public PrintHandler(PrintStream stream) {
+      out = stream;
+    }
+
+    @Override
+    public void process(List<JobMessage> messages) {
+      for (JobMessage message : messages) {
+        if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
+          continue;
+        }
+        String importanceString = null;
+        if (message.getMessageImportance() == null) {
+          continue;
+        } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+          importanceString = "Error:   ";
+        } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
+          importanceString = "Warning: ";
+        } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
+          importanceString = "Basic:  ";
+        } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
+          importanceString = "Detail:  ";
+        } else {
+          // TODO: Remove filtering here once getJobMessages supports minimum
+          // importance.
+          continue;
+        }
+        @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
+        if (time == null) {
+          out.print("UNKNOWN TIMESTAMP: ");
+        } else {
+          out.print(time + ": ");
+        }
+        if (importanceString != null) {
+          out.print(importanceString);
+        }
+        out.println(message.getMessageText());
+      }
+      out.flush();
+    }
+  }
+
+  /** Construct a helper for monitoring. */
+  public MonitoringUtil(String projectId, Dataflow dataflow) {
+    this(projectId, dataflow.projects().jobs().messages());
+  }
+
+  // @VisibleForTesting
+  MonitoringUtil(String projectId, Messages messagesClient) {
+    this.projectId = projectId;
+    this.messagesClient = messagesClient;
+  }
+
+  /**
+   * Comparator for sorting messages in ascending order by timestamp.
+   */
+  public static class TimeStampComparator implements Comparator<JobMessage> {
+    @Override
+    public int compare(JobMessage o1, JobMessage o2) {
+      @Nullable Instant t1 = fromCloudTime(o1.getTime());
+      if (t1 == null) {
+        return -1;
+      }
+      @Nullable Instant t2 = fromCloudTime(o2.getTime());
+      if (t2 == null) {
+        return 1;
+      }
+      return t1.compareTo(t2);
+    }
+  }
+
+  /**
+   * Return job messages sorted in ascending order by timestamp.
+   * @param jobId The id of the job to get the messages for.
+   * @param startTimestampMs Return only those messages with a
+   *   timestamp greater than this value.
+   * @return collection of messages
+   * @throws IOException if the messages cannot be retrieved from the service
+   */
+  public ArrayList<JobMessage> getJobMessages(
+      String jobId, long startTimestampMs) throws IOException {
+    // TODO: Allow filtering messages by importance
+    Instant startTimestamp = new Instant(startTimestampMs);
+    ArrayList<JobMessage> allMessages = new ArrayList<>();
+    String pageToken = null;
+    while (true) {
+      Messages.List listRequest = messagesClient.list(projectId, jobId);
+      if (pageToken != null) {
+        listRequest.setPageToken(pageToken);
+      }
+      ListJobMessagesResponse response = listRequest.execute();
+
+      if (response == null || response.getJobMessages() == null) {
+        return allMessages;
+      }
+
+      for (JobMessage m : response.getJobMessages()) {
+        @Nullable Instant timestamp = fromCloudTime(m.getTime());
+        if (timestamp == null) {
+          continue;
+        }
+        if (timestamp.isAfter(startTimestamp)) {
+          allMessages.add(m);
+        }
+      }
+
+      if (response.getNextPageToken() == null) {
+        break;
+      } else {
+        pageToken = response.getNextPageToken();
+      }
+    }
+
+    Collections.sort(allMessages, new TimeStampComparator());
+    return allMessages;
+  }
+
+  public static String getJobMonitoringPageURL(String projectName, String jobId) {
+    try {
+      // Project name is allowed in place of the project id: the user will be redirected to a URL
+      // that has the project name replaced with project id.
+      return String.format(
+          "https://console.developers.google.com/project/%s/dataflow/job/%s",
+          URLEncoder.encode(projectName, "UTF-8"),
+          URLEncoder.encode(jobId, "UTF-8"));
+    } catch (UnsupportedEncodingException e) {
+      // Should never happen.
+      throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
+    }
+  }
+
+  public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
+
+    // If using a different Dataflow API than default, prefix command with an API override.
+    String dataflowApiOverridePrefix = "";
+    String apiUrl = options.getDataflowClient().getBaseUrl();
+    if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) {
+      dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl);
+    }
+
+    // Assemble cancel command from optional prefix and project/job parameters.
+    return String.format("%s%s jobs --project=%s cancel %s",
+        dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId);
+  }
+
+  public static State toState(String stateName) {
+    return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName),
+        State.UNKNOWN);
+  }
+}

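A minimal sketch of polling messages for a job; the project and job ids are
placeholders, and getJobMessages throws IOException, so the caller declares it:

    void printJobMessages(Dataflow dataflow) throws IOException {
      MonitoringUtil monitor = new MonitoringUtil("my-project", dataflow);
      List<JobMessage> messages =
          monitor.getJobMessages("2016-01-01_00_00_00-123", 0L);
      new MonitoringUtil.PrintHandler(System.out).process(messages);
      System.out.println(
          MonitoringUtil.getJobMonitoringPageURL("my-project", "2016-01-01_00_00_00-123"));
    }
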
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/PackageUtil.java
new file mode 100644
index 0000000..0e234a8
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/PackageUtil.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.hash.Funnels;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.CountingOutputStream;
+import com.google.common.io.Files;
+
+import com.fasterxml.jackson.core.Base64Variants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+/** Helper routines for packages. */
+public class PackageUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
+  /**
+   * A reasonable upper bound on the number of jars required to launch a Dataflow job.
+   */
+  public static final int SANE_CLASSPATH_SIZE = 1000;
+  /**
+   * The initial interval to use between package staging attempts.
+   */
+  private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
+  /**
+   * The maximum number of attempts when staging a file.
+   */
+  private static final int MAX_ATTEMPTS = 5;
+
+  /**
+   * Translates exceptions from API calls.
+   */
+  private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
+
+  /**
+   * Creates a DataflowPackage containing information about how a classpath element should be
+   * staged, including the staging destination as well as its size and hash.
+   *
+   * @param classpathElement The local path for the classpath element.
+   * @param stagingPath The base location for staged classpath elements.
+   * @param overridePackageName If non-null, use the given value as the package name
+   *                            instead of generating one automatically.
+   * @return The package.
+   */
+  @Deprecated
+  public static DataflowPackage createPackage(File classpathElement,
+      String stagingPath, String overridePackageName) {
+    return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
+        .getDataflowPackage();
+  }
+
+  /**
+   * Compute and cache the attributes of a classpath element that we will need to stage it.
+   *
+   * @param classpathElement the file or directory to be staged.
+   * @param stagingPath The base location for staged classpath elements.
+   * @param overridePackageName If non-null, use the given value as the package name
+   *                            instead of generating one automatically.
+   * @return a {@link PackageAttributes} containing metadata about the object to be staged.
+   */
+  static PackageAttributes createPackageAttributes(File classpathElement,
+      String stagingPath, String overridePackageName) {
+    try {
+      boolean directory = classpathElement.isDirectory();
+
+      // Compute size and hash in one pass over file or directory.
+      Hasher hasher = Hashing.md5().newHasher();
+      OutputStream hashStream = Funnels.asOutputStream(hasher);
+      CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream);
+
+      if (!directory) {
+        // Files are staged as-is.
+        Files.asByteSource(classpathElement).copyTo(countingOutputStream);
+      } else {
+        // Directories are recursively zipped.
+        ZipFiles.zipDirectory(classpathElement, countingOutputStream);
+      }
+
+      long size = countingOutputStream.getCount();
+      String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
+
+      // Create the DataflowPackage with staging name and location.
+      String uniqueName = getUniqueContentName(classpathElement, hash);
+      String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
+      DataflowPackage target = new DataflowPackage();
+      target.setName(overridePackageName != null ? overridePackageName : uniqueName);
+      target.setLocation(resourcePath);
+
+      return new PackageAttributes(size, hash, directory, target);
+    } catch (IOException e) {
+      throw new RuntimeException("Package setup failure for " + classpathElement, e);
+    }
+  }
+
+  /**
+   * Transfers the classpath elements to the staging location.
+   *
+   * @param classpathElements The elements to stage.
+   * @param stagingPath The base location to stage the elements to.
+   * @return A list of cloud workflow packages, each representing a classpath element.
+   */
+  public static List<DataflowPackage> stageClasspathElements(
+      Collection<String> classpathElements, String stagingPath) {
+    return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
+  }
+
+  // Visible for testing.
+  static List<DataflowPackage> stageClasspathElements(
+      Collection<String> classpathElements, String stagingPath,
+      Sleeper retrySleeper) {
+    LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
+        + "prepare for execution.", classpathElements.size());
+
+    if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
+      LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
+          + "copies to all workers. Having this many entries on your classpath may be indicative "
+          + "of an issue in your pipeline. You may want to consider trimming the classpath to "
+          + "necessary dependencies only, using --filesToStage pipeline option to override "
+          + "what files are being staged, or bundling several dependencies into one.",
+          classpathElements.size());
+    }
+
+    ArrayList<DataflowPackage> packages = new ArrayList<>();
+
+    if (stagingPath == null) {
+      throw new IllegalArgumentException(
+          "Can't stage classpath elements on because no staging location has been provided");
+    }
+
+    int numUploaded = 0;
+    int numCached = 0;
+    for (String classpathElement : classpathElements) {
+      String packageName = null;
+      if (classpathElement.contains("=")) {
+        String[] components = classpathElement.split("=", 2);
+        packageName = components[0];
+        classpathElement = components[1];
+      }
+
+      File file = new File(classpathElement);
+      if (!file.exists()) {
+        LOG.warn("Skipping non-existent classpath element {} that was specified.",
+            classpathElement);
+        continue;
+      }
+
+      PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
+
+      DataflowPackage workflowPackage = attributes.getDataflowPackage();
+      packages.add(workflowPackage);
+      String target = workflowPackage.getLocation();
+
+      // TODO: Should we attempt to detect the Mime type rather than
+      // always using MimeTypes.BINARY?
+      try {
+        try {
+          long remoteLength = IOChannelUtils.getSizeBytes(target);
+          if (remoteLength == attributes.getSize()) {
+            LOG.debug("Skipping classpath element already staged: {} at {}",
+                classpathElement, target);
+            numCached++;
+            continue;
+          }
+        } catch (FileNotFoundException expected) {
+          // If the file doesn't exist, it means we need to upload it.
+        }
+
+        // Upload file, retrying on failure.
+        AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
+            MAX_ATTEMPTS,
+            INITIAL_BACKOFF_INTERVAL_MS);
+        while (true) {
+          try {
+            LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
+            try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
+              copyContent(classpathElement, writer);
+            }
+            numUploaded++;
+            break;
+          } catch (IOException e) {
+            if (ERROR_EXTRACTOR.accessDenied(e)) {
+              String errorMessage = String.format(
+                  "Uploaded failed due to permissions error, will NOT retry staging "
+                  + "of classpath %s. Please verify credentials are valid and that you have "
+                  + "write access to %s. Stale credentials can be resolved by executing "
+                  + "'gcloud auth login'.", classpathElement, target);
+              LOG.error(errorMessage);
+              throw new IOException(errorMessage, e);
+            } else if (!backoff.atMaxAttempts()) {
+              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
+                  classpathElement, e);
+              BackOffUtils.next(retrySleeper, backoff);
+            } else {
+              // Rethrow last error, to be included as a cause in the catch below.
+              LOG.error("Upload failed, will NOT retry staging of classpath: {}",
+                  classpathElement, e);
+              throw e;
+            }
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
+      }
+    }
+
+    LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
+        + "{} files cached",
+        numUploaded, numCached);
+
+    return packages;
+  }
+
+  /**
+   * Returns a unique name for a file with a given content hash.
+   *
+   * <p>Directory paths are removed. Example:
+   * <pre>
+   * dir="a/b/c/d", contentHash="f000" => d-f000.jar
+   * file="a/b/c/d.txt", contentHash="f000" => d-f000.txt
+   * file="a/b/c/d", contentHash="f000" => d-f000
+   * </pre>
+   */
+  static String getUniqueContentName(File classpathElement, String contentHash) {
+    String fileName = Files.getNameWithoutExtension(classpathElement.getAbsolutePath());
+    String fileExtension = Files.getFileExtension(classpathElement.getAbsolutePath());
+    if (classpathElement.isDirectory()) {
+      return fileName + "-" + contentHash + ".jar";
+    } else if (fileExtension.isEmpty()) {
+      return fileName + "-" + contentHash;
+    }
+    return fileName + "-" + contentHash + "." + fileExtension;
+  }
+
+  /**
+   * Copies the contents of the classpathElement to the output channel.
+   *
+   * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
+   * otherwise the file contents are copied as-is.
+   *
+   * <p>The output channel is not closed.
+   */
+  private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
+      throws IOException {
+    final File classpathElementFile = new File(classpathElement);
+    if (classpathElementFile.isDirectory()) {
+      ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
+    } else {
+      Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
+    }
+  }
+  /**
+   * Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
+   */
+  static class PackageAttributes {
+    private final boolean directory;
+    private final long size;
+    private final String hash;
+    private DataflowPackage dataflowPackage;
+
+    public PackageAttributes(long size, String hash, boolean directory,
+        DataflowPackage dataflowPackage) {
+      this.size = size;
+      this.hash = Objects.requireNonNull(hash, "hash");
+      this.directory = directory;
+      this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
+    }
+
+    /**
+     * @return the dataflowPackage
+     */
+    public DataflowPackage getDataflowPackage() {
+      return dataflowPackage;
+    }
+
+    /**
+     * @return the directory
+     */
+    public boolean isDirectory() {
+      return directory;
+    }
+
+    /**
+     * @return the size
+     */
+    public long getSize() {
+      return size;
+    }
+
+    /**
+     * @return the hash
+     */
+    public String getHash() {
+      return hash;
+    }
+  }
+}

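A minimal sketch of staging a classpath directly (paths and bucket are
placeholders); unchanged files are skipped by comparing remote and local sizes,
and directories are zipped on the fly:

    List<DataflowPackage> packages = PackageUtil.stageClasspathElements(
        Arrays.asList("/path/to/my-pipeline.jar", "extra=/path/to/deps-dir"),
        "gs://my-bucket/staging");
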
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/Stager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/Stager.java
new file mode 100644
index 0000000..f6c6a71
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/Stager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.api.services.dataflow.model.DataflowPackage;
+
+import java.util.List;
+
+/**
+ * Interface for staging files needed for running a Dataflow pipeline.
+ */
+public interface Stager {
+  /** Stages files and returns a list of packages. */
+  List<DataflowPackage> stageFiles();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
deleted file mode 100644
index 1bd8a85..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.util.GcsUtil;
-import com.google.cloud.dataflow.sdk.util.TestCredential;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
-
-/**
- * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms.
- */
-@RunWith(JUnit4.class)
-public class DataflowTextIOTest {
-
-  private TestDataflowPipelineOptions buildTestPipelineOptions() {
-    TestDataflowPipelineOptions options =
-        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setGcpCredential(new TestCredential());
-    return options;
-  }
-
-  private GcsUtil buildMockGcsUtil() throws IOException {
-    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
-
-    // Any request to open gets a new bogus channel
-    Mockito
-        .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
-        .then(new Answer<SeekableByteChannel>() {
-          @Override
-          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
-            return FileChannel.open(
-                Files.createTempFile("channel-", ".tmp"),
-                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
-          }
-        });
-
-    // Any request for expansion returns a list containing the original GcsPath
-    // This is required to pass validation that occurs in TextIO during apply()
-    Mockito
-        .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
-        .then(new Answer<List<GcsPath>>() {
-          @Override
-          public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
-            return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
-          }
-        });
-
-    return mockGcsUtil;
-  }
-
-  /**
-   * This tests a few corner cases that should not crash.
-   */
-  @Test
-  public void testGoodWildcards() throws Exception {
-    TestDataflowPipelineOptions options = buildTestPipelineOptions();
-    options.setGcsUtil(buildMockGcsUtil());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    applyRead(pipeline, "gs://bucket/foo");
-    applyRead(pipeline, "gs://bucket/foo/");
-    applyRead(pipeline, "gs://bucket/foo/*");
-    applyRead(pipeline, "gs://bucket/foo/?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]");
-    applyRead(pipeline, "gs://bucket/foo/*baz*");
-    applyRead(pipeline, "gs://bucket/foo/*baz?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
-    applyRead(pipeline, "gs://bucket/foo/baz/*");
-    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
-    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
-    applyRead(pipeline, "gs://bucket/foo*/baz");
-    applyRead(pipeline, "gs://bucket/foo?/baz");
-    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
-    // Check that running doesn't fail.
-    pipeline.run();
-  }
-
-  private void applyRead(Pipeline pipeline, String path) {
-    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptionsTest.java
deleted file mode 100644
index 1b5a3c7..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptionsTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import static org.hamcrest.Matchers.hasEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipelineDebugOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineDebugOptionsTest {
-  @Test
-  public void testTransformNameMapping() throws Exception {
-    DataflowPipelineDebugOptions options = PipelineOptionsFactory
-        .fromArgs(new String[]{"--transformNameMapping={\"a\":\"b\",\"foo\":\"\",\"bar\":\"baz\"}"})
-        .as(DataflowPipelineDebugOptions.class);
-    assertEquals(3, options.getTransformNameMapping().size());
-    assertThat(options.getTransformNameMapping(), hasEntry("a", "b"));
-    assertThat(options.getTransformNameMapping(), hasEntry("foo", ""));
-    assertThat(options.getTransformNameMapping(), hasEntry("bar", "baz"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptionsTest.java
deleted file mode 100644
index eff79bb..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptionsTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.testing.ResetDateTimeProvider;
-import com.google.cloud.dataflow.sdk.testing.RestoreSystemProperties;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestRule;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipelineOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineOptionsTest {
-  @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
-  @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider();
-
-  @Test
-  public void testJobNameIsSet() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setJobName("TestJobName");
-    assertEquals("TestJobName", options.getJobName());
-  }
-
-  @Test
-  public void testUserNameIsNotSet() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().remove("user.name");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("TestApplication");
-    assertEquals("testapplication--1208190706", options.getJobName());
-    assertTrue(options.getJobName().length() <= 40);
-  }
-
-  @Test
-  public void testAppNameAndUserNameAreLong() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("1234567890123456789012345678901234567890");
-    assertEquals(
-        "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706",
-        options.getJobName());
-  }
-
-  @Test
-  public void testAppNameIsLong() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().put("user.name", "abcde");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("1234567890123456789012345678901234567890");
-    assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName());
-  }
-
-  @Test
-  public void testUserNameIsLong() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("1234567890");
-    assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName());
-  }
-
-  @Test
-  public void testUtf8UserNameAndApplicationNameIsNormalized() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().put("user.name", "ði ıntəˈnæʃənəl ");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("fəˈnɛtık əsoʊsiˈeıʃn");
-    assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
-  }
-}
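
Aside: the expectations above pin down the job-name derivation these options
perform: lower-case the app name, map any character outside [a-z0-9] to '0',
rewrite a leading non-letter to 'a', and join app name, user name, and an
MMddHHmmss timestamp with '-'. A hedged sketch of those rules, reconstructed
only from the assertions in this test (not the SDK implementation):

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class JobNameSketch {
  // Replace anything outside [a-z0-9] with '0', as the UTF-8 test implies.
  static String normalize(String s) {
    StringBuilder sb = new StringBuilder();
    for (char c : s.toLowerCase().toCharArray()) {
      sb.append((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') ? c : '0');
    }
    return sb.toString();
  }

  static String jobName(String appName, String userName, ZonedDateTime now) {
    String app = normalize(appName);
    // The long-app-name tests show a leading digit rewritten to 'a' so the
    // job name starts with a letter.
    if (!app.isEmpty() && (app.charAt(0) < 'a' || app.charAt(0) > 'z')) {
      app = 'a' + app.substring(1);
    }
    String stamp = now.format(DateTimeFormatter.ofPattern("MMddHHmmss"));
    return app + "-" + normalize(userName) + "-" + stamp;
  }

  public static void main(String[] args) {
    ZonedDateTime fixed = ZonedDateTime.of(2014, 12, 8, 19, 7, 6, 0, ZoneOffset.UTC);
    System.out.println(jobName("TestApplication", "", fixed));
    // -> testapplication--1208190706, matching testUserNameIsNotSet
  }
}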

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptionsTest.java
deleted file mode 100644
index 1420273..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptionsTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link DataflowProfilingOptions}.
- */
-@RunWith(JUnit4.class)
-public class DataflowProfilingOptionsTest {
-
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  @Test
-  public void testOptionsObject() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {
-        "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"})
-        .as(DataflowPipelineOptions.class);
-    assertTrue(options.getEnableProfilingAgent());
-
-    String json = MAPPER.writeValueAsString(options);
-    assertThat(json, Matchers.containsString(
-        "\"profilingAgentConfiguration\":{\"interval\":21}"));
-  }
-}
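
Aside: the containsString assertion above follows from how Jackson serializes a
map-valued bean property. A minimal illustration with a stand-in bean (the real
options object is a proxy generated by PipelineOptionsFactory; the bean here is
assumed for illustration only):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Collections;
import java.util.Map;

public class ProfilingJsonSketch {
  // Hypothetical bean standing in for the generated options proxy.
  public static class Options {
    private Map<String, Object> profilingAgentConfiguration;

    public Map<String, Object> getProfilingAgentConfiguration() {
      return profilingAgentConfiguration;
    }

    public void setProfilingAgentConfiguration(Map<String, Object> m) {
      this.profilingAgentConfiguration = m;
    }
  }

  public static void main(String[] args) throws Exception {
    Options o = new Options();
    o.setProfilingAgentConfiguration(
        Collections.<String, Object>singletonMap("interval", 21));
    System.out.println(new ObjectMapper().writeValueAsString(o));
    // -> {"profilingAgentConfiguration":{"interval":21}}
  }
}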

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptionsTest.java
deleted file mode 100644
index b752f3d..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptionsTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.WARN;
-import static org.junit.Assert.assertEquals;
-
-import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
-import com.google.common.collect.ImmutableMap;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowWorkerLoggingOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowWorkerLoggingOptionsTest {
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testWorkerLogLevelOverrideWithInvalidLogLevel() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Unsupported log level");
-    WorkerLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
-  }
-
-  @Test
-  public void testWorkerLogLevelOverrideForClass() throws Exception {
-    assertEquals("{\"org.junit.Test\":\"WARN\"}",
-        MAPPER.writeValueAsString(
-            new WorkerLogLevelOverrides().addOverrideForClass(Test.class, WARN)));
-  }
-
-  @Test
-  public void testWorkerLogLevelOverrideForPackage() throws Exception {
-    assertEquals("{\"org.junit\":\"WARN\"}",
-        MAPPER.writeValueAsString(
-            new WorkerLogLevelOverrides().addOverrideForPackage(Test.class.getPackage(), WARN)));
-  }
-
-  @Test
-  public void testWorkerLogLevelOverrideForName() throws Exception {
-    assertEquals("{\"A\":\"WARN\"}",
-        MAPPER.writeValueAsString(
-            new WorkerLogLevelOverrides().addOverrideForName("A", WARN)));
-  }
-
-  @Test
-  public void testSerializationAndDeserializationOf() throws Exception {
-    String testValue = "{\"A\":\"WARN\"}";
-    assertEquals(testValue,
-        MAPPER.writeValueAsString(
-            MAPPER.readValue(testValue, WorkerLogLevelOverrides.class)));
-  }
-}
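
Aside: the overrides tested above are fluent builders over a name-to-level map
(the tests pass the builder chain directly to Jackson, so each add* call returns
the overrides object). A hedged usage sketch built only from calls these tests
exercise; the setter at the end is assumed from the option's getter/setter naming
convention and is not shown in this diff:

import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.WARN;

import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

public class LoggingOverridesSketch {
  public static void main(String[] args) {
    WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides()
        .addOverrideForName("com.example.ChattyDoFn", WARN); // hypothetical logger name
    DataflowWorkerLoggingOptions options =
        PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
    options.setWorkerLogLevelOverrides(overrides); // assumed setter
  }
}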

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunnerTest.java
deleted file mode 100644
index 0322426..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
-import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
-import com.google.cloud.dataflow.sdk.util.TestCredential;
-
-import org.hamcrest.Description;
-import org.hamcrest.Factory;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for {@link BlockingDataflowPipelineRunner}.
- */
-@RunWith(JUnit4.class)
-public class BlockingDataflowPipelineRunnerTest {
-
-  @Rule
-  public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);
-
-  @Rule
-  public ExpectedException expectedThrown = ExpectedException.none();
-
-  /**
-   * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
-   * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
-   */
-  private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
-      extends TypeSafeMatcher<T> {
-
-    private final Matcher<DataflowPipelineJob> matcher;
-
-    public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
-      this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T ex) {
-      return matcher.matches(ex.getJob());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("job ");
-        matcher.describeMismatch(item.getMessage(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("exception with job matching ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowJobException> Matcher<T> expectJob(
-        Matcher<DataflowPipelineJob> matcher) {
-      return new DataflowJobExceptionMatcher<T>(matcher);
-    }
-  }
-
-  /**
-   * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
-   * to the return value of {@link DataflowPipelineJob#getJobId()}.
-   */
-  private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
-
-    private final Matcher<String> matcher;
-
-    public JobIdMatcher(Matcher<String> matcher) {
-      this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T job) {
-      return matcher.matches(job.getJobId());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("jobId ");
-        matcher.describeMismatch(item.getJobId(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("job with jobId ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
-      return new JobIdMatcher<T>(equalTo(jobId));
-    }
-  }
-
-  /**
-   * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
-   * {@link Matcher} to the {@link DataflowPipelineJob} returned by
-   * {@link DataflowJobUpdatedException#getReplacedByJob()}.
-   */
-  private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
-      extends TypeSafeMatcher<T> {
-
-    private final Matcher<DataflowPipelineJob> matcher;
-
-    public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
-      this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T ex) {
-      return matcher.matches(ex.getReplacedByJob());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("job ");
-        matcher.describeMismatch(item.getMessage(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("exception with replacedByJob() ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
-        Matcher<DataflowPipelineJob> matcher) {
-      return new ReplacedByJobMatcher<T>(matcher);
-    }
-  }
-
-  /**
-   * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
-   * that will immediately terminate in the provided {@code terminalState}.
-   *
-   * <p>The return value may be further mocked.
-   */
-  private DataflowPipelineJob createMockJob(
-      String projectId, String jobId, State terminalState) throws Exception {
-    DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
-    when(mockJob.getProjectId()).thenReturn(projectId);
-    when(mockJob.getJobId()).thenReturn(jobId);
-    when(mockJob.waitToFinish(
-        anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
-        .thenReturn(terminalState);
-    return mockJob;
-  }
-
-  /**
-   * Returns a {@link BlockingDataflowPipelineRunner} that will return the provided job when run.
-   * Some {@link PipelineOptions}, such as the project ID, will be extracted from the job.
-   */
-  private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
-      throws Exception {
-    DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
-    TestDataflowPipelineOptions options =
-        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setProject(job.getProjectId());
-
-    when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
-
-    return new BlockingDataflowPipelineRunner(mockRunner, options);
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in
-   * the {@link State#DONE DONE} state.
-   */
-  @Test
-  public void testJobDoneComplete() throws Exception {
-    createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
-        .run(DirectPipeline.createForTest());
-    expectedLogs.verifyInfo("Job finished with status DONE");
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#FAILED FAILED} state.
-   */
-  @Test
-  public void testFailedJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobExecutionException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testFailedJob-jobId")));
-    createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
-        .run(DirectPipeline.createForTest());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
-   */
-  @Test
-  public void testCancelledJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobCancelledException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testCancelledJob-jobId")));
-    createMockRunner(
-        createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
-        .run(DirectPipeline.createForTest());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#UPDATED UPDATED} state.
-   */
-  @Test
-  public void testUpdatedJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobUpdatedException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
-    expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
-        JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
-    DataflowPipelineJob job =
-        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
-    DataflowPipelineJob replacedByJob =
-        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
-    when(job.getReplacedByJob()).thenReturn(replacedByJob);
-    createMockRunner(job).run(DirectPipeline.createForTest());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
-   * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it
-   * is an old SDK relative to the service).
-   */
-  @Test
-  public void testUnknownJobThrowsException() throws Exception {
-    expectedThrown.expect(IllegalStateException.class);
-    createMockRunner(
-        createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
-        .run(DirectPipeline.createForTest());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job returns a {@code null} state, indicating that the runner failed to contact the
-   * service even after exhausting its built-in resilience logic.
-   */
-  @Test
-  public void testNullJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowServiceException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testNullJob-jobId")));
-    createMockRunner(
-        createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
-        .run(DirectPipeline.createForTest());
-  }
-
-  @Test
-  public void testToString() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setJobName("TestJobName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setGcpCredential(new TestCredential());
-    options.setPathValidatorClass(NoopPathValidator.class);
-    assertEquals("BlockingDataflowPipelineRunner#TestJobName",
-        BlockingDataflowPipelineRunner.fromOptions(options).toString());
-  }
-}
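
Aside: the three matcher classes deleted above share one Hamcrest pattern:
extend TypeSafeMatcher, delegate matchesSafely to a wrapped matcher applied to
some derived value, and describe both the expectation and the mismatch. A
generic distillation of that pattern (illustrative names, not SDK API):

import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;

/** Matches an exception whose message satisfies a wrapped String matcher. */
class MessageMatcher<T extends Exception> extends TypeSafeMatcher<T> {
  private final Matcher<String> matcher;

  MessageMatcher(Matcher<String> matcher) {
    this.matcher = matcher;
  }

  @Override
  public boolean matchesSafely(T ex) {
    // Delegate to the wrapped matcher over the derived value.
    return matcher.matches(ex.getMessage());
  }

  @Override
  protected void describeMismatchSafely(T item, Description description) {
    // Describe the same derived value that was matched, so mismatch
    // output stays meaningful.
    description.appendText("message ");
    matcher.describeMismatch(item.getMessage(), description);
  }

  @Override
  public void describeTo(Description description) {
    description.appendText("exception with message matching ");
    description.appendDescriptionOf(matcher);
  }
}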

