beam-commits mailing list archives

From dhalp...@apache.org
Subject [09/21] incubator-beam git commit: Reorganize Java packages in the sources of the Google Cloud Dataflow runner
Date Wed, 27 Apr 2016 01:08:41 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
deleted file mode 100644
index d647b0d..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.DataflowJobExecutionException;
-import org.apache.beam.sdk.runners.DataflowPipelineJob;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.MonitoringUtil;
-import org.apache.beam.sdk.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
-  private static final String TENTATIVE_COUNTER = "tentative";
-  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-  private static final Map<String, PipelineResult> EXECUTION_RESULTS =
-      new ConcurrentHashMap<String, PipelineResult>();
-
-  private final TestDataflowPipelineOptions options;
-  private final DataflowPipelineRunner runner;
-  private int expectedNumberOfAssertions = 0;
-
-  TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
-    this.options = options;
-    this.runner = DataflowPipelineRunner.fromOptions(options);
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static TestDataflowPipelineRunner fromOptions(
-      PipelineOptions options) {
-    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-    dataflowOptions.setStagingLocation(Joiner.on("/").join(
-        new String[]{dataflowOptions.getTempRoot(),
-          dataflowOptions.getJobName(), "output", "results"}));
-
-    return new TestDataflowPipelineRunner(dataflowOptions);
-  }
-
-  public static PipelineResult getPipelineResultByJobName(String jobName) {
-    return EXECUTION_RESULTS.get(jobName);
-  }
-
-  @Override
-  public DataflowPipelineJob run(Pipeline pipeline) {
-    return run(pipeline, runner);
-  }
-
-  DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
-
-    final DataflowPipelineJob job;
-    try {
-      job = runner.run(pipeline);
-    } catch (DataflowJobExecutionException ex) {
-      throw new IllegalStateException("The dataflow failed.");
-    }
-
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
-
-    CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
-        job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-
-    try {
-      final Optional<Boolean> result;
-
-      if (options.isStreaming()) {
-        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
-            new Callable<Optional<Boolean>>() {
-          @Override
-          public Optional<Boolean> call() throws Exception {
-            try {
-              for (;;) {
-                Optional<Boolean> result = checkForSuccess(job);
-                if (result.isPresent()) {
-                  return result;
-                }
-                Thread.sleep(10000L);
-              }
-            } finally {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
-            }
-          }
-        });
-        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
-              job.getJobId());
-          job.cancel();
-        }
-        result = resultFuture.get();
-      } else {
-        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
-        result = checkForSuccess(job);
-      }
-      if (!result.isPresent()) {
-        throw new IllegalStateException(
-            "The dataflow did not output a success or failure metric.");
-      } else if (!result.get()) {
-        throw new AssertionError(messageHandler.getErrorMessage() == null ?
-            "The dataflow did not return a failure reason."
-            : messageHandler.getErrorMessage());
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (ExecutionException e) {
-      Throwables.propagateIfPossible(e.getCause());
-      throw new RuntimeException(e.getCause());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    EXECUTION_RESULTS.put(options.getJobName(), job);
-    return job;
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (transform instanceof PAssert.OneSideInputAssert
-        || transform instanceof PAssert.TwoSideInputAssert) {
-      expectedNumberOfAssertions += 1;
-    }
-
-    return runner.apply(transform, input);
-  }
-
-  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
-      throws IOException {
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("The pipeline failed");
-      return Optional.of(false);
-    }
-
-    JobMetrics metrics = job.getDataflowClient().projects().jobs()
-        .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
-    if (metrics == null || metrics.getMetrics() == null) {
-      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
-    } else {
-      int successes = 0;
-      int failures = 0;
-      for (MetricUpdate metric : metrics.getMetrics()) {
-        if (metric.getName() == null || metric.getName().getContext() == null
-            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
-          // Don't double count using the non-tentative version of the metric.
-          continue;
-        }
-        if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
-          successes += ((BigDecimal) metric.getScalar()).intValue();
-        } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
-          failures += ((BigDecimal) metric.getScalar()).intValue();
-        }
-      }
-
-      if (failures > 0) {
-        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(false);
-      } else if (successes >= expectedNumberOfAssertions) {
-        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(true);
-      }
-
-      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
-          + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
-    }
-
-    return Optional.<Boolean>absent();
-  }
-
-  @Override
-  public String toString() {
-    return "TestDataflowPipelineRunner#" + options.getAppName();
-  }
-
-  /**
-   * Cancels the workflow on the first error message it sees.
-   *
-   * <p>Creates an error message representing the concatenation of all error messages seen.
-   */
-  private static class CancelWorkflowOnError implements JobMessagesHandler {
-    private final DataflowPipelineJob job;
-    private final JobMessagesHandler messageHandler;
-    private final StringBuffer errorMessage;
-    private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
-      this.job = job;
-      this.messageHandler = messageHandler;
-      this.errorMessage = new StringBuffer();
-    }
-
-    @Override
-    public void process(List<JobMessage> messages) {
-      messageHandler.process(messages);
-      for (JobMessage message : messages) {
-        if (message.getMessageImportance() != null
-            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
-              job.getJobId(), message.getMessageText());
-          errorMessage.append(message.getMessageText());
-        }
-      }
-      if (errorMessage.length() > 0) {
-        LOG.info("Cancelling Dataflow job {}", job.getJobId());
-        try {
-          job.cancel();
-        } catch (Exception ignore) {
-          // The TestDataflowPipelineRunner will throw an AssertionError with the job failure
-          // messages.
-        }
-      }
-    }
-
-    private String getErrorMessage() {
-      return errorMessage.toString();
-    }
-  }
-}
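
For context on the removed test runner: it wraps DataflowPipelineRunner, counts the PAssert transforms applied to the pipeline, and decides success or failure by polling the job's tentative PAssert counters (see checkForSuccess above). A minimal usage sketch, assuming placeholder project and bucket values and the pre-reorganization package names:

    TestDataflowPipelineOptions options =
        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
    options.setProject("my-project");           // placeholder project id
    options.setTempRoot("gs://my-bucket/tmp");  // staging location is derived from this
    options.setRunner(TestDataflowPipelineRunner.class);

    Pipeline p = Pipeline.create(options);
    // ... build the pipeline and attach PAssert assertions ...
    p.run();  // submits the job, then blocks until the PAssert counters report success or failure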

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowPathValidator.java
deleted file mode 100644
index d0388a3..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
-  private DataflowPipelineOptions dataflowOptions;
-
-  DataflowPathValidator(DataflowPipelineOptions options) {
-    this.dataflowOptions = options;
-  }
-
-  public static DataflowPathValidator fromOptions(PipelineOptions options) {
-    return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
-  }
-
-  /**
-   * Validates that the input GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    GcsPath gcsPath = getGcsPath(filepattern);
-    Preconditions.checkArgument(
-        dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
-    String returnValue = verifyPath(filepattern);
-    verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
-  }
-
-  /**
-   * Validates that the output GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
-    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
-  }
-
-  @Override
-  public String verifyPath(String path) {
-    GcsPath gcsPath = getGcsPath(path);
-    Preconditions.checkArgument(gcsPath.isAbsolute(),
-        "Must provide absolute paths for Dataflow");
-    Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
-        "Dataflow Service does not allow objects with consecutive slashes");
-    return gcsPath.toResourceName();
-  }
-
-  private void verifyPathIsAccessible(String path, String errorMessage) {
-    GcsPath gcsPath = getGcsPath(path);
-    try {
-      Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
-        errorMessage, path);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
-    }
-  }
-
-  private GcsPath getGcsPath(String path) {
-    try {
-      return GcsPath.fromUri(path);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(String.format(
-          "%s expected a valid 'gs://' path but was given '%s'",
-          dataflowOptions.getRunner().getSimpleName(), path), e);
-    }
-  }
-}
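
The validator only accepts well-formed, absolute gs:// paths. A small sketch of its behavior, assuming a placeholder bucket and the pre-reorganization package names:

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    PathValidator validator = DataflowPathValidator.fromOptions(options);

    // Well-formed absolute GCS path: returns the canonical resource name.
    String resource = validator.verifyPath("gs://my-bucket/output/results");

    // Rejected: consecutive slashes in the object name.
    validator.verifyPath("gs://my-bucket/output//results");  // throws IllegalArgumentException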

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowTransport.java
deleted file mode 100644
index 8fcfccf..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/DataflowTransport.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.Transport.getJsonFactory;
-import static org.apache.beam.sdk.util.Transport.getTransport;
-
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-
-/**
- * Helpers for cloud communication.
- */
-public class DataflowTransport {
-
-
-  private static class ApiComponents {
-    public String rootUrl;
-    public String servicePath;
-
-    public ApiComponents(String root, String path) {
-      this.rootUrl = root;
-      this.servicePath = path;
-    }
-  }
-
-  private static ApiComponents apiComponentsFromUrl(String urlString) {
-    try {
-      URL url = new URL(urlString);
-      String rootUrl = url.getProtocol() + "://" + url.getHost() +
-          (url.getPort() > 0 ? ":" + url.getPort() : "");
-      return new ApiComponents(rootUrl, url.getPath());
-    } catch (MalformedURLException e) {
-      throw new RuntimeException("Invalid URL: " + urlString);
-    }
-  }
-
-  /**
-   * Returns a Google Cloud Dataflow client builder.
-   */
-  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
-    String servicePath = options.getDataflowEndpoint();
-    ApiComponents components;
-    if (servicePath.contains("://")) {
-      components = apiComponentsFromUrl(servicePath);
-    } else {
-      components = new ApiComponents(options.getApiRootUrl(), servicePath);
-    }
-
-    return new Dataflow.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setRootUrl(components.rootUrl)
-        .setServicePath(components.servicePath)
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
-    return new Clouddebugger.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Dataflow client that does not automatically retry failed
-   * requests.
-   */
-  public static Dataflow.Builder
-      newRawDataflowClient(DataflowPipelineOptions options) {
-    return newDataflowClient(options)
-        .setHttpRequestInitializer(options.getGcpCredential())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  private static HttpRequestInitializer chainHttpRequestInitializer(
-      Credential credential, HttpRequestInitializer httpRequestInitializer) {
-    if (credential == null) {
-      return httpRequestInitializer;
-    } else {
-      return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
-    }
-  }
-}
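
A minimal sketch of building a Dataflow API client from pipeline options via the removed helper; TestCredential (used by the tests in this commit) stands in here for a real Credential, and the app name is a placeholder:

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setAppName("MyApp");                     // placeholder application name
    options.setGcpCredential(new TestCredential());  // placeholder credential

    Dataflow dataflow = DataflowTransport.newDataflowClient(options).build();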

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GcsStager.java
deleted file mode 100644
index 4f1f673..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GcsStager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.DataflowPipelineDebugOptions;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-
-/**
- * Utility class for staging files to GCS.
- */
-public class GcsStager implements Stager {
-  private DataflowPipelineOptions options;
-
-  private GcsStager(DataflowPipelineOptions options) {
-    this.options = options;
-  }
-
-  public static GcsStager fromOptions(PipelineOptions options) {
-    return new GcsStager(options.as(DataflowPipelineOptions.class));
-  }
-
-  @Override
-  public List<DataflowPackage> stageFiles() {
-    Preconditions.checkNotNull(options.getStagingLocation());
-    List<String> filesToStage = options.getFilesToStage();
-    String windmillBinary =
-        options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
-    if (windmillBinary != null) {
-      filesToStage.add("windmill_main=" + windmillBinary);
-    }
-    return PackageUtil.stageClasspathElements(
-        options.getFilesToStage(), options.getStagingLocation());
-  }
-}
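
A minimal sketch of staging files through the removed GcsStager, assuming placeholder paths; stageFiles requires a staging location to be set:

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setStagingLocation("gs://my-bucket/staging");             // placeholder bucket
    options.setFilesToStage(Arrays.asList("/path/to/pipeline.jar"));  // placeholder jar

    Stager stager = GcsStager.fromOptions(options);
    List<DataflowPackage> staged = stager.stageFiles();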

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/MonitoringUtil.java
deleted file mode 100644
index 5afca52..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/MonitoringUtil.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.TimeUtil.fromCloudTime;
-
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.ListJobMessagesResponse;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for monitoring jobs submitted to the service.
- */
-public final class MonitoringUtil {
-
-  private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow";
-  private static final String ENDPOINT_OVERRIDE_ENV_VAR =
-      "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";
-
-  private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE =
-      ImmutableMap
-          .<String, State>builder()
-          .put("JOB_STATE_UNKNOWN", State.UNKNOWN)
-          .put("JOB_STATE_STOPPED", State.STOPPED)
-          .put("JOB_STATE_RUNNING", State.RUNNING)
-          .put("JOB_STATE_DONE", State.DONE)
-          .put("JOB_STATE_FAILED", State.FAILED)
-          .put("JOB_STATE_CANCELLED", State.CANCELLED)
-          .put("JOB_STATE_UPDATED", State.UPDATED)
-          .build();
-
-  private String projectId;
-  private Messages messagesClient;
-
-  /**
-   * An interface that can be used for defining callbacks to receive a list
-   * of JobMessages containing monitoring information.
-   */
-  public interface JobMessagesHandler {
-    /** Process the rows. */
-    void process(List<JobMessage> messages);
-  }
-
-  /** A handler that prints monitoring messages to a stream. */
-  public static class PrintHandler implements JobMessagesHandler {
-    private PrintStream out;
-
-    /**
-     * Construct the handler.
-     *
-     * @param stream The stream to write the messages to.
-     */
-    public PrintHandler(PrintStream stream) {
-      out = stream;
-    }
-
-    @Override
-    public void process(List<JobMessage> messages) {
-      for (JobMessage message : messages) {
-        if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
-          continue;
-        }
-        String importanceString = null;
-        if (message.getMessageImportance() == null) {
-          continue;
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          importanceString = "Error:   ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
-          importanceString = "Warning: ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
-          importanceString = "Basic:  ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
-          importanceString = "Detail:  ";
-        } else {
-          // TODO: Remove filtering here once getJobMessages supports minimum
-          // importance.
-          continue;
-        }
-        @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
-        if (time == null) {
-          out.print("UNKNOWN TIMESTAMP: ");
-        } else {
-          out.print(time + ": ");
-        }
-        if (importanceString != null) {
-          out.print(importanceString);
-        }
-        out.println(message.getMessageText());
-      }
-      out.flush();
-    }
-  }
-
-  /** Construct a helper for monitoring. */
-  public MonitoringUtil(String projectId, Dataflow dataflow) {
-    this(projectId, dataflow.projects().jobs().messages());
-  }
-
-  // @VisibleForTesting
-  MonitoringUtil(String projectId, Messages messagesClient) {
-    this.projectId = projectId;
-    this.messagesClient = messagesClient;
-  }
-
-  /**
-   * Comparator for sorting rows in increasing order based on timestamp.
-   */
-  public static class TimeStampComparator implements Comparator<JobMessage> {
-    @Override
-    public int compare(JobMessage o1, JobMessage o2) {
-      @Nullable Instant t1 = fromCloudTime(o1.getTime());
-      if (t1 == null) {
-        return -1;
-      }
-      @Nullable Instant t2 = fromCloudTime(o2.getTime());
-      if (t2 == null) {
-        return 1;
-      }
-      return t1.compareTo(t2);
-    }
-  }
-
-  /**
-   * Return job messages sorted in ascending order by timestamp.
-   * @param jobId The id of the job to get the messages for.
-   * @param startTimestampMs Return only those messages with a
-   *   timestamp greater than this value.
-   * @return collection of messages
-   * @throws IOException
-   */
-  public ArrayList<JobMessage> getJobMessages(
-      String jobId, long startTimestampMs) throws IOException {
-    // TODO: Allow filtering messages by importance
-    Instant startTimestamp = new Instant(startTimestampMs);
-    ArrayList<JobMessage> allMessages = new ArrayList<>();
-    String pageToken = null;
-    while (true) {
-      Messages.List listRequest = messagesClient.list(projectId, jobId);
-      if (pageToken != null) {
-        listRequest.setPageToken(pageToken);
-      }
-      ListJobMessagesResponse response = listRequest.execute();
-
-      if (response == null || response.getJobMessages() == null) {
-        return allMessages;
-      }
-
-      for (JobMessage m : response.getJobMessages()) {
-        @Nullable Instant timestamp = fromCloudTime(m.getTime());
-        if (timestamp == null) {
-          continue;
-        }
-        if (timestamp.isAfter(startTimestamp)) {
-          allMessages.add(m);
-        }
-      }
-
-      if (response.getNextPageToken() == null) {
-        break;
-      } else {
-        pageToken = response.getNextPageToken();
-      }
-    }
-
-    Collections.sort(allMessages, new TimeStampComparator());
-    return allMessages;
-  }
-
-  public static String getJobMonitoringPageURL(String projectName, String jobId) {
-    try {
-      // Project name is allowed in place of the project id: the user will be redirected to a URL
-      // that has the project name replaced with project id.
-      return String.format(
-          "https://console.developers.google.com/project/%s/dataflow/job/%s",
-          URLEncoder.encode(projectName, "UTF-8"),
-          URLEncoder.encode(jobId, "UTF-8"));
-    } catch (UnsupportedEncodingException e) {
-      // Should never happen.
-      throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
-    }
-  }
-
-  public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
-
-    // If using a different Dataflow API than default, prefix command with an API override.
-    String dataflowApiOverridePrefix = "";
-    String apiUrl = options.getDataflowClient().getBaseUrl();
-    if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) {
-      dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl);
-    }
-
-    // Assemble cancel command from optional prefix and project/job parameters.
-    return String.format("%s%s jobs --project=%s cancel %s",
-        dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId);
-  }
-
-  public static State toState(String stateName) {
-    return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName),
-        State.UNKNOWN);
-  }
-}
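
A minimal sketch of the removed monitoring helper, assuming placeholder project and job ids and the options from the DataflowTransport sketch above; getJobMessages may throw IOException:

    Dataflow dataflow = DataflowTransport.newDataflowClient(options).build();
    MonitoringUtil monitor = new MonitoringUtil("my-project", dataflow);

    // Fetch all messages since the epoch and print them with the bundled handler.
    List<JobMessage> messages = monitor.getJobMessages("my-job-id", 0L);
    new MonitoringUtil.PrintHandler(System.out).process(messages);

    String consoleUrl = MonitoringUtil.getJobMonitoringPageURL("my-project", "my-job-id");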

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/PackageUtil.java
deleted file mode 100644
index 7d492b4..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/PackageUtil.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.hash.Funnels;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.io.CountingOutputStream;
-import com.google.common.io.Files;
-
-import com.fasterxml.jackson.core.Base64Variants;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-/** Helper routines for packages. */
-public class PackageUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
-  /**
-   * A reasonable upper bound on the number of jars required to launch a Dataflow job.
-   */
-  public static final int SANE_CLASSPATH_SIZE = 1000;
-  /**
-   * The initial interval to use between package staging attempts.
-   */
-  private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
-  /**
-   * The maximum number of attempts when staging a file.
-   */
-  private static final int MAX_ATTEMPTS = 5;
-
-  /**
-   * Translates exceptions from API calls.
-   */
-  private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
-
-  /**
-   * Creates a DataflowPackage containing information about how a classpath element should be
-   * staged, including the staging destination as well as its size and hash.
-   *
-   * @param classpathElement The local path for the classpath element.
-   * @param stagingPath The base location for staged classpath elements.
-   * @param overridePackageName If non-null, use the given value as the package name
-   *                            instead of generating one automatically.
-   * @return The package.
-   */
-  @Deprecated
-  public static DataflowPackage createPackage(File classpathElement,
-      String stagingPath, String overridePackageName) {
-    return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
-        .getDataflowPackage();
-  }
-
-  /**
-   * Compute and cache the attributes of a classpath element that we will need to stage it.
-   *
-   * @param classpathElement the file or directory to be staged.
-   * @param stagingPath The base location for staged classpath elements.
-   * @param overridePackageName If non-null, use the given value as the package name
-   *                            instead of generating one automatically.
-   * @return a {@link PackageAttributes} containing metadata about the object to be staged.
-   */
-  static PackageAttributes createPackageAttributes(File classpathElement,
-      String stagingPath, String overridePackageName) {
-    try {
-      boolean directory = classpathElement.isDirectory();
-
-      // Compute size and hash in one pass over file or directory.
-      Hasher hasher = Hashing.md5().newHasher();
-      OutputStream hashStream = Funnels.asOutputStream(hasher);
-      CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream);
-
-      if (!directory) {
-        // Files are staged as-is.
-        Files.asByteSource(classpathElement).copyTo(countingOutputStream);
-      } else {
-        // Directories are recursively zipped.
-        ZipFiles.zipDirectory(classpathElement, countingOutputStream);
-      }
-
-      long size = countingOutputStream.getCount();
-      String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-
-      // Create the DataflowPackage with staging name and location.
-      String uniqueName = getUniqueContentName(classpathElement, hash);
-      String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
-      DataflowPackage target = new DataflowPackage();
-      target.setName(overridePackageName != null ? overridePackageName : uniqueName);
-      target.setLocation(resourcePath);
-
-      return new PackageAttributes(size, hash, directory, target);
-    } catch (IOException e) {
-      throw new RuntimeException("Package setup failure for " + classpathElement, e);
-    }
-  }
-
-  /**
-   * Transfers the classpath elements to the staging location.
-   *
-   * @param classpathElements The elements to stage.
-   * @param stagingPath The base location to stage the elements to.
-   * @return A list of cloud workflow packages, each representing a classpath element.
-   */
-  public static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath) {
-    return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
-  }
-
-  // Visible for testing.
-  static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath,
-      Sleeper retrySleeper) {
-    LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
-        + "prepare for execution.", classpathElements.size());
-
-    if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
-      LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
-          + "copies to all workers. Having this many entries on your classpath may be indicative "
-          + "of an issue in your pipeline. You may want to consider trimming the classpath to "
-          + "necessary dependencies only, using --filesToStage pipeline option to override "
-          + "what files are being staged, or bundling several dependencies into one.",
-          classpathElements.size());
-    }
-
-    ArrayList<DataflowPackage> packages = new ArrayList<>();
-
-    if (stagingPath == null) {
-      throw new IllegalArgumentException(
-          "Can't stage classpath elements on because no staging location has been provided");
-    }
-
-    int numUploaded = 0;
-    int numCached = 0;
-    for (String classpathElement : classpathElements) {
-      String packageName = null;
-      if (classpathElement.contains("=")) {
-        String[] components = classpathElement.split("=", 2);
-        packageName = components[0];
-        classpathElement = components[1];
-      }
-
-      File file = new File(classpathElement);
-      if (!file.exists()) {
-        LOG.warn("Skipping non-existent classpath element {} that was specified.",
-            classpathElement);
-        continue;
-      }
-
-      PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
-
-      DataflowPackage workflowPackage = attributes.getDataflowPackage();
-      packages.add(workflowPackage);
-      String target = workflowPackage.getLocation();
-
-      // TODO: Should we attempt to detect the Mime type rather than
-      // always using MimeTypes.BINARY?
-      try {
-        try {
-          long remoteLength = IOChannelUtils.getSizeBytes(target);
-          if (remoteLength == attributes.getSize()) {
-            LOG.debug("Skipping classpath element already staged: {} at {}",
-                classpathElement, target);
-            numCached++;
-            continue;
-          }
-        } catch (FileNotFoundException expected) {
-          // If the file doesn't exist, it means we need to upload it.
-        }
-
-        // Upload file, retrying on failure.
-        AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
-            MAX_ATTEMPTS,
-            INITIAL_BACKOFF_INTERVAL_MS);
-        while (true) {
-          try {
-            LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
-            try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
-              copyContent(classpathElement, writer);
-            }
-            numUploaded++;
-            break;
-          } catch (IOException e) {
-            if (ERROR_EXTRACTOR.accessDenied(e)) {
-              String errorMessage = String.format(
-                  "Uploaded failed due to permissions error, will NOT retry staging "
-                  + "of classpath %s. Please verify credentials are valid and that you have "
-                  + "write access to %s. Stale credentials can be resolved by executing "
-                  + "'gcloud auth login'.", classpathElement, target);
-              LOG.error(errorMessage);
-              throw new IOException(errorMessage, e);
-            } else if (!backoff.atMaxAttempts()) {
-              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
-                  classpathElement, e);
-              BackOffUtils.next(retrySleeper, backoff);
-            } else {
-              // Rethrow last error, to be included as a cause in the catch below.
-              LOG.error("Upload failed, will NOT retry staging of classpath: {}",
-                  classpathElement, e);
-              throw e;
-            }
-          }
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
-      }
-    }
-
-    LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
-        + "{} files cached",
-        numUploaded, numCached);
-
-    return packages;
-  }
-
-  /**
-   * Returns a unique name for a file with a given content hash.
-   *
-   * <p>Directory paths are removed. Example:
-   * <pre>
-   * dir="a/b/c/d", contentHash="f000" => d-f000.jar
-   * file="a/b/c/d.txt", contentHash="f000" => d-f000.txt
-   * file="a/b/c/d", contentHash="f000" => d-f000
-   * </pre>
-   */
-  static String getUniqueContentName(File classpathElement, String contentHash) {
-    String fileName = Files.getNameWithoutExtension(classpathElement.getAbsolutePath());
-    String fileExtension = Files.getFileExtension(classpathElement.getAbsolutePath());
-    if (classpathElement.isDirectory()) {
-      return fileName + "-" + contentHash + ".jar";
-    } else if (fileExtension.isEmpty()) {
-      return fileName + "-" + contentHash;
-    }
-    return fileName + "-" + contentHash + "." + fileExtension;
-  }
-
-  /**
-   * Copies the contents of the classpathElement to the output channel.
-   *
-   * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
-   * otherwise the file contents are copied as-is.
-   *
-   * <p>The output channel is not closed.
-   */
-  private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
-      throws IOException {
-    final File classpathElementFile = new File(classpathElement);
-    if (classpathElementFile.isDirectory()) {
-      ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
-    } else {
-      Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
-    }
-  }
-  /**
-   * Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
-   */
-  static class PackageAttributes {
-    private final boolean directory;
-    private final long size;
-    private final String hash;
-    private DataflowPackage dataflowPackage;
-
-    public PackageAttributes(long size, String hash, boolean directory,
-        DataflowPackage dataflowPackage) {
-      this.size = size;
-      this.hash = Objects.requireNonNull(hash, "hash");
-      this.directory = directory;
-      this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
-    }
-
-    /**
-     * @return the dataflowPackage
-     */
-    public DataflowPackage getDataflowPackage() {
-      return dataflowPackage;
-    }
-
-    /**
-     * @return the directory
-     */
-    public boolean isDirectory() {
-      return directory;
-    }
-
-    /**
-     * @return the size
-     */
-    public long getSize() {
-      return size;
-    }
-
-    /**
-     * @return the hash
-     */
-    public String getHash() {
-      return hash;
-    }
-  }
-}
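
A minimal sketch of staging classpath elements directly through PackageUtil, assuming placeholder paths; the "name=" prefix overrides the generated package name, and files whose remote size already matches are counted as cached:

    List<DataflowPackage> packages = PackageUtil.stageClasspathElements(
        Arrays.asList("/path/to/lib-a.jar", "my-name=/path/to/lib-b.jar"),
        "gs://my-bucket/staging");
    // lib-a.jar is uploaded as lib-a-<url-safe-base64-md5>.jar; lib-b.jar keeps the name "my-name".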

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/Stager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/Stager.java
deleted file mode 100644
index 9547ac1..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/Stager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-
-import java.util.List;
-
-/**
- * Interface for staging files needed for running a Dataflow pipeline.
- */
-public interface Stager {
-  /* Stage files and return a list of packages. */
-  public List<DataflowPackage> stageFiles();
-}
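
Stager is the seam that GcsStager plugs into. A hypothetical no-op implementation (not part of this commit), e.g. for tests that never touch GCS:

    public class NoopStager implements Stager {
      @Override
      public List<DataflowPackage> stageFiles() {
        return Collections.emptyList();  // nothing is uploaded
      }
    }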

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
new file mode 100644
index 0000000..bc570e1
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.TestCredential;
+
+import org.hamcrest.Description;
+import org.hamcrest.Factory;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for BlockingDataflowPipelineRunner.
+ */
+@RunWith(JUnit4.class)
+public class BlockingDataflowPipelineRunnerTest {
+
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);
+
+  @Rule
+  public ExpectedException expectedThrown = ExpectedException.none();
+
+  /**
+   * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
+   * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
+   */
+  private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
+      extends TypeSafeMatcher<T> {
+
+    private final Matcher<DataflowPipelineJob> matcher;
+
+    public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T ex) {
+      return matcher.matches(ex.getJob());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("job ");
+        matcher.describeMismatch(item.getMessage(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("exception with job matching ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowJobException> Matcher<T> expectJob(
+        Matcher<DataflowPipelineJob> matcher) {
+      return new DataflowJobExceptionMatcher<T>(matcher);
+    }
+  }
+
+  /**
+   * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
+   * to the return value of {@link DataflowPipelineJob#getJobId()}.
+   */
+  private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
+
+    private final Matcher<String> matcher;
+
+    public JobIdMatcher(Matcher<String> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T job) {
+      return matcher.matches(job.getJobId());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("jobId ");
+        matcher.describeMismatch(item.getJobId(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("job with jobId ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
+      return new JobIdMatcher<T>(equalTo(jobId));
+    }
+  }
+
+  /**
+   * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
+   * {@link Matcher} to the {@link DataflowPipelineJob} returned by
+   * {@link DataflowJobUpdatedException#getReplacedByJob()}.
+   */
+  private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
+      extends TypeSafeMatcher<T> {
+
+    private final Matcher<DataflowPipelineJob> matcher;
+
+    public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T ex) {
+      return matcher.matches(ex.getReplacedByJob());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("job ");
+        matcher.describeMismatch(item.getMessage(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("exception with replacedByJob() ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
+        Matcher<DataflowPipelineJob> matcher) {
+      return new ReplacedByJobMatcher<T>(matcher);
+    }
+  }
+
+  /**
+   * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
+   * that will immediately terminate in the provided {@code terminalState}.
+   *
+   * <p>The return value may be further mocked.
+   */
+  private DataflowPipelineJob createMockJob(
+      String projectId, String jobId, State terminalState) throws Exception {
+    DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
+    when(mockJob.getProjectId()).thenReturn(projectId);
+    when(mockJob.getJobId()).thenReturn(jobId);
+    when(mockJob.waitToFinish(
+        anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
+        .thenReturn(terminalState);
+    return mockJob;
+  }
+
+  /**
+   * Returns a {@link BlockingDataflowPipelineRunner} backed by a mock runner that returns the
+   * provided job. Some {@link PipelineOptions} are extracted from the job, such as the project ID.
+   */
+  private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
+      throws Exception {
+    DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
+    TestDataflowPipelineOptions options =
+        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setProject(job.getProjectId());
+
+    when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
+
+    return new BlockingDataflowPipelineRunner(mockRunner, options);
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in
+   * the {@link State#DONE DONE} state.
+   */
+  @Test
+  public void testJobDoneComplete() throws Exception {
+    createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
+        .run(TestPipeline.create());
+    expectedLogs.verifyInfo("Job finished with status DONE");
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#FAILED FAILED} state.
+   */
+  @Test
+  public void testFailedJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobExecutionException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testFailedJob-jobId")));
+    createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
+        .run(TestPipeline.create());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
+   */
+  @Test
+  public void testCancelledJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobCancelledException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testCancelledJob-jobId")));
+    createMockRunner(
+            createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
+        .run(TestPipeline.create());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#UPDATED UPDATED} state.
+   */
+  @Test
+  public void testUpdatedJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobUpdatedException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
+    expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
+        JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
+    DataflowPipelineJob job =
+        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
+    DataflowPipelineJob replacedByJob =
+        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
+    when(job.getReplacedByJob()).thenReturn(replacedByJob);
+    createMockRunner(job).run(TestPipeline.create());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
+   * Dataflow service returned a state that the SDK is unfamiliar with (possibly because the
+   * SDK is older than the service).
+   */
+  @Test
+  public void testUnknownJobThrowsException() throws Exception {
+    expectedThrown.expect(IllegalStateException.class);
+    createMockRunner(
+            createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
+        .run(TestPipeline.create());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job returns a {@code null} state, indicating that the SDK failed to contact the
+   * service even after exhausting its built-in retry logic.
+   */
+  @Test
+  public void testNullJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowServiceException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testNullJob-jobId")));
+    createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
+        .run(TestPipeline.create());
+  }
+
+  @Test
+  public void testToString() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    assertEquals("BlockingDataflowPipelineRunner#testjobname",
+        BlockingDataflowPipelineRunner.fromOptions(options).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
new file mode 100644
index 0000000..80b7e7b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs.GetMetrics;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSetMultimap;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link DataflowPipelineJob}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowPipelineJobTest {
+  private static final String PROJECT_ID = "someProject";
+  private static final String JOB_ID = "1234";
+
+  @Mock
+  private Dataflow mockWorkflowClient;
+  @Mock
+  private Dataflow.Projects mockProjects;
+  @Mock
+  private Dataflow.Projects.Jobs mockJobs;
+  @Rule
+  public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    when(mockWorkflowClient.projects()).thenReturn(mockProjects);
+    when(mockProjects.jobs()).thenReturn(mockJobs);
+  }
+
+  /**
+   * Validates that the total time slept is consistent with an
+   * {@link AttemptBoundedExponentialBackOff} configured with the given number of attempts and
+   * initial polling interval.
+   *
+   * @param pollingIntervalMillis The initial polling interval, in milliseconds.
+   * @param attempts The number of attempts made.
+   * @param timeSleptMillis The amount of time slept by the clock, in milliseconds. This is
+   * checked against the valid interval.
+   */
+  void checkValidInterval(long pollingIntervalMillis, int attempts, long timeSleptMillis) {
+    long highSum = 0;
+    long lowSum = 0;
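+    // Attempt i sleeps roughly pollingIntervalMillis * multiplier^(i - 1), jittered by the
+    // randomization factor; summing the low and high ends of that jitter bounds the total.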
+    for (int i = 1; i < attempts; i++) {
+      double currentInterval =
+          pollingIntervalMillis
+          * Math.pow(AttemptBoundedExponentialBackOff.DEFAULT_MULTIPLIER, i - 1);
+      double offset =
+          AttemptBoundedExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR * currentInterval;
+      highSum += Math.round(currentInterval + offset);
+      lowSum += Math.round(currentInterval - offset);
+    }
+    assertThat(timeSleptMillis, allOf(greaterThanOrEqualTo(lowSum), lessThanOrEqualTo(highSum)));
+  }
+
+  @Test
+  public void testWaitToFinishMessagesFail() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_" + State.DONE.name());
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    MonitoringUtil.JobMessagesHandler jobHandler = mock(MonitoringUtil.JobMessagesHandler.class);
+    Dataflow.Projects.Jobs.Messages mockMessages =
+        mock(Dataflow.Projects.Jobs.Messages.class);
+    Messages.List listRequest = mock(Dataflow.Projects.Jobs.Messages.List.class);
+    when(mockJobs.messages()).thenReturn(mockMessages);
+    when(mockMessages.list(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(listRequest);
+    when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
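+    // The status call reports DONE, but message retrieval always fails, so waitToFinish
+    // below gives up without a terminal state.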
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    State state = job.waitToFinish(5, TimeUnit.MINUTES, jobHandler, fastClock, fastClock);
+    assertEquals(null, state);
+  }
+
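+  /**
+   * Mocks the Dataflow service to report the given state for the job and returns the result of a
+   * one-minute bounded {@code waitToFinish} call against it.
+   */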
+  public State mockWaitToFinishInState(State state) throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_" + state.name());
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    return job.waitToFinish(1, TimeUnit.MINUTES, null, fastClock, fastClock);
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#DONE DONE}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishDone() throws Exception {
+    assertEquals(State.DONE, mockWaitToFinishInState(State.DONE));
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishFailed() throws Exception {
+    assertEquals(State.FAILED, mockWaitToFinishInState(State.FAILED));
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#CANCELLED CANCELLED}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishCancelled() throws Exception {
+    assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED));
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#UPDATED UPDATED}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishUpdated() throws Exception {
+    assertEquals(State.UPDATED, mockWaitToFinishInState(State.UPDATED));
+  }
+
+  @Test
+  public void testWaitToFinishFail() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenThrow(IOException.class);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    long startTime = fastClock.nanoTime();
+    State state = job.waitToFinish(5, TimeUnit.MINUTES, null, fastClock, fastClock);
+    assertEquals(null, state);
+    long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
+    checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL,
+        DataflowPipelineJob.MESSAGES_POLLING_ATTEMPTS, timeDiff);
+  }
+
+  @Test
+  public void testWaitToFinishTimeFail() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenThrow(IOException.class);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+    long startTime = fastClock.nanoTime();
+    State state = job.waitToFinish(4, TimeUnit.MILLISECONDS, null, fastClock, fastClock);
+    assertEquals(null, state);
+    long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
+    // Should only sleep for the 4 ms remaining.
+    assertEquals(4L, timeDiff);
+  }
+
+  @Test
+  public void testGetStateReturnsServiceState() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_" + State.RUNNING.name());
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    assertEquals(
+        State.RUNNING,
+        job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock));
+  }
+
+  @Test
+  public void testGetStateWithExceptionReturnsUnknown() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenThrow(IOException.class);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
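+    // Every status poll throws, so after the bounded retries the state is reported as UNKNOWN.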
+    long startTime = fastClock.nanoTime();
+    assertEquals(
+        State.UNKNOWN,
+        job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock));
+    long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
+    checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL,
+        DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, timeDiff);
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue()
+      throws IOException, AggregatorRetrievalException {
+    Aggregator<?, ?> aggregator = mock(Aggregator.class);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<?> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValues(), empty());
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue()
+      throws IOException, AggregatorRetrievalException {
+    Aggregator<?, ?> aggregator = mock(Aggregator.class);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    jobMetrics.setMetrics(null);
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<?> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValues(), empty());
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
+      throws IOException, AggregatorRetrievalException {
+    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    String aggregatorName = "agg";
+    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    MetricUpdate update = new MetricUpdate();
+    long stepValue = 1234L;
+    update.setScalar(new BigDecimal(stepValue));
+
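+    // The update is attributed to the aggregator by its metric name and the owning step name.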
+    MetricStructuredName structuredName = new MetricStructuredName();
+    structuredName.setName(aggregatorName);
+    structuredName.setContext(ImmutableMap.of("step", stepName));
+    update.setName(structuredName);
+
+    jobMetrics.setMetrics(ImmutableList.of(update));
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
+    assertThat(values.getValuesAtSteps().size(), equalTo(1));
+    assertThat(values.getValues(), contains(stepValue));
+    assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue)));
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
+      throws IOException, AggregatorRetrievalException {
+    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    String aggregatorName = "agg";
+    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
+
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> otherTransform = mock(PTransform.class);
+    String otherStepName = "s88";
+    String otherFullName = "Spam/Ham/Eggs";
+    AppliedPTransform<?, ?, ?> otherAppliedTransform =
+        appliedPTransform(otherFullName, otherTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(
+                                aggregator, pTransform, aggregator, otherTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(
+            appliedTransform, stepName, otherAppliedTransform, otherStepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    MetricUpdate updateOne = new MetricUpdate();
+    long stepValue = 1234L;
+    updateOne.setScalar(new BigDecimal(stepValue));
+
+    MetricStructuredName structuredNameOne = new MetricStructuredName();
+    structuredNameOne.setName(aggregatorName);
+    structuredNameOne.setContext(ImmutableMap.of("step", stepName));
+    updateOne.setName(structuredNameOne);
+
+    MetricUpdate updateTwo = new MetricUpdate();
+    long stepValueTwo = 1024L;
+    updateTwo.setScalar(new BigDecimal(stepValueTwo));
+
+    MetricStructuredName structuredNameTwo = new MetricStructuredName();
+    structuredNameTwo.setName(aggregatorName);
+    structuredNameTwo.setContext(ImmutableMap.of("step", otherStepName));
+    updateTwo.setName(structuredNameTwo);
+
+    jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo));
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
+    assertThat(values.getValuesAtSteps(), hasEntry(otherFullName, stepValueTwo));
+    assertThat(values.getValuesAtSteps().size(), equalTo(2));
+    assertThat(values.getValues(), containsInAnyOrder(stepValue, stepValueTwo));
+    assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue + stepValueTwo)));
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
+      throws IOException, AggregatorRetrievalException {
+    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    String aggregatorName = "agg";
+    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
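+    // A metric with no step context (such as a system element count) does not belong to the
+    // user aggregator and should be ignored.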
+    MetricUpdate ignoredUpdate = new MetricUpdate();
+    ignoredUpdate.setScalar(null);
+
+    MetricStructuredName ignoredName = new MetricStructuredName();
+    ignoredName.setName("ignoredAggregator.elementCount.out0");
+    ignoredName.setContext(null);
+    ignoredUpdate.setName(ignoredName);
+
+    jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate));
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValuesAtSteps().entrySet(), empty());
+    assertThat(values.getValues(), empty());
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithUnusedAggregatorThrowsException()
+      throws AggregatorRetrievalException {
+    Aggregator<?, ?> aggregator = mock(Aggregator.class);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of().asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("not used in this pipeline");
+
+    job.getAggregatorValues(aggregator);
+  }
+
+  @Test
+  public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
+      throws IOException, AggregatorRetrievalException {
+    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    String aggregatorName = "agg";
+    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    IOException cause = new IOException();
+    when(getMetrics.execute()).thenThrow(cause);
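+    // The metrics call fails, so the aggregator lookup should wrap the IOException in an
+    // AggregatorRetrievalException.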
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    thrown.expect(AggregatorRetrievalException.class);
+    thrown.expectCause(is(cause));
+    thrown.expectMessage(aggregator.toString());
+    thrown.expectMessage("when retrieving Aggregator values for");
+
+    job.getAggregatorValues(aggregator);
+  }
+
+  private static class TestAggregator<InT, OutT> implements Aggregator<InT, OutT> {
+    private final CombineFn<InT, ?, OutT> combineFn;
+    private final String name;
+
+    public TestAggregator(CombineFn<InT, ?, OutT> combineFn, String name) {
+      this.combineFn = combineFn;
+      this.name = name;
+    }
+
+    @Override
+    public void addValue(InT value) {
+      throw new AssertionError();
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public CombineFn<InT, ?, OutT> getCombineFn() {
+      return combineFn;
+    }
+  }
+
+  private AppliedPTransform<?, ?, ?> appliedPTransform(
+      String fullName, PTransform<PInput, POutput> transform) {
+    return AppliedPTransform.of(fullName, mock(PInput.class), mock(POutput.class), transform);
+  }
+}

