zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k..@apache.org
Subject zeppelin git commit: [ZEPPELIN-2675] Distributing Jars when using an external flink cluster
Date Mon, 10 Jul 2017 01:18:09 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master 96d78ee57 -> 1ff8c1475


[ZEPPELIN-2675] Distributing Jars when using an external flink cluster

### What is this PR for?

This PR intends to make Flink interpreter able to distribute external dependencies on cluster
when they are loaded by Web UI.

The code simply collects jar paths downloaded by DependecyResolver and add them to FlinkILoop
constructor.

Loading external dependencies in Flink interpreter only work with MiniCluster and Flink version
lower than 1.3.0. Only Spark is able to distribute jars at the moment.

### What type of PR is it?

Improvement

### What is the Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-2675

### How should this be tested?

1. [Download Flink](https://flink.apache.org/downloads.html) and run local cluster with following
command:

```./bin/start-cluster.sh```

2. go to the interpreter page and in dependencies section add artifact:

```joda-time:joda-time:jar:2.9.9```

3.  change `local` with `localhost` in interpreter page

4. run below code in paragraph

```
%flink
import org.joda.time.{DateTime, DateTimeZone}

val text = benv.fromElements("To be or not to be")
text
  .flatMap { _.toLowerCase.split(" ") }
  .map(word => (word, new DateTime(System.currentTimeMillis(), DateTimeZone.UTC) ))
  .print()
```

## Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: marcotagliabue <marco.tagliabue@icloud.com>

Closes #2429 from marcotagliabue/feature/distribute-jars-flink and squashes the following
commits:

49bd1f59 [marcotagliabue] Enable distribution of jars in Flink cluster


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1ff8c147
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1ff8c147
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1ff8c147

Branch: refs/heads/master
Commit: 1ff8c1475eebf67fbc2a6a9b0a09c3ac273bc4e5
Parents: 96d78ee
Author: marcotagliabue <marco.tagliabue@icloud.com>
Authored: Thu Jun 22 14:21:28 2017 +0200
Committer: 1ambda <1amb4a@gmail.com>
Committed: Mon Jul 10 10:18:03 2017 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/flink/FlinkInterpreter.java | 26 +++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ff8c147/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 91ffb9c..710eace 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -91,11 +91,29 @@ public class FlinkInterpreter extends Interpreter {
       startFlinkMiniCluster();
     }
 
+    String[] externalJars = new String[0];
+    String localRepo = getProperty("zeppelin.interpreter.localRepo");
+    if (localRepo != null) {
+      File localRepoDir = new File(localRepo);
+      if (localRepoDir.exists()) {
+        File[] files = localRepoDir.listFiles();
+        if (files != null) {
+          externalJars = new String[files.length];
+          for (int i = 0; i < files.length; i++) {
+            if (externalJars.length > 0) {
+              externalJars[i] = files[i].getAbsolutePath();
+            }
+          }
+        }
+      }
+    }
+
     flinkIloop = new FlinkILoop(getHost(),
-                                getPort(),
-                                flinkConf,
-                                (BufferedReader) null,
-                                new PrintWriter(out));
+        getPort(),
+        flinkConf,
+        new Some<>(externalJars),
+        (BufferedReader) null,
+        new PrintWriter(out));
 
     flinkIloop.settings_$eq(createSettings());
     flinkIloop.createInterpreter();


Mime
View raw message