beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-272][flink] remove dependency on Dataflow Runner
Date Thu, 12 May 2016 08:59:30 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6ec9e9680 -> 123674f4b


[BEAM-272][flink] remove dependency on Dataflow Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/50edd231
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/50edd231
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/50edd231

Branch: refs/heads/master
Commit: 50edd2314e7c7b97b75d2e6759c5857f4f67a662
Parents: acb0406
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed May 11 11:57:44 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Thu May 12 10:52:40 2016 +0200

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    | 10 -------
 .../runners/flink/FlinkPipelineOptions.java     | 30 ++++++++++++++++++--
 .../beam/runners/flink/FlinkPipelineRunner.java |  4 +--
 3 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index a1d5370..8958bdd 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -110,16 +110,6 @@
       </exclusions>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>google-cloud-dataflow-java-runner</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <!-- Test scoped -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index fd86bc9..c40473e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -18,14 +18,18 @@
 package org.apache.beam.runners.flink;
 
 
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.util.List;
 
@@ -50,9 +54,9 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
   /**
    * The job name is used to identify jobs running on a Flink cluster.
    */
-  @Description("Dataflow job name, to uniquely identify active jobs. "
+  @Description("Flink job name, to uniquely identify active jobs. "
       + "Defaults to using the ApplicationName-UserName-Date.")
-  @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
+  @Default.InstanceFactory(JobNameFactory.class)
   String getJobName();
   void setJobName(String value);
 
@@ -91,4 +95,24 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
   @Default.Long(-1L)
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
+
+
+  class JobNameFactory implements DefaultValueFactory<String> {
+    private static final DateTimeFormatter FORMATTER =
+        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+    @Override
+    public String create(PipelineOptions options) {
+      String appName = options.as(ApplicationNameOptions.class).getAppName();
+      String normalizedAppName = appName == null || appName.length() == 0 ? "FlinkRunner"
+          : appName.toLowerCase()
+          .replaceAll("[^a-z0-9]", "0")
+          .replaceAll("^[^a-z]", "a");
+      String userName = System.getProperty("user.name", "");
+      String normalizedUserName = userName.toLowerCase()
+          .replaceAll("[^a-z0-9]", "0");
+      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index a389d7a..3edf6f3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.flink;
 
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -46,7 +45,6 @@ import java.util.Map;
  * pipeline by first translating them to a Flink Plan and then executing them either locally
  * or on a Flink cluster, depending on the configuration.
  * <p>
- * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineRunner}.
  */
 public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
@@ -80,7 +78,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
     if (flinkOptions.getFilesToStage() == null) {
       flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
-          DataflowPipelineRunner.class.getClassLoader()));
+          FlinkPipelineRunner.class.getClassLoader()));
       LOG.info("PipelineOptions.filesToStage was not specified. "
               + "Defaulting to files from the classpath: will stage {} files. "
               + "Enable logging at DEBUG level to see which files will be staged.",


Mime
View raw message