beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [06/14] incubator-beam git commit: Special casing job exec AssertionError in TestFlinkPipelineRunner
Date Fri, 20 May 2016 07:15:24 GMT
Special casing job exec AssertionError in TestFlinkPipelineRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af8e9887
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af8e9887
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af8e9887

Branch: refs/heads/master
Commit: af8e98878bbc8678e33a4c00548ccabf6cf55a17
Parents: 2d71af7
Author: Kenneth Knowles <klk@google.com>
Authored: Fri May 6 12:49:55 2016 -0700
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri May 20 08:08:24 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/TestFlinkPipelineRunner.java | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af8e9887/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
index 24883c8..139aebf 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
+import org.apache.flink.runtime.client.JobExecutionException;
+
 public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
   private FlinkPipelineRunner delegate;
@@ -55,7 +57,19 @@ public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
   @Override
   public FlinkRunnerResult run(Pipeline pipeline) {
-    return delegate.run(pipeline);
+    try {
+      return delegate.run(pipeline);
+    } catch (RuntimeException e) {
+      // Special case hack to pull out assertion errors from PAssert; instead there should
+      // probably be a better story along the lines of UserCodeException.
+      if (e.getCause() != null
+          && e.getCause() instanceof JobExecutionException
+          && e.getCause().getCause() instanceof AssertionError) {
+          throw (AssertionError) e.getCause().getCause();
+      } else {
+        throw e;
+      }
+    }
   }
 
   public PipelineOptions getPipelineOptions() {


Mime
View raw message