zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject [1/2] zeppelin git commit: ZEPPELIN-3569. Improvement of FlinkInterpreter
Date Thu, 09 Aug 2018 03:12:07 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master 8e5013c6a -> 1c5b38a9a


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/FlinkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/FlinkIntegrationTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/FlinkIntegrationTest.java
new file mode 100644
index 0000000..f3a154e
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/FlinkIntegrationTest.java
@@ -0,0 +1,116 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest {
+  private static Logger LOGGER = LoggerFactory.getLogger(SparkIntegrationTest.class);
+
+  private static MiniHadoopCluster hadoopCluster;
+  private static MiniZeppelin zeppelin;
+  private static InterpreterFactory interpreterFactory;
+  private static InterpreterSettingManager interpreterSettingManager;
+
+  private String flinkVersion;
+  private String flinkHome;
+
+  public FlinkIntegrationTest(String flinkVersion) {
+    LOGGER.info("Testing FlinkVersion: " + flinkVersion);
+    this.flinkVersion = flinkVersion;
+    this.flinkHome = SparkDownloadUtils.downloadFlink(flinkVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"1.5.1"},
+        {"1.5.2"}
+    });
+
+  }
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    hadoopCluster = new MiniHadoopCluster();
+    hadoopCluster.start();
+
+    zeppelin = new MiniZeppelin();
+    zeppelin.start();
+    interpreterFactory = zeppelin.getInterpreterFactory();
+    interpreterSettingManager = zeppelin.getInterpreterSettingManager();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (zeppelin != null) {
+      zeppelin.stop();
+    }
+    if (hadoopCluster != null) {
+      hadoopCluster.stop();
+    }
+  }
+
+  private void testInterpreterBasics() throws IOException, InterpreterException {
+    // test FlinkInterpreter
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    Interpreter flinkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "flink");
+
+    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+    InterpreterResult interpreterResult = flinkInterpreter.interpret("1+1", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertTrue(interpreterResult.msg.get(0).getData().contains("2"));
+
+  }
+
+  @Test
+  public void testLocalMode() throws IOException, YarnException, InterpreterException {
+    InterpreterSetting flinkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("flink");
+    flinkInterpreterSetting.setProperty("FLINK_HOME", flinkHome);
+    flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+
+    testInterpreterBasics();
+
+    // no yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(0, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
+
+  // TODO(zjffdu) enable it when make yarn integration test work
+  //  @Test
+  public void testYarnMode() throws IOException, InterpreterException, YarnException {
+    InterpreterSetting flinkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("flink");
+    flinkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    flinkInterpreterSetting.setProperty("FLINK_HOME", flinkHome);
+    flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    flinkInterpreterSetting.setProperty("flink.execution.mode", "YARN");
+    testInterpreterBasics();
+
+    // 1 yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(1, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
deleted file mode 100644
index 90bf627..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package org.apache.zeppelin.interpreter;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SparkDownloadUtils {
-  private static Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class);
-
-  private static String downloadFolder = System.getProperty("user.home") + "/.cache/spark";
-
-  static {
-    try {
-      FileUtils.forceMkdir(new File(downloadFolder));
-    } catch (IOException e) {
-      throw new RuntimeException("Fail to create downloadFolder: " + downloadFolder, e);
-    }
-  }
-
-
-  public static String downloadSpark(String version) {
-    File targetSparkHomeFolder = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6");
-    if (targetSparkHomeFolder.exists()) {
-      LOGGER.info("Skip to download spark as it is already downloaded.");
-      return targetSparkHomeFolder.getAbsolutePath();
-    }
-    // Try mirrors a few times until one succeeds
-    boolean downloaded = false;
-    for (int i = 0; i < 3; i++) {
-      try {
-        String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"));
-        File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz");
-        String downloadURL = preferredMirror + "/spark/spark-" + version + "/spark-" + version + "-bin-hadoop2.6.tgz";
-        runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder});
-        runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder});
-        downloaded = true;
-        break;
-      } catch (Exception e) {
-        LOGGER.warn("Failed to download Spark", e);
-      }
-    }
-    // fallback to use apache archive
-    // https://archive.apache.org/dist/spark/spark-1.6.3/spark-1.6.3-bin-hadoop2.6.tgz
-    if (!downloaded) {
-      File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz");
-      String downloadURL =
-          "https://archive.apache.org/dist/spark/spark-"
-              + version
-              + "/spark-"
-              + version
-              + "-bin-hadoop2.6.tgz";
-      try {
-        runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder});
-        runShellCommand(
-            new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder});
-      } catch (Exception e) {
-        throw new RuntimeException("Fail to download spark " + version, e);
-      }
-    }
-    return targetSparkHomeFolder.getAbsolutePath();
-  }
-
-  private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
-    LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " "));
-    Process process = Runtime.getRuntime().exec(commands);
-    StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
-    StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
-    errorGobbler.start();
-    outputGobbler.start();
-    if (process.waitFor() != 0) {
-      throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " "));
-    }
-    LOGGER.info("Complete shell commands: " + StringUtils.join(commands, " "));
-  }
-
-  private static class StreamGobbler extends Thread {
-    InputStream is;
-
-    // reads everything from is until empty.
-    StreamGobbler(InputStream is) {
-      this.is = is;
-    }
-
-    public void run() {
-      try {
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-        String line = null;
-        long startTime = System.currentTimeMillis();
-        while ( (line = br.readLine()) != null) {
-          // logging per 5 seconds
-          if ((System.currentTimeMillis() - startTime) > 5000) {
-            LOGGER.info(line);
-            startTime = System.currentTimeMillis();
-          }
-        }
-      } catch (IOException ioe) {
-        ioe.printStackTrace();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/test/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/flink-conf.yaml b/zeppelin-zengine/src/test/resources/flink-conf.yaml
new file mode 100644
index 0000000..e69de29


Mime
View raw message