zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mina...@apache.org
Subject zeppelin git commit: [ZEPPELIN-1585] Testcase for PySparkInterpreter.
Date Thu, 03 Nov 2016 12:09:25 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master 55de3fba7 -> d4375977d


[ZEPPELIN-1585] Testcase for PySparkInterpreter.

### What is this PR for?

This PR adds PySparkInterpreter testcase.
### What type of PR is it?

Improvement
### What is the Jira issue?

[ZEPPELIN-1585](https://issues.apache.org/jira/browse/ZEPPELIN-1585)
### Questions:
- Do the license files need an update? no
- Are there breaking changes for older versions? no
- Does this need documentation? no

Author: astroshim <hsshim@nflabs.com>

Closes #1564 from astroshim/feat/pySparkInterpreterTest and squashes the following commits:

ea8a081 [astroshim] fix to use full package name.
5f33389 [astroshim] fix to use full package name.
9650077 [astroshim] apply spark.submit.pyFiles
5b39384 [astroshim] ignore testcase of spark 1.1 version
3c7abf0 [astroshim] Merge branch 'master' into feat/pySparkInterpreterTest
1cf3fae [astroshim] Merge branch 'master' into feat/pySparkInterpreterTest
265a82b [astroshim] change scope
51aa813 [astroshim] add PySparkInterpreter testcase.
3fe0c7e [astroshim] Merge branch 'master' into feat/pySparkInterpreterTest
499aa6b [astroshim] add PySparkInterpreter testcase


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d4375977
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d4375977
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d4375977

Branch: refs/heads/master
Commit: d4375977ddf938f40d0f6af24c35c898f7e96985
Parents: 55de3fb
Author: astroshim <hsshim@nflabs.com>
Authored: Thu Nov 3 15:37:17 2016 +0900
Committer: Mina Lee <minalee@apache.org>
Committed: Thu Nov 3 21:09:17 2016 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java      |  15 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |   1 +
 .../zeppelin/spark/PySparkInterpreterTest.java  | 147 +++++++++++++++++++
 3 files changed, 159 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d4375977/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index c0b131c..420ebd5 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -49,9 +49,9 @@ import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.spark.dep.SparkDependencyContext;
 import org.slf4j.Logger;
@@ -165,6 +165,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
   }
 
+  private Map setupPySparkEnv() throws IOException{
+    Map env = EnvironmentUtils.getProcEnvironment();
+    if (!env.containsKey("PYTHONPATH")) {
+      SparkConf conf = getSparkConf();
+      env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":"));
+    }
+    return env;
+  }
+
   private void createGatewayServerAndStartScript() {
     // create python script
     createPythonScript();
@@ -196,10 +205,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     executor.setStreamHandler(streamHandler);
     executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
 
-
     try {
-      Map env = EnvironmentUtils.getProcEnvironment();
-
+      Map env = setupPySparkEnv();
       executor.execute(cmd, env, this);
       pythonscriptRunning = true;
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d4375977/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 6aa2f28..53bf30b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -504,6 +504,7 @@ public class SparkInterpreter extends Interpreter {
         conf.set("spark.files", conf.get("spark.yarn.dist.files"));
       }
       conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
+      conf.set("spark.submit.pyFiles", Joiner.on(",").join(pythonLibUris));
     }
 
     // Distributes needed libraries to workers

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d4375977/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
new file mode 100644
index 0000000..401e0fd
--- /dev/null
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class PySparkInterpreterTest {
+  public static SparkInterpreter sparkInterpreter;
+  public static PySparkInterpreter pySparkInterpreter;
+  public static InterpreterGroup intpGroup;
+  private File tmpDir;
+  public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
+  private InterpreterContext context;
+
+  public static Properties getPySparkTestProperties() {
+    Properties p = new Properties();
+    p.setProperty("master", "local[*]");
+    p.setProperty("spark.app.name", "Zeppelin Test");
+    p.setProperty("zeppelin.spark.useHiveContext", "true");
+    p.setProperty("zeppelin.spark.maxResult", "1000");
+    p.setProperty("zeppelin.spark.importImplicit", "true");
+    p.setProperty("zeppelin.pyspark.python", "python");
+    return p;
+  }
+
+  /**
+   * Get spark version number as a numerical value.
+   * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+   */
+  public static int getSparkVersionNumber() {
+    if (sparkInterpreter == null) {
+      return 0;
+    }
+
+    String[] split = sparkInterpreter.getSparkContext().version().split("\\.");
+    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+    return version;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
+    System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
+    tmpDir.mkdirs();
+
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    if (sparkInterpreter == null) {
+      sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
+      intpGroup.get("note").add(sparkInterpreter);
+      sparkInterpreter.setInterpreterGroup(intpGroup);
+      sparkInterpreter.open();
+    }
+
+    if (pySparkInterpreter == null) {
+      pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties());
+      intpGroup.get("note").add(pySparkInterpreter);
+      pySparkInterpreter.setInterpreterGroup(intpGroup);
+      pySparkInterpreter.open();
+    }
+
+    context = new InterpreterContext("note", "id", "title", "text",
+      new AuthenticationInfo(),
+      new HashMap<String, Object>(),
+      new GUI(),
+      new AngularObjectRegistry(intpGroup.getId(), null),
+      new LocalResourcePool("id"),
+      new LinkedList<InterpreterContextRunner>(),
+      new InterpreterOutput(new InterpreterOutputListener() {
+        @Override
+        public void onAppend(InterpreterOutput out, byte[] line) {
+
+        }
+
+        @Override
+        public void onUpdate(InterpreterOutput out, byte[] output) {
+
+        }
+      }));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    delete(tmpDir);
+  }
+
+  private void delete(File file) {
+    if (file.isFile()) file.delete();
+    else if (file.isDirectory()) {
+      File[] files = file.listFiles();
+      if (files != null && files.length > 0) {
+        for (File f : files) {
+          delete(f);
+        }
+      }
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testBasicIntp() {
+    if (getSparkVersionNumber() > 11) {
+      assertEquals(InterpreterResult.Code.SUCCESS,
+        pySparkInterpreter.interpret("a = 1\n", context).code());
+    }
+  }
+
+}


Mime
View raw message