beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [14/23] beam git commit: Batch executions should block without timeout.
Date Tue, 28 Feb 2017 22:35:21 GMT
Batch executions should block without timeout.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3867dcd7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3867dcd7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3867dcd7

Branch: refs/heads/master
Commit: 3867dcd793adcb030faa4713624542210b86b68d
Parents: f47e0eb
Author: Sela <ansela@paypal.com>
Authored: Mon Feb 20 20:40:18 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Wed Mar 1 00:18:05 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/TestSparkRunner.java  | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3867dcd7/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 985f75d..d2b5186 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -116,7 +116,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult>
{
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
     SparkPipelineOptions sparkOptions = pipeline.getOptions().as(SparkPipelineOptions.class);
-    long timeout = sparkOptions.getForcedTimeout();
     SparkPipelineResult result = null;
     try {
       // clear state of Aggregators, Metrics and Watermarks.
@@ -126,14 +125,12 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult>
{
 
       TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
       LOG.info("About to run test pipeline " + sparkOptions.getJobName());
-      result = delegate.run(pipeline);
-      result.waitUntilFinish(Duration.millis(timeout));
-
-      assertThat(result, testPipelineOptions.getOnCreateMatcher());
-      assertThat(result, testPipelineOptions.getOnSuccessMatcher());
 
       // if the pipeline was executed in streaming mode, validate aggregators.
       if (isForceStreaming) {
+        result = delegate.run(pipeline);
+        long timeout = sparkOptions.getForcedTimeout();
+        result.waitUntilFinish(Duration.millis(timeout));
         // validate assertion succeeded (at least once).
         int successAssertions = result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
         assertThat(
@@ -154,6 +151,13 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult>
{
                 "Successfully asserted pipeline %s with %d successful assertions.",
                 sparkOptions.getJobName(),
                 successAssertions));
+      } else {
+        // for batch test pipelines, run and block until done.
+        result = delegate.run(pipeline);
+        result.waitUntilFinish();
+        // assert via matchers.
+        assertThat(result, testPipelineOptions.getOnCreateMatcher());
+        assertThat(result, testPipelineOptions.getOnSuccessMatcher());
       }
     } finally {
       try {


Mime
View raw message