beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ieme...@apache.org
Subject [1/3] beam git commit: Test waitUntilFinish(Duration) in the DirectRunner
Date Tue, 04 Apr 2017 07:55:29 GMT
Repository: beam
Updated Branches:
  refs/heads/master de36e8398 -> cc8e0b9df


Test waitUntilFinish(Duration) in the DirectRunner

Ensures that the call to "waitUntilFinish(Duration)" terminates before
the Pipeline completes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b4541a18
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b4541a18
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b4541a18

Branch: refs/heads/master
Commit: b4541a18cf447fed2b2150a99be1d892e1f8e358
Parents: de36e83
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Apr 3 10:12:58 2017 -0700
Committer: Ismaël Mejía <iemejia@apache.org>
Committed: Tue Apr 4 09:45:19 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 27 ++++++++++++++++++++
 1 file changed, 27 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b4541a18/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index e601fcf..f1c0eb2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -277,6 +278,32 @@ public class DirectRunnerTest implements Serializable {
   }
 
   @Test
+  public void waitUntilFinishTimeout() throws Exception {
+    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
+    options.setBlockOnRun(false);
+    options.setRunner(DirectRunner.class);
+    Pipeline p = Pipeline.create(options);
+    p.apply(Create.of(1L))
+        .apply(
+            ParDo.of(
+                new DoFn<Long, Long>() {
+                  @ProcessElement
+                  public void hang(ProcessContext context) throws InterruptedException {
+                    // Hangs "forever"
+                    Thread.sleep(Long.MAX_VALUE);
+                  }
+                }));
+    PipelineResult result = p.run();
+    // The pipeline should never complete;
+    assertThat(result.getState(), is(State.RUNNING));
+    // Must time out, otherwise this test will never complete
+    result.waitUntilFinish(Duration.millis(1L));
+    assertThat(result.getState(), is(State.RUNNING));
+
+    result.cancel();
+  }
+
+  @Test
   public void transformDisplayDataExceptionShouldFail() {
     DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
       @ProcessElement


Mime
View raw message