beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Remove Pipeline from TestDataflowPipelineRunner
Date Mon, 20 Jun 2016 16:28:04 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0e4d0a9ae -> d9cdcadf5


Remove Pipeline from TestDataflowPipelineRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/380c1a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/380c1a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/380c1a83

Branch: refs/heads/master
Commit: 380c1a83a82aa8b151d309cba82df3e2fb9398ec
Parents: 0e4d0a9
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Jun 17 16:36:22 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Jun 20 09:27:48 2016 -0700

----------------------------------------------------------------------
 examples/java/README.md                         |   4 +-
 .../beam/runners/flink/examples/TFIDF.java      |   2 +-
 .../testing/TestDataflowPipelineRunner.java     | 271 -------------------
 .../dataflow/testing/TestDataflowRunner.java    | 271 +++++++++++++++++++
 .../testing/TestDataflowRunnerTest.java         |  40 +--
 5 files changed, 294 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/examples/java/README.md
----------------------------------------------------------------------
diff --git a/examples/java/README.md b/examples/java/README.md
index ef3cf07..2b5edf5 100644
--- a/examples/java/README.md
+++ b/examples/java/README.md
@@ -64,7 +64,7 @@ the same pipeline on fully managed resources in Google Cloud Platform:
     -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--project=<YOUR CLOUD PLATFORM PROJECT ID> \
     --tempLocation=<YOUR CLOUD STORAGE LOCATION> \
-    --runner=BlockingDataflowPipelineRunner"
+    --runner=BlockingDataflowRunner"
 
 Make sure to use your project id, not the project number or the descriptive name.
 The Cloud Storage location should be entered in the form of
@@ -86,7 +86,7 @@ Platform:
     org.apache.beam.examples.WordCount \
     --project=<YOUR CLOUD PLATFORM PROJECT ID> \
     --tempLocation=<YOUR CLOUD STORAGE LOCATION> \
-    --runner=BlockingDataflowPipelineRunner
+    --runner=BlockingDataflowRunner
 
 Other examples can be run similarly by replacing the `WordCount` class path with the example
classpath, e.g.
 `org.apache.beam.examples.cookbook.BigQueryTornadoes`,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 0afde0a..876ecde 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -82,7 +82,7 @@ import java.util.Set;
  * <pre>{@code
  *   --project=YOUR_PROJECT_ID
  *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
+ *   --runner=BlockingDataflowRunner
  * and an output prefix on GCS:
  *   --output=gs://YOUR_OUTPUT_PREFIX
  * }</pre>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
deleted file mode 100644
index f83a139..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
{
-  private static final String TENTATIVE_COUNTER = "tentative";
-  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-
-  private final TestDataflowPipelineOptions options;
-  private final DataflowRunner runner;
-  private int expectedNumberOfAssertions = 0;
-
-  TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
-    this.options = options;
-    this.runner = DataflowRunner.fromOptions(options);
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static TestDataflowPipelineRunner fromOptions(
-      PipelineOptions options) {
-    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-    dataflowOptions.setStagingLocation(Joiner.on("/").join(
-        new String[]{dataflowOptions.getTempRoot(),
-          dataflowOptions.getJobName(), "output", "results"}));
-
-    return new TestDataflowPipelineRunner(dataflowOptions);
-  }
-
-  @Override
-  public DataflowPipelineJob run(Pipeline pipeline) {
-    return run(pipeline, runner);
-  }
-
-  DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
-
-    TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
-    final DataflowPipelineJob job;
-    try {
-      job = runner.run(pipeline);
-    } catch (DataflowJobExecutionException ex) {
-      throw new IllegalStateException("The dataflow failed.");
-    }
-
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
-
-    assertThat(job, testPipelineOptions.getOnCreateMatcher());
-
-    CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
-        job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-
-    try {
-      final Optional<Boolean> result;
-
-      if (options.isStreaming()) {
-        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
-            new Callable<Optional<Boolean>>() {
-          @Override
-          public Optional<Boolean> call() throws Exception {
-            try {
-              for (;;) {
-                Optional<Boolean> result = checkForSuccess(job);
-                if (result.isPresent()) {
-                  return result;
-                }
-                Thread.sleep(10000L);
-              }
-            } finally {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
-            }
-          }
-        });
-        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
-              job.getJobId());
-          job.cancel();
-        }
-        result = resultFuture.get();
-      } else {
-        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
-        result = checkForSuccess(job);
-      }
-      if (!result.isPresent()) {
-        throw new IllegalStateException(
-            "The dataflow did not output a success or failure metric.");
-      } else if (!result.get()) {
-        throw new AssertionError(messageHandler.getErrorMessage() == null
-            ? "The dataflow did not return a failure reason."
-            : messageHandler.getErrorMessage());
-      } else {
-        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (ExecutionException e) {
-      Throwables.propagateIfPossible(e.getCause());
-      throw new RuntimeException(e.getCause());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return job;
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (transform instanceof PAssert.OneSideInputAssert
-        || transform instanceof PAssert.GroupThenAssert
-        || transform instanceof PAssert.GroupThenAssertForSingleton) {
-      expectedNumberOfAssertions += 1;
-    }
-
-    return runner.apply(transform, input);
-  }
-
-  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
-      throws IOException {
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("The pipeline failed");
-      return Optional.of(false);
-    }
-
-    JobMetrics metrics = job.getDataflowClient().projects().jobs()
-        .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
-    if (metrics == null || metrics.getMetrics() == null) {
-      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
-    } else {
-      int successes = 0;
-      int failures = 0;
-      for (MetricUpdate metric : metrics.getMetrics()) {
-        if (metric.getName() == null || metric.getName().getContext() == null
-            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
-          // Don't double count using the non-tentative version of the metric.
-          continue;
-        }
-        if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
-          successes += ((BigDecimal) metric.getScalar()).intValue();
-        } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
-          failures += ((BigDecimal) metric.getScalar()).intValue();
-        }
-      }
-
-      if (failures > 0) {
-        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures
out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(false);
-      } else if (successes >= expectedNumberOfAssertions) {
-        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures
out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(true);
-      }
-
-      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected
"
-          + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
-    }
-
-    return Optional.<Boolean>absent();
-  }
-
-  @Override
-  public String toString() {
-    return "TestDataflowPipelineRunner#" + options.getAppName();
-  }
-
-  /**
-   * Cancels the workflow on the first error message it sees.
-   *
-   * <p>Creates an error message representing the concatenation of all error messages
seen.
-   */
-  private static class CancelWorkflowOnError implements JobMessagesHandler {
-    private final DataflowPipelineJob job;
-    private final JobMessagesHandler messageHandler;
-    private final StringBuffer errorMessage;
-    private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler)
{
-      this.job = job;
-      this.messageHandler = messageHandler;
-      this.errorMessage = new StringBuffer();
-    }
-
-    @Override
-    public void process(List<JobMessage> messages) {
-      messageHandler.process(messages);
-      for (JobMessage message : messages) {
-        if (message.getMessageImportance() != null
-            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
-              job.getJobId(), message.getMessageText());
-          errorMessage.append(message.getMessageText());
-        }
-      }
-      if (errorMessage.length() > 0) {
-        LOG.info("Cancelling Dataflow job {}", job.getJobId());
-        try {
-          job.cancel();
-        } catch (Exception ignore) {
-          // The TestDataflowPipelineRunner will throw an AssertionError with the job failure
-          // messages.
-        }
-      }
-    }
-
-    private String getErrorMessage() {
-      return errorMessage.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
new file mode 100644
index 0000000..19a2178
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link TestDataflowRunner} is a pipeline runner that wraps a
+ * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
+ *
+ * @see TestPipeline
+ */
+public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+  private static final String TENTATIVE_COUNTER = "tentative";
+  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
+
+  private final TestDataflowPipelineOptions options;
+  private final DataflowRunner runner;
+  private int expectedNumberOfAssertions = 0;
+
+  TestDataflowRunner(TestDataflowPipelineOptions options) {
+    this.options = options;
+    this.runner = DataflowRunner.fromOptions(options);
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   */
+  public static TestDataflowRunner fromOptions(
+      PipelineOptions options) {
+    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+    dataflowOptions.setStagingLocation(Joiner.on("/").join(
+        new String[]{dataflowOptions.getTempRoot(),
+          dataflowOptions.getJobName(), "output", "results"}));
+
+    return new TestDataflowRunner(dataflowOptions);
+  }
+
+  @Override
+  public DataflowPipelineJob run(Pipeline pipeline) {
+    return run(pipeline, runner);
+  }
+
+  DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
+
+    TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
+    final DataflowPipelineJob job;
+    try {
+      job = runner.run(pipeline);
+    } catch (DataflowJobExecutionException ex) {
+      throw new IllegalStateException("The dataflow failed.");
+    }
+
+    LOG.info("Running Dataflow job {} with {} expected assertions.",
+        job.getJobId(), expectedNumberOfAssertions);
+
+    assertThat(job, testPipelineOptions.getOnCreateMatcher());
+
+    CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
+        job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+
+    try {
+      final Optional<Boolean> result;
+
+      if (options.isStreaming()) {
+        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
+            new Callable<Optional<Boolean>>() {
+          @Override
+          public Optional<Boolean> call() throws Exception {
+            try {
+              for (;;) {
+                Optional<Boolean> result = checkForSuccess(job);
+                if (result.isPresent()) {
+                  return result;
+                }
+                Thread.sleep(10000L);
+              }
+            } finally {
+              LOG.info("Cancelling Dataflow job {}", job.getJobId());
+              job.cancel();
+            }
+          }
+        });
+        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
+        if (finalState == null || finalState == State.RUNNING) {
+          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
+              job.getJobId());
+          job.cancel();
+        }
+        result = resultFuture.get();
+      } else {
+        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
+        result = checkForSuccess(job);
+      }
+      if (!result.isPresent()) {
+        throw new IllegalStateException(
+            "The dataflow did not output a success or failure metric.");
+      } else if (!result.get()) {
+        throw new AssertionError(messageHandler.getErrorMessage() == null
+            ? "The dataflow did not return a failure reason."
+            : messageHandler.getErrorMessage());
+      } else {
+        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause());
+      throw new RuntimeException(e.getCause());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return job;
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    if (transform instanceof PAssert.OneSideInputAssert
+        || transform instanceof PAssert.GroupThenAssert
+        || transform instanceof PAssert.GroupThenAssertForSingleton) {
+      expectedNumberOfAssertions += 1;
+    }
+
+    return runner.apply(transform, input);
+  }
+
+  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
+      throws IOException {
+    State state = job.getState();
+    if (state == State.FAILED || state == State.CANCELLED) {
+      LOG.info("The pipeline failed");
+      return Optional.of(false);
+    }
+
+    JobMetrics metrics = job.getDataflowClient().projects().jobs()
+        .getMetrics(job.getProjectId(), job.getJobId()).execute();
+
+    if (metrics == null || metrics.getMetrics() == null) {
+      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
+    } else {
+      int successes = 0;
+      int failures = 0;
+      for (MetricUpdate metric : metrics.getMetrics()) {
+        if (metric.getName() == null || metric.getName().getContext() == null
+            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
+          // Don't double count using the non-tentative version of the metric.
+          continue;
+        }
+        if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
+          successes += ((BigDecimal) metric.getScalar()).intValue();
+        } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
+          failures += ((BigDecimal) metric.getScalar()).intValue();
+        }
+      }
+
+      if (failures > 0) {
+        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures
out of "
+            + "{} expected assertions.", job.getJobId(), successes, failures,
+            expectedNumberOfAssertions);
+        return Optional.of(false);
+      } else if (successes >= expectedNumberOfAssertions) {
+        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures
out of "
+            + "{} expected assertions.", job.getJobId(), successes, failures,
+            expectedNumberOfAssertions);
+        return Optional.of(true);
+      }
+
+      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected
"
+          + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
+    }
+
+    return Optional.<Boolean>absent();
+  }
+
+  @Override
+  public String toString() {
+    return "TestDataflowRunner#" + options.getAppName();
+  }
+
+  /**
+   * Cancels the workflow on the first error message it sees.
+   *
+   * <p>Creates an error message representing the concatenation of all error messages
seen.
+   */
+  private static class CancelWorkflowOnError implements JobMessagesHandler {
+    private final DataflowPipelineJob job;
+    private final JobMessagesHandler messageHandler;
+    private final StringBuffer errorMessage;
+    private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler)
{
+      this.job = job;
+      this.messageHandler = messageHandler;
+      this.errorMessage = new StringBuffer();
+    }
+
+    @Override
+    public void process(List<JobMessage> messages) {
+      messageHandler.process(messages);
+      for (JobMessage message : messages) {
+        if (message.getMessageImportance() != null
+            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+              job.getJobId(), message.getMessageText());
+          errorMessage.append(message.getMessageText());
+        }
+      }
+      if (errorMessage.length() > 0) {
+        LOG.info("Cancelling Dataflow job {}", job.getJobId());
+        try {
+          job.cancel();
+        } catch (Exception ignore) {
+          // The TestDataflowRunner will throw an AssertionError with the job failure
+          // messages.
+        }
+      }
+    }
+
+    private String getErrorMessage() {
+      return errorMessage.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 4067f08..cd99643 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -85,7 +85,7 @@ import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
-/** Tests for {@link TestDataflowPipelineRunner}. */
+/** Tests for {@link TestDataflowRunner}. */
 @RunWith(JUnit4.class)
 public class TestDataflowRunnerTest {
   @Rule public ExpectedException expectedException = ExpectedException.none();
@@ -110,14 +110,14 @@ public class TestDataflowRunnerTest {
     options.setTempRoot("gs://test");
     options.setGcpCredential(new TestCredential());
     options.setDataflowClient(service);
-    options.setRunner(TestDataflowPipelineRunner.class);
+    options.setRunner(TestDataflowRunner.class);
     options.setPathValidatorClass(NoopPathValidator.class);
   }
 
   @Test
   public void testToString() {
-    assertEquals("TestDataflowPipelineRunner#TestAppName",
-        new TestDataflowPipelineRunner(options).toString());
+    assertEquals("TestDataflowRunner#TestAppName",
+        new TestDataflowRunner(options).toString());
   }
 
   @Test
@@ -135,7 +135,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     when(request.execute()).thenReturn(
         generateMockMetricResponse(true /* success */, true /* tentative */));
     assertEquals(mockJob, runner.run(p, mockRunner));
@@ -156,7 +156,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -197,7 +197,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute()).thenReturn(
         generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -228,7 +228,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute()).thenReturn(
         generateMockMetricResponse(true /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     runner.run(p, mockRunner);
   }
 
@@ -250,7 +250,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute()).thenReturn(
         generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -269,7 +269,7 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     when(request.execute()).thenReturn(
         generateMockMetricResponse(true /* success */, true /* tentative */));
     doReturn(State.DONE).when(job).getState();
@@ -284,7 +284,7 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     when(request.execute()).thenReturn(
         generateMockMetricResponse(false /* success */, true /* tentative */));
     doReturn(State.DONE).when(job).getState();
@@ -299,7 +299,7 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     when(request.execute()).thenReturn(
         generateMockMetricResponse(true /* success */, false /* tentative */));
     doReturn(State.RUNNING).when(job).getState();
@@ -335,7 +335,7 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     when(request.execute()).thenReturn(
         generateMockMetricResponse(true /* success */, false /* tentative */));
     doReturn(State.FAILED).when(job).getState();
@@ -373,7 +373,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute()).thenReturn(
         generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -401,7 +401,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     p.getOptions().as(TestPipelineOptions.class)
         .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
@@ -426,7 +426,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     p.getOptions().as(TestPipelineOptions.class)
         .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
@@ -453,7 +453,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
@@ -478,7 +478,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
@@ -505,7 +505,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestFailureMatcher());
 
@@ -537,7 +537,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestFailureMatcher());
 


Mime
View raw message