beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-642] Support Flink Detached Mode for JOB execution
Date Thu, 22 Sep 2016 09:32:38 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master f62d04e22 -> 843275210


[BEAM-642] Support Flink Detached Mode for JOB execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc69bc48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc69bc48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc69bc48

Branch: refs/heads/master
Commit: dc69bc48b0057f45d849d3cfec848fa066ee0854
Parents: f62d04e
Author: Sumit Chawla <sumichaw@cisco.com>
Authored: Mon Sep 19 15:10:53 2016 -0700
Committer: Maximilian Michels <mxm@apache.org>
Committed: Thu Sep 22 11:30:09 2016 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/flink/FlinkRunner.java  | 25 +++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc69bc48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index d3c65c0..137fdeb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +57,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.DetachedEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,18 +153,23 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult>
{
       throw new RuntimeException("Pipeline execution failed", e);
     }
 
-    LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-
-    Map<String, Object> accumulators = result.getAllAccumulatorResults();
-    if (accumulators != null && !accumulators.isEmpty()) {
-      LOG.info("Final aggregator values:");
+    if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
+      LOG.info("Pipeline submitted in Detached mode");
+      Map<String, Object> accumulators = Collections.emptyMap();
+      return new FlinkRunnerResult(accumulators, -1L);
+    } else {
+      LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+      Map<String, Object> accumulators = result.getAllAccumulatorResults();
+      if (accumulators != null && !accumulators.isEmpty()) {
+        LOG.info("Final aggregator values:");
 
-      for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet())
{
-        LOG.info("{} : {}", entry.getKey(), entry.getValue());
+        for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet())
{
+          LOG.info("{} : {}", entry.getKey(), entry.getValue());
+        }
       }
-    }
 
-    return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+      return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+    }
   }
 
   /**


Mime
View raw message