From commits-return-4057-archive-asf-public=cust-asf.ponee.io@zeppelin.apache.org Fri Feb 2 07:00:49 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id CE9BA180608 for ; Fri, 2 Feb 2018 07:00:49 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BE29C160C5F; Fri, 2 Feb 2018 06:00:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9C23F160C44 for ; Fri, 2 Feb 2018 07:00:47 +0100 (CET) Received: (qmail 9618 invoked by uid 500); 2 Feb 2018 06:00:46 -0000 Mailing-List: contact commits-help@zeppelin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zeppelin.apache.org Delivered-To: mailing list commits@zeppelin.apache.org Received: (qmail 9609 invoked by uid 99); 2 Feb 2018 06:00:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Feb 2018 06:00:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9AB06DFB31; Fri, 2 Feb 2018 06:00:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@zeppelin.apache.org Date: Fri, 02 Feb 2018 06:00:46 -0000 Message-Id: <3b0edb364e3e49e29d27970d74414592@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/10] zeppelin git commit: ZEPPELIN-3111. Refactor SparkInterpreter Repository: zeppelin Updated Branches: refs/heads/master 66644126a -> d762b5288 http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java deleted file mode 100644 index d2b01ce..0000000 --- a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - - -import com.google.common.io.Files; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterOutput; -import org.apache.zeppelin.interpreter.InterpreterOutputListener; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResultMessage; -import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.python.IPythonInterpreterTest; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CopyOnWriteArrayList; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class IPySparkInterpreterTest { - - private IPySparkInterpreter iPySparkInterpreter; - private InterpreterGroup intpGroup; - - @Before - public void setup() throws InterpreterException { - Properties p = new Properties(); - p.setProperty("spark.master", "local[4]"); - p.setProperty("master", "local[4]"); - p.setProperty("spark.submit.deployMode", "client"); - p.setProperty("spark.app.name", "Zeppelin Test"); - p.setProperty("zeppelin.spark.useHiveContext", "true"); - p.setProperty("zeppelin.spark.maxResult", "1000"); - p.setProperty("zeppelin.spark.importImplicit", "true"); - p.setProperty("zeppelin.pyspark.python", "python"); - p.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath()); - - intpGroup = new InterpreterGroup(); - intpGroup.put("session_1", new LinkedList()); - - SparkInterpreter sparkInterpreter = new SparkInterpreter(p); - intpGroup.get("session_1").add(sparkInterpreter); - sparkInterpreter.setInterpreterGroup(intpGroup); - sparkInterpreter.open(); - - iPySparkInterpreter = new IPySparkInterpreter(p); - intpGroup.get("session_1").add(iPySparkInterpreter); - iPySparkInterpreter.setInterpreterGroup(intpGroup); - iPySparkInterpreter.open(); - } - - - @After - public void tearDown() { - if (iPySparkInterpreter != null) { - iPySparkInterpreter.close(); - } - } - - @Test - public void testBasics() throws InterruptedException, IOException, InterpreterException { - // all the ipython test should pass too. - IPythonInterpreterTest.testInterpreter(iPySparkInterpreter); - - // rdd - InterpreterContext context = getInterpreterContext(); - InterpreterResult result = iPySparkInterpreter.interpret("sc.range(1,10).sum()", context); - Thread.sleep(100); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - List interpreterResultMessages = context.out.getInterpreterResultMessages(); - assertEquals("45", interpreterResultMessages.get(0).getData()); - - context = getInterpreterContext(); - result = iPySparkInterpreter.interpret("sc.version", context); - Thread.sleep(100); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = context.out.getInterpreterResultMessages(); - // spark sql - context = getInterpreterContext(); - if (interpreterResultMessages.get(0).getData().startsWith("'1.") || - interpreterResultMessages.get(0).getData().startsWith("u'1.")) { - result = iPySparkInterpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = context.out.getInterpreterResultMessages(); - assertEquals( - "+---+---+\n" + - "| _1| _2|\n" + - "+---+---+\n" + - "| 1| a|\n" + - "| 2| b|\n" + - "+---+---+\n\n", interpreterResultMessages.get(0).getData()); - } else { - result = iPySparkInterpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = context.out.getInterpreterResultMessages(); - assertEquals( - "+---+---+\n" + - "| _1| _2|\n" + - "+---+---+\n" + - "| 1| a|\n" + - "| 2| b|\n" + - "+---+---+\n\n", interpreterResultMessages.get(0).getData()); - } - - // cancel - final InterpreterContext context2 = getInterpreterContext(); - - Thread thread = new Thread(){ - @Override - public void run() { - InterpreterResult result = iPySparkInterpreter.interpret("import time\nsc.range(1,10).foreach(lambda x: time.sleep(1))", context2); - assertEquals(InterpreterResult.Code.ERROR, result.code()); - List interpreterResultMessages = null; - try { - interpreterResultMessages = context2.out.getInterpreterResultMessages(); - assertTrue(interpreterResultMessages.get(0).getData().contains("KeyboardInterrupt")); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - thread.start(); - - // sleep 1 second to wait for the spark job starts - Thread.sleep(1000); - iPySparkInterpreter.cancel(context); - thread.join(); - - // completions - List completions = iPySparkInterpreter.completion("sc.ran", 6, getInterpreterContext()); - assertEquals(1, completions.size()); - assertEquals("range", completions.get(0).getValue()); - - // pyspark streaming - context = getInterpreterContext(); - result = iPySparkInterpreter.interpret( - "from pyspark.streaming import StreamingContext\n" + - "import time\n" + - "ssc = StreamingContext(sc, 1)\n" + - "rddQueue = []\n" + - "for i in range(5):\n" + - " rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" + - "inputStream = ssc.queueStream(rddQueue)\n" + - "mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" + - "reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" + - "reducedStream.pprint()\n" + - "ssc.start()\n" + - "time.sleep(6)\n" + - "ssc.stop(stopSparkContext=False, stopGraceFully=True)", context); - Thread.sleep(1000); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = context.out.getInterpreterResultMessages(); - assertEquals(1, interpreterResultMessages.size()); - assertTrue(interpreterResultMessages.get(0).getData().contains("(0, 100)")); - } - - private InterpreterContext getInterpreterContext() { - return new InterpreterContext( - "noteId", - "paragraphId", - "replName", - "paragraphTitle", - "paragraphText", - new AuthenticationInfo(), - new HashMap(), - new GUI(), - new GUI(), - null, - null, - null, - new InterpreterOutput(null)); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java deleted file mode 100644 index 2f1077d..0000000 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterResult.Type; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.*; -import org.junit.rules.TemporaryFolder; -import org.junit.runners.MethodSorters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; - -import static org.junit.Assert.*; - -@FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class PySparkInterpreterMatplotlibTest { - - @ClassRule - public static TemporaryFolder tmpDir = new TemporaryFolder(); - - static SparkInterpreter sparkInterpreter; - static PySparkInterpreter pyspark; - static InterpreterGroup intpGroup; - static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class); - static InterpreterContext context; - - public static class AltPySparkInterpreter extends PySparkInterpreter { - /** - * Since pyspark output is sent to an outputstream rather than - * being directly provided by interpret(), this subclass is created to - * override interpret() to append the result from the outputStream - * for the sake of convenience in testing. - */ - public AltPySparkInterpreter(Properties property) { - super(property); - } - - /** - * This code is mainly copied from RemoteInterpreterServer.java which - * normally handles this in real use cases. - */ - @Override - public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException { - context.out.clear(); - InterpreterResult result = super.interpret(st, context); - List resultMessages = null; - try { - context.out.flush(); - resultMessages = context.out.toInterpreterResultMessage(); - } catch (IOException e) { - e.printStackTrace(); - } - resultMessages.addAll(result.message()); - - return new InterpreterResult(result.code(), resultMessages); - } - } - - private static Properties getPySparkTestProperties() throws IOException { - Properties p = new Properties(); - p.setProperty("master", "local[*]"); - p.setProperty("spark.app.name", "Zeppelin Test"); - p.setProperty("zeppelin.spark.useHiveContext", "true"); - p.setProperty("zeppelin.spark.maxResult", "1000"); - p.setProperty("zeppelin.spark.importImplicit", "true"); - p.setProperty("zeppelin.pyspark.python", "python"); - p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath()); - p.setProperty("zeppelin.pyspark.useIPython", "false"); - return p; - } - - /** - * Get spark version number as a numerical value. - * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ... - */ - public static int getSparkVersionNumber() { - if (sparkInterpreter == null) { - return 0; - } - - String[] split = sparkInterpreter.getSparkContext().version().split("\\."); - int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]); - return version; - } - - @BeforeClass - public static void setUp() throws Exception { - intpGroup = new InterpreterGroup(); - intpGroup.put("note", new LinkedList()); - context = new InterpreterContext("note", "id", null, "title", "text", - new AuthenticationInfo(), - new HashMap(), - new GUI(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("id"), - new LinkedList(), - new InterpreterOutput(null)); - InterpreterContext.set(context); - - sparkInterpreter = new SparkInterpreter(getPySparkTestProperties()); - intpGroup.get("note").add(sparkInterpreter); - sparkInterpreter.setInterpreterGroup(intpGroup); - sparkInterpreter.open(); - - pyspark = new AltPySparkInterpreter(getPySparkTestProperties()); - intpGroup.get("note").add(pyspark); - pyspark.setInterpreterGroup(intpGroup); - pyspark.open(); - - } - - @AfterClass - public static void tearDown() { - pyspark.close(); - sparkInterpreter.close(); - } - - @Test - public void dependenciesAreInstalled() throws InterpreterException { - // matplotlib - InterpreterResult ret = pyspark.interpret("import matplotlib", context); - assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - - // inline backend - ret = pyspark.interpret("import backend_zinline", context); - assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - } - - @Test - public void showPlot() throws InterpreterException { - // Simple plot test - InterpreterResult ret; - ret = pyspark.interpret("import matplotlib.pyplot as plt", context); - ret = pyspark.interpret("plt.close()", context); - ret = pyspark.interpret("z.configure_mpl(interactive=False)", context); - ret = pyspark.interpret("plt.plot([1, 2, 3])", context); - ret = pyspark.interpret("plt.show()", context); - - assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(ret.message().toString(), Type.HTML, ret.message().get(0).getType()); - assertTrue(ret.message().get(0).getData().contains("data:image/png;base64")); - assertTrue(ret.message().get(0).getData().contains("
")); - } - - @Test - // Test for when configuration is set to auto-close figures after show(). - public void testClose() throws InterpreterException { - InterpreterResult ret; - InterpreterResult ret1; - InterpreterResult ret2; - ret = pyspark.interpret("import matplotlib.pyplot as plt", context); - ret = pyspark.interpret("plt.close()", context); - ret = pyspark.interpret("z.configure_mpl(interactive=False, close=True, angular=False)", context); - ret = pyspark.interpret("plt.plot([1, 2, 3])", context); - ret1 = pyspark.interpret("plt.show()", context); - - // Second call to show() should print nothing, and Type should be TEXT. - // This is because when close=True, there should be no living instances - // of FigureManager, causing show() to return before setting the output - // type to HTML. - ret = pyspark.interpret("plt.show()", context); - assertEquals(0, ret.message().size()); - - // Now test that new plot is drawn. It should be identical to the - // previous one. - ret = pyspark.interpret("plt.plot([1, 2, 3])", context); - ret2 = pyspark.interpret("plt.show()", context); - assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType()); - assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData()); - } - - @Test - // Test for when configuration is set to not auto-close figures after show(). - public void testNoClose() throws InterpreterException { - InterpreterResult ret; - InterpreterResult ret1; - InterpreterResult ret2; - ret = pyspark.interpret("import matplotlib.pyplot as plt", context); - ret = pyspark.interpret("plt.close()", context); - ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=False)", context); - ret = pyspark.interpret("plt.plot([1, 2, 3])", context); - ret1 = pyspark.interpret("plt.show()", context); - - // Second call to show() should print nothing, and Type should be HTML. - // This is because when close=False, there should be living instances - // of FigureManager, causing show() to set the output - // type to HTML even though the figure is inactive. - ret = pyspark.interpret("plt.show()", context); - assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - - // Now test that plot can be reshown if it is updated. It should be - // different from the previous one because it will plot the same line - // again but in a different color. - ret = pyspark.interpret("plt.plot([1, 2, 3])", context); - ret2 = pyspark.interpret("plt.show()", context); - assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData()); - } - - @Test - // Test angular mode - public void testAngular() throws InterpreterException { - InterpreterResult ret; - ret = pyspark.interpret("import matplotlib.pyplot as plt", context); - ret = pyspark.interpret("plt.close()", context); - ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context); - ret = pyspark.interpret("plt.plot([1, 2, 3])", context); - ret = pyspark.interpret("plt.show()", context); - assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(0).getType()); - - // Check if the figure data is in the Angular Object Registry - AngularObjectRegistry registry = context.getAngularObjectRegistry(); - String figureData = registry.getAll("note", null).get(0).toString(); - assertTrue(figureData.contains("data:image/png;base64")); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java deleted file mode 100644 index 0db2bb1..0000000 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.*; -import org.junit.rules.TemporaryFolder; -import org.junit.runners.MethodSorters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.junit.Assert.*; - -@FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class PySparkInterpreterTest { - - @ClassRule - public static TemporaryFolder tmpDir = new TemporaryFolder(); - - static SparkInterpreter sparkInterpreter; - static PySparkInterpreter pySparkInterpreter; - static InterpreterGroup intpGroup; - static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class); - static InterpreterContext context; - - private static Properties getPySparkTestProperties() throws IOException { - Properties p = new Properties(); - p.setProperty("master", "local[*]"); - p.setProperty("spark.app.name", "Zeppelin Test"); - p.setProperty("zeppelin.spark.useHiveContext", "true"); - p.setProperty("zeppelin.spark.maxResult", "1000"); - p.setProperty("zeppelin.spark.importImplicit", "true"); - p.setProperty("zeppelin.pyspark.python", "python"); - p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath()); - p.setProperty("zeppelin.pyspark.useIPython", "false"); - return p; - } - - /** - * Get spark version number as a numerical value. - * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ... - */ - public static int getSparkVersionNumber() { - if (sparkInterpreter == null) { - return 0; - } - - String[] split = sparkInterpreter.getSparkContext().version().split("\\."); - int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]); - return version; - } - - @BeforeClass - public static void setUp() throws Exception { - intpGroup = new InterpreterGroup(); - intpGroup.put("note", new LinkedList()); - - context = new InterpreterContext("note", "id", null, "title", "text", - new AuthenticationInfo(), - new HashMap(), - new GUI(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("id"), - new LinkedList(), - new InterpreterOutput(null)); - InterpreterContext.set(context); - - sparkInterpreter = new SparkInterpreter(getPySparkTestProperties()); - intpGroup.get("note").add(sparkInterpreter); - sparkInterpreter.setInterpreterGroup(intpGroup); - sparkInterpreter.open(); - - pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties()); - intpGroup.get("note").add(pySparkInterpreter); - pySparkInterpreter.setInterpreterGroup(intpGroup); - pySparkInterpreter.open(); - - - } - - @AfterClass - public static void tearDown() { - pySparkInterpreter.close(); - sparkInterpreter.close(); - } - - @Test - public void testBasicIntp() throws InterpreterException { - if (getSparkVersionNumber() > 11) { - assertEquals(InterpreterResult.Code.SUCCESS, - pySparkInterpreter.interpret("a = 1\n", context).code()); - } - - InterpreterResult result = pySparkInterpreter.interpret( - "from pyspark.streaming import StreamingContext\n" + - "import time\n" + - "ssc = StreamingContext(sc, 1)\n" + - "rddQueue = []\n" + - "for i in range(5):\n" + - " rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" + - "inputStream = ssc.queueStream(rddQueue)\n" + - "mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" + - "reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" + - "reducedStream.pprint()\n" + - "ssc.start()\n" + - "time.sleep(6)\n" + - "ssc.stop(stopSparkContext=False, stopGraceFully=True)", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } - - @Test - public void testCompletion() throws InterpreterException { - if (getSparkVersionNumber() > 11) { - List completions = pySparkInterpreter.completion("sc.", "sc.".length(), null); - assertTrue(completions.size() > 0); - } - } - - @Test - public void testRedefinitionZeppelinContext() throws InterpreterException { - if (getSparkVersionNumber() > 11) { - String redefinitionCode = "z = 1\n"; - String restoreCode = "z = __zeppelin__\n"; - String validCode = "z.input(\"test\")\n"; - - assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code()); - assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(redefinitionCode, context).code()); - assertEquals(InterpreterResult.Code.ERROR, pySparkInterpreter.interpret(validCode, context).code()); - assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(restoreCode, context).code()); - assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code()); - } - } - - private class infinityPythonJob implements Runnable { - @Override - public void run() { - String code = "import time\nwhile True:\n time.sleep(1)" ; - InterpreterResult ret = null; - try { - ret = pySparkInterpreter.interpret(code, context); - } catch (InterpreterException e) { - e.printStackTrace(); - } - assertNotNull(ret); - Pattern expectedMessage = Pattern.compile("KeyboardInterrupt"); - Matcher m = expectedMessage.matcher(ret.message().toString()); - assertTrue(m.find()); - } - } - - @Test - public void testCancelIntp() throws InterruptedException, InterpreterException { - if (getSparkVersionNumber() > 11) { - assertEquals(InterpreterResult.Code.SUCCESS, - pySparkInterpreter.interpret("a = 1\n", context).code()); - - Thread t = new Thread(new infinityPythonJob()); - t.start(); - Thread.sleep(5000); - pySparkInterpreter.cancel(context); - assertTrue(t.isAlive()); - t.join(2000); - assertFalse(t.isAlive()); - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java deleted file mode 100644 index e4f15f4..0000000 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import static org.junit.Assert.*; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.resource.WellKnownResourceName; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.junit.*; -import org.junit.rules.TemporaryFolder; -import org.junit.runners.MethodSorters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class SparkInterpreterTest { - - @ClassRule - public static TemporaryFolder tmpDir = new TemporaryFolder(); - - static SparkInterpreter repl; - static InterpreterGroup intpGroup; - static InterpreterContext context; - static Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterTest.class); - static Map> paraIdToInfosMap = - new HashMap<>(); - - /** - * Get spark version number as a numerical value. - * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ... - */ - public static int getSparkVersionNumber(SparkInterpreter repl) { - if (repl == null) { - return 0; - } - - String[] split = repl.getSparkContext().version().split("\\."); - int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]); - return version; - } - - public static Properties getSparkTestProperties(TemporaryFolder tmpDir) throws IOException { - Properties p = new Properties(); - p.setProperty("master", "local[*]"); - p.setProperty("spark.app.name", "Zeppelin Test"); - p.setProperty("zeppelin.spark.useHiveContext", "true"); - p.setProperty("zeppelin.spark.maxResult", "1000"); - p.setProperty("zeppelin.spark.importImplicit", "true"); - p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath()); - p.setProperty("zeppelin.spark.property_1", "value_1"); - return p; - } - - @BeforeClass - public static void setUp() throws Exception { - intpGroup = new InterpreterGroup(); - intpGroup.put("note", new LinkedList()); - repl = new SparkInterpreter(getSparkTestProperties(tmpDir)); - repl.setInterpreterGroup(intpGroup); - intpGroup.get("note").add(repl); - repl.open(); - - final RemoteEventClientWrapper remoteEventClientWrapper = new RemoteEventClientWrapper() { - - @Override - public void onParaInfosReceived(String noteId, String paragraphId, - Map infos) { - if (infos != null) { - paraIdToInfosMap.put(paragraphId, infos); - } - } - - @Override - public void onMetaInfosReceived(Map infos) { - } - }; - context = new InterpreterContext("note", "id", null, "title", "text", - new AuthenticationInfo(), - new HashMap(), - new GUI(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("id"), - new LinkedList(), - new InterpreterOutput(null)) { - - @Override - public RemoteEventClientWrapper getClient() { - return remoteEventClientWrapper; - } - }; - // The first para interpretdr will set the Eventclient wrapper - //SparkInterpreter.interpret(String, InterpreterContext) -> - //SparkInterpreter.populateSparkWebUrl(InterpreterContext) -> - //ZeppelinContext.setEventClient(RemoteEventClientWrapper) - //running a dummy to ensure that we dont have any race conditions among tests - repl.interpret("sc", context); - } - - @AfterClass - public static void tearDown() { - repl.close(); - } - - @Test - public void testBasicIntp() { - assertEquals(InterpreterResult.Code.SUCCESS, - repl.interpret("val a = 1\nval b = 2", context).code()); - - // when interpret incomplete expression - InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context); - assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code()); - assertTrue(incomplete.message().get(0).getData().length() > 0); // expecting some error - // message - - /* - * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b")); - * repl.interpret("val ver = sc.version"); - * assertNotNull(repl.getValue("ver")); assertEquals("HELLO\n", - * repl.interpret("println(\"HELLO\")").message()); - */ - } - - @Test - public void testNonStandardSparkProperties() throws IOException { - // throw NoSuchElementException if no such property is found - InterpreterResult result = repl.interpret("sc.getConf.get(\"property_1\")", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } - - @Test - public void testNextLineInvocation() { - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code()); - } - - @Test - public void testNextLineComments() { - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code()); - } - - @Test - public void testNextLineCompanionObject() { - String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n def apply(x: Long) = new Counter()\n}"; - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, context).code()); - } - - @Test - public void testEndWithComment() { - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code()); - } - - @Test - public void testListener() { - SparkContext sc = repl.getSparkContext(); - assertNotNull(SparkInterpreter.setupListeners(sc)); - } - - @Test - public void testCreateDataFrame() { - if (getSparkVersionNumber(repl) >= 13) { - repl.interpret("case class Person(name:String, age:Int)\n", context); - repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context); - repl.interpret("people.toDF.count", context); - assertEquals(new Long(4), context.getResourcePool().get( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ZeppelinReplResult.toString()).get()); - } - } - - @Test - public void testZShow() { - String code = ""; - repl.interpret("case class Person(name:String, age:Int)\n", context); - repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context); - if (getSparkVersionNumber(repl) < 13) { - repl.interpret("people.registerTempTable(\"people\")", context); - code = "z.show(sqlc.sql(\"select * from people\"))"; - } else { - code = "z.show(people.toDF)"; - } - assertEquals(Code.SUCCESS, repl.interpret(code, context).code()); - } - - @Test - public void testSparkSql() throws IOException, InterpreterException { - repl.interpret("case class Person(name:String, age:Int)\n", context); - repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context); - assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code()); - - - if (getSparkVersionNumber(repl) <= 11) { // spark 1.2 or later does not allow create multiple - // SparkContext in the same jvm by default. - // create new interpreter - SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir)); - repl2.setInterpreterGroup(intpGroup); - intpGroup.get("note").add(repl2); - repl2.open(); - - repl2.interpret("case class Man(name:String, age:Int)", context); - repl2.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context); - assertEquals(Code.SUCCESS, repl2.interpret("man.take(3)", context).code()); - repl2.close(); - } - } - - @Test - public void testReferencingUndefinedVal() { - InterpreterResult result = repl.interpret("def category(min: Int) = {" - + " if (0 <= value) \"error\"" + "}", context); - assertEquals(Code.ERROR, result.code()); - } - - @Test - public void emptyConfigurationVariablesOnlyForNonSparkProperties() { - Properties intpProperty = repl.getProperties(); - SparkConf sparkConf = repl.getSparkContext().getConf(); - for (Object oKey : intpProperty.keySet()) { - String key = (String) oKey; - String value = (String) intpProperty.get(key); - LOGGER.debug(String.format("[%s]: [%s]", key, value)); - if (key.startsWith("spark.") && value.isEmpty()) { - assertTrue(String.format("configuration starting from 'spark.' should not be empty. [%s]", key), !sparkConf.contains(key) || !sparkConf.get(key).isEmpty()); - } - } - } - - @Test - public void shareSingleSparkContext() throws InterruptedException, IOException, InterpreterException { - // create another SparkInterpreter - SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir)); - repl2.setInterpreterGroup(intpGroup); - intpGroup.get("note").add(repl2); - repl2.open(); - - assertEquals(Code.SUCCESS, - repl.interpret("print(sc.parallelize(1 to 10).count())", context).code()); - assertEquals(Code.SUCCESS, - repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code()); - - repl2.close(); - } - - @Test - public void testEnableImplicitImport() throws IOException, InterpreterException { - if (getSparkVersionNumber(repl) >= 13) { - // Set option of importing implicits to "true", and initialize new Spark repl - Properties p = getSparkTestProperties(tmpDir); - p.setProperty("zeppelin.spark.importImplicit", "true"); - SparkInterpreter repl2 = new SparkInterpreter(p); - repl2.setInterpreterGroup(intpGroup); - intpGroup.get("note").add(repl2); - - repl2.open(); - String ddl = "val df = Seq((1, true), (2, false)).toDF(\"num\", \"bool\")"; - assertEquals(Code.SUCCESS, repl2.interpret(ddl, context).code()); - repl2.close(); - } - } - - @Test - public void testDisableImplicitImport() throws IOException, InterpreterException { - if (getSparkVersionNumber(repl) >= 13) { - // Set option of importing implicits to "false", and initialize new Spark repl - // this test should return error status when creating DataFrame from sequence - Properties p = getSparkTestProperties(tmpDir); - p.setProperty("zeppelin.spark.importImplicit", "false"); - SparkInterpreter repl2 = new SparkInterpreter(p); - repl2.setInterpreterGroup(intpGroup); - intpGroup.get("note").add(repl2); - - repl2.open(); - String ddl = "val df = Seq((1, true), (2, false)).toDF(\"num\", \"bool\")"; - assertEquals(Code.ERROR, repl2.interpret(ddl, context).code()); - repl2.close(); - } - } - - @Test - public void testCompletion() { - List completions = repl.completion("sc.", "sc.".length(), null); - assertTrue(completions.size() > 0); - } - - @Test - public void testMultilineCompletion() { - String buf = "val x = 1\nsc."; - List completions = repl.completion(buf, buf.length(), null); - assertTrue(completions.size() > 0); - } - - @Test - public void testMultilineCompletionNewVar() { - Assume.assumeFalse("this feature does not work with scala 2.10", Utils.isScala2_10()); - Assume.assumeTrue("This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7()); - String buf = "val x = sc\nx."; - List completions = repl.completion(buf, buf.length(), null); - assertTrue(completions.size() > 0); - } - - @Test - public void testParagraphUrls() { - String paraId = "test_para_job_url"; - InterpreterContext intpCtx = new InterpreterContext("note", paraId, null, "title", "text", - new AuthenticationInfo(), - new HashMap(), - new GUI(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("id"), - new LinkedList(), - new InterpreterOutput(null)); - repl.interpret("sc.parallelize(1 to 10).map(x => {x}).collect", intpCtx); - Map paraInfos = paraIdToInfosMap.get(intpCtx.getParagraphId()); - String jobUrl = null; - if (paraInfos != null) { - jobUrl = paraInfos.get("jobUrl"); - } - String sparkUIUrl = repl.getSparkUIUrl(); - assertNotNull(jobUrl); - assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job/?id=")); - - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java deleted file mode 100644 index d97e57c..0000000 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Properties; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterResult.Type; -import org.junit.*; -import org.junit.rules.TemporaryFolder; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class SparkSqlInterpreterTest { - - @ClassRule - public static TemporaryFolder tmpDir = new TemporaryFolder(); - - static SparkSqlInterpreter sql; - static SparkInterpreter repl; - static InterpreterContext context; - static InterpreterGroup intpGroup; - - @BeforeClass - public static void setUp() throws Exception { - Properties p = new Properties(); - p.putAll(SparkInterpreterTest.getSparkTestProperties(tmpDir)); - p.setProperty("zeppelin.spark.maxResult", "10"); - p.setProperty("zeppelin.spark.concurrentSQL", "false"); - p.setProperty("zeppelin.spark.sql.stacktrace", "false"); - - repl = new SparkInterpreter(p); - intpGroup = new InterpreterGroup(); - repl.setInterpreterGroup(intpGroup); - repl.open(); - SparkInterpreterTest.repl = repl; - SparkInterpreterTest.intpGroup = intpGroup; - - sql = new SparkSqlInterpreter(p); - - intpGroup = new InterpreterGroup(); - intpGroup.put("note", new LinkedList()); - intpGroup.get("note").add(repl); - intpGroup.get("note").add(sql); - sql.setInterpreterGroup(intpGroup); - sql.open(); - - context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(), - new HashMap(), new GUI(), new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("id"), - new LinkedList(), new InterpreterOutput(null)); - } - - @AfterClass - public static void tearDown() { - sql.close(); - repl.close(); - } - - boolean isDataFrameSupported() { - return SparkInterpreterTest.getSparkVersionNumber(repl) >= 13; - } - - @Test - public void test() throws InterpreterException { - repl.interpret("case class Test(name:String, age:Int)", context); - repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context); - if (isDataFrameSupported()) { - repl.interpret("test.toDF.registerTempTable(\"test\")", context); - } else { - repl.interpret("test.registerTempTable(\"test\")", context); - } - - InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(Type.TABLE, ret.message().get(0).getType()); - assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData()); - - ret = sql.interpret("select wrong syntax", context); - assertEquals(InterpreterResult.Code.ERROR, ret.code()); - assertTrue(ret.message().get(0).getData().length() > 0); - - assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code()); - } - - @Test - public void testStruct() throws InterpreterException { - repl.interpret("case class Person(name:String, age:Int)", context); - repl.interpret("case class People(group:String, person:Person)", context); - repl.interpret( - "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))", - context); - if (isDataFrameSupported()) { - repl.interpret("gr.toDF.registerTempTable(\"gr\")", context); - } else { - repl.interpret("gr.registerTempTable(\"gr\")", context); - } - - InterpreterResult ret = sql.interpret("select * from gr", context); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - } - - @Test - public void test_null_value_in_row() throws InterpreterException { - repl.interpret("import org.apache.spark.sql._", context); - if (isDataFrameSupported()) { - repl.interpret( - "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}", - context); - } - repl.interpret( - "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}", - context); - repl.interpret( - "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))", - context); - repl.interpret( - "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", - context); - repl.interpret( - "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", - context); - if (isDataFrameSupported()) { - repl.interpret("val people = z.sqlContext.createDataFrame(raw, schema)", - context); - repl.interpret("people.toDF.registerTempTable(\"people\")", context); - } else { - repl.interpret("val people = z.sqlContext.applySchema(raw, schema)", - context); - repl.interpret("people.registerTempTable(\"people\")", context); - } - - InterpreterResult ret = sql.interpret( - "select name, age from people where name = 'gates'", context); - System.err.println("RET=" + ret.message()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(Type.TABLE, ret.message().get(0).getType()); - assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData()); - } - - @Test - public void testMaxResults() throws InterpreterException { - repl.interpret("case class P(age:Int)", context); - repl.interpret( - "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))", - context); - if (isDataFrameSupported()) { - repl.interpret("gr.toDF.registerTempTable(\"gr\")", context); - } else { - repl.interpret("gr.registerTempTable(\"gr\")", context); - } - - InterpreterResult ret = sql.interpret("select * from gr", context); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertTrue(ret.message().get(1).getData().contains("alert-warning")); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java deleted file mode 100644 index 3dc8f4e..0000000 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.spark; - -import static org.junit.Assert.*; - -import org.junit.Test; - -public class SparkVersionTest { - - @Test - public void testUnknownSparkVersion() { - assertEquals(99999, SparkVersion.fromVersionString("DEV-10.10").toNumber()); - } - - @Test - public void testUnsupportedVersion() { - assertTrue(SparkVersion.fromVersionString("9.9.9").isUnsupportedVersion()); - assertFalse(SparkVersion.fromVersionString("1.5.9").isUnsupportedVersion()); - assertTrue(SparkVersion.fromVersionString("0.9.0").isUnsupportedVersion()); - assertTrue(SparkVersion.UNSUPPORTED_FUTURE_VERSION.isUnsupportedVersion()); - // should support spark2 version of HDP 2.5 - assertFalse(SparkVersion.fromVersionString("2.0.0.2.5.0.0-1245").isUnsupportedVersion()); - } - - @Test - public void testSparkVersion() { - // test equals - assertEquals(SparkVersion.SPARK_1_2_0, SparkVersion.fromVersionString("1.2.0")); - assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT")); - assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT")); - // test spark2 version of HDP 2.5 - assertEquals(SparkVersion.SPARK_2_0_0, SparkVersion.fromVersionString("2.0.0.2.5.0.0-1245")); - - // test newer than - assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0)); - assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_3_0)); - assertTrue(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_1_0)); - - assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_2_0)); - assertFalse(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_3_0)); - assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_1_0)); - - // test older than - assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_2_0)); - assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_1_0)); - assertTrue(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_3_0)); - - assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_2_0)); - assertFalse(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_1_0)); - assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_3_0)); - - // conversion - assertEquals(10200, SparkVersion.SPARK_1_2_0.toNumber()); - assertEquals("1.2.0", SparkVersion.SPARK_1_2_0.toString()); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java deleted file mode 100644 index b226a00..0000000 --- a/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark.dep; - -import static org.junit.Assert.assertEquals; - -import org.junit.Test; - -public class SparkDependencyResolverTest { - - @Test - public void testInferScalaVersion() { - String [] version = scala.util.Properties.versionNumberString().split("[.]"); - String scalaVersion = version[0] + "." + version[1]; - - assertEquals("groupId:artifactId:version", - SparkDependencyResolver.inferScalaVersion("groupId:artifactId:version")); - assertEquals("groupId:artifactId_" + scalaVersion + ":version", - SparkDependencyResolver.inferScalaVersion("groupId::artifactId:version")); - assertEquals("groupId:artifactId:version::test", - SparkDependencyResolver.inferScalaVersion("groupId:artifactId:version::test")); - assertEquals("*", - SparkDependencyResolver.inferScalaVersion("*")); - assertEquals("groupId:*", - SparkDependencyResolver.inferScalaVersion("groupId:*")); - assertEquals("groupId:artifactId*", - SparkDependencyResolver.inferScalaVersion("groupId:artifactId*")); - assertEquals("groupId:artifactId_" + scalaVersion, - SparkDependencyResolver.inferScalaVersion("groupId::artifactId")); - assertEquals("groupId:artifactId_" + scalaVersion + "*", - SparkDependencyResolver.inferScalaVersion("groupId::artifactId*")); - assertEquals("groupId:artifactId_" + scalaVersion + ":*", - SparkDependencyResolver.inferScalaVersion("groupId::artifactId:*")); - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/spark/src/test/resources/log4j.properties b/spark/src/test/resources/log4j.properties deleted file mode 100644 index 3ee61ab..0000000 --- a/spark/src/test/resources/log4j.properties +++ /dev/null @@ -1,49 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Direct log messages to stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n -#log4j.appender.stdout.layout.ConversionPattern= -#%5p [%t] (%F:%L) - %m%n -#%-4r [%t] %-5p %c %x - %m%n -# - -# Root logger option -log4j.rootLogger=INFO, stdout - -#mute some noisy guys -log4j.logger.org.apache.hadoop.mapred=WARN -log4j.logger.org.apache.hadoop.hive.ql=WARN -log4j.logger.org.apache.hadoop.hive.metastore=WARN -log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN -log4j.logger.org.apache.zeppelin.scheduler=WARN - -log4j.logger.org.quartz=WARN -log4j.logger.DataNucleus=WARN -log4j.logger.DataNucleus.MetaData=ERROR -log4j.logger.DataNucleus.Datastore=ERROR - -# Log all JDBC parameters -log4j.logger.org.hibernate.type=ALL - -log4j.logger.org.apache.zeppelin.interpreter=DEBUG -log4j.logger.org.apache.zeppelin.spark=DEBUG -log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG -log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala deleted file mode 100644 index 2638f17..0000000 --- a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.spark.utils - -import java.io.ByteArrayOutputStream - -import org.apache.spark.rdd.RDD -import org.apache.spark.{SparkContext, SparkConf} -import org.scalatest._ -import org.scalatest.{BeforeAndAfter} - -case class Person(login : String, name: String, age: Int) - -class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAfterEach with Matchers { - var sc: SparkContext = null - var testTuples:List[(String, String, Int)] = null - var testPersons:List[Person] = null - var testRDDTuples: RDD[(String,String,Int)] = null - var testRDDPersons: RDD[Person] = null - var stream: ByteArrayOutputStream = null - - before { - val sparkConf: SparkConf = new SparkConf(true) - .setAppName("test-DisplayFunctions") - .setMaster("local") - sc = new SparkContext(sparkConf) - testTuples = List(("jdoe", "John DOE", 32), ("hsue", "Helen SUE", 27), ("rsmith", "Richard SMITH", 45)) - testRDDTuples = sc.parallelize(testTuples) - testPersons = List(Person("jdoe", "John DOE", 32), Person("hsue", "Helen SUE", 27), Person("rsmith", "Richard SMITH", 45)) - testRDDPersons = sc.parallelize(testPersons) - } - - override def beforeEach() { - stream = new java.io.ByteArrayOutputStream() - super.beforeEach() // To be stackable, must call super.beforeEach - } - - - "DisplayFunctions" should "generate correct column headers for tuples" in { - implicit val sparkMaxResult = new SparkMaxResult(100) - Console.withOut(stream) { - new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age") - } - - stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + - "jdoe\tJohn DOE\t32\n" + - "hsue\tHelen SUE\t27\n" + - "rsmith\tRichard SMITH\t45\n") - } - - "DisplayFunctions" should "generate correct column headers for case class" in { - implicit val sparkMaxResult = new SparkMaxResult(100) - Console.withOut(stream) { - new DisplayRDDFunctions[Person](testRDDPersons).display("Login","Name","Age") - } - - stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + - "jdoe\tJohn DOE\t32\n" + - "hsue\tHelen SUE\t27\n" + - "rsmith\tRichard SMITH\t45\n") - } - - "DisplayFunctions" should "truncate exceeding column headers for tuples" in { - implicit val sparkMaxResult = new SparkMaxResult(100) - Console.withOut(stream) { - new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age","xxx","yyy") - } - - stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + - "jdoe\tJohn DOE\t32\n" + - "hsue\tHelen SUE\t27\n" + - "rsmith\tRichard SMITH\t45\n") - } - - "DisplayFunctions" should "pad missing column headers with ColumnXXX for tuples" in { - implicit val sparkMaxResult = new SparkMaxResult(100) - Console.withOut(stream) { - new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login") - } - - stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" + - "jdoe\tJohn DOE\t32\n" + - "hsue\tHelen SUE\t27\n" + - "rsmith\tRichard SMITH\t45\n") - } - - "DisplayUtils" should "restricts RDD to sparkMaxresult with implicit limit" in { - - implicit val sparkMaxResult = new SparkMaxResult(2) - - Console.withOut(stream) { - new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login") - } - - stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" + - "jdoe\tJohn DOE\t32\n" + - "hsue\tHelen SUE\t27\n") - } - - "DisplayUtils" should "restricts RDD to sparkMaxresult with explicit limit" in { - - implicit val sparkMaxResult = new SparkMaxResult(2) - - Console.withOut(stream) { - new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display(1,"Login") - } - - stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" + - "jdoe\tJohn DOE\t32\n") - } - - "DisplayFunctions" should "display traversable of tuples" in { - - Console.withOut(stream) { - new DisplayTraversableFunctions[(String,String,Int)](testTuples).display("Login","Name","Age") - } - - stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + - "jdoe\tJohn DOE\t32\n" + - "hsue\tHelen SUE\t27\n" + - "rsmith\tRichard SMITH\t45\n") - } - - "DisplayFunctions" should "display traversable of case class" in { - - Console.withOut(stream) { - new DisplayTraversableFunctions[Person](testPersons).display("Login","Name","Age") - } - - stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + - "jdoe\tJohn DOE\t32\n" + - "hsue\tHelen SUE\t27\n" + - "rsmith\tRichard SMITH\t45\n") - } - - "DisplayUtils" should "display HTML" in { - DisplayUtils.html() should be ("%html ") - DisplayUtils.html("test") should be ("%html test") - } - - "DisplayUtils" should "display img" in { - DisplayUtils.img("http://www.google.com") should be ("") - DisplayUtils.img64() should be ("%img ") - DisplayUtils.img64("abcde") should be ("%img abcde") - } - - override def afterEach() { - try super.afterEach() // To be stackable, must call super.afterEach - stream = null - } - - after { - sc.stop() - } - - -} - - http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/testing/install_external_dependencies.sh ---------------------------------------------------------------------- diff --git a/testing/install_external_dependencies.sh b/testing/install_external_dependencies.sh index e34296e..d6c0736 100755 --- a/testing/install_external_dependencies.sh +++ b/testing/install_external_dependencies.sh @@ -44,6 +44,6 @@ if [[ -n "$PYTHON" ]] ; then conda update -q conda conda info -a conda config --add channels conda-forge - conda install -q matplotlib pandasql ipython=5.4.1 jupyter_client ipykernel matplotlib bokeh=0.12.6 - pip install -q grpcio ggplot + conda install -q matplotlib pandasql ipython=5.4.1 jupyter_client ipykernel matplotlib bokeh=0.12.10 + pip install -q grpcio ggplot bkzep==0.4.0 fi http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/zeppelin-display/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-display/pom.xml b/zeppelin-display/pom.xml index c6edd95..79a08a6 100644 --- a/zeppelin-display/pom.xml +++ b/zeppelin-display/pom.xml @@ -27,7 +27,7 @@ org.apache.zeppelin - zeppelin-display_2.10 + zeppelin-display jar 0.9.0-SNAPSHOT Zeppelin: Display system apis @@ -45,18 +45,21 @@ org.scala-lang scala-library ${scala.version} + provided org.scala-lang scala-compiler ${scala.version} + provided org.scala-lang scalap ${scala.version} + provided @@ -85,13 +88,6 @@ - org.scala-lang - scala-library - ${scala.version} - provided - - - org.scalatest scalatest_${scala.binary.version} ${scalatest.version} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java ---------------------------------------------------------------------- diff --git a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java index f7bb776..1804fc4 100644 --- a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java +++ b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java @@ -184,7 +184,7 @@ public class SparkParagraphIT extends AbstractZeppelinIT { } } - @Test +// @Test public void testDep() throws Exception { try { // restart spark interpreter before running %dep http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java index 65bb06f..e38a29f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java @@ -237,6 +237,8 @@ public abstract class BaseZeppelinContext { if (isSupportedObject(o)) { interpreterContext.out.write(showData(o)); } else { + interpreterContext.out.write("ZeppelinContext doesn't support to show type: " + + o.getClass().getCanonicalName() + "\n"); interpreterContext.out.write(o.toString()); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index fca8449..37db1fc 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -96,10 +96,10 @@ import java.util.concurrent.ConcurrentMap; * Entry point for Interpreter process. * Accepting thrift connections from ZeppelinServer. */ -public class RemoteInterpreterServer - extends Thread +public class RemoteInterpreterServer extends Thread implements RemoteInterpreterService.Iface, AngularObjectRegistryListener { - Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class); + + private static Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class); InterpreterGroup interpreterGroup; AngularObjectRegistry angularObjectRegistry; @@ -255,6 +255,9 @@ public class RemoteInterpreterServer public static void main(String[] args) throws TTransportException, InterruptedException, IOException { + Class klass = RemoteInterpreterServer.class; + URL location = klass.getResource('/' + klass.getName().replace('.', '/') + ".class"); + logger.info("URL:" + location); String callbackHost = null; int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT; String portRange = ":"; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/zeppelin-server/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index a73cd96..970f302 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -261,6 +261,12 @@ scalatest_${scala.binary.version} ${scalatest.version} test + + + org.scala-lang.modules + scala-xml_${scala.binary.version} + + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 7d4c21c..5193420 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -265,21 +265,21 @@ public abstract class AbstractTestRestApi { // set spark master and other properties sparkProperties.put("master", new InterpreterProperty("master", "local[2]", InterpreterPropertyType.TEXTAREA.getValue())); + sparkProperties.put("spark.master", + new InterpreterProperty("spark.master", "local[2]", InterpreterPropertyType.TEXTAREA.getValue())); sparkProperties.put("spark.cores.max", new InterpreterProperty("spark.cores.max", "2", InterpreterPropertyType.TEXTAREA.getValue())); sparkProperties.put("zeppelin.spark.useHiveContext", new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue())); - // set spark home for pyspark - sparkProperties.put("spark.home", - new InterpreterProperty("spark.home", getSparkHome(), InterpreterPropertyType.TEXTAREA.getValue())); sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue())); - + sparkProperties.put("zeppelin.spark.test", new InterpreterProperty("zeppelin.spark.test", "true", InterpreterPropertyType.TEXTAREA.getValue())); sparkIntpSetting.setProperties(sparkProperties); pySpark = true; sparkR = true; ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId()); } else { String sparkHome = getSparkHome(); + LOG.info("SPARK HOME detected " + sparkHome); if (sparkHome != null) { if (System.getenv("SPARK_MASTER") != null) { sparkProperties.put("master", @@ -288,14 +288,14 @@ public abstract class AbstractTestRestApi { sparkProperties.put("master", new InterpreterProperty("master", "local[2]", InterpreterPropertyType.TEXTAREA.getValue())); } + sparkProperties.put("spark.master", + new InterpreterProperty("spark.master", "local[2]", InterpreterPropertyType.TEXTAREA.getValue())); sparkProperties.put("spark.cores.max", new InterpreterProperty("spark.cores.max", "2", InterpreterPropertyType.TEXTAREA.getValue())); - // set spark home for pyspark - sparkProperties.put("spark.home", - new InterpreterProperty("spark.home", sparkHome, InterpreterPropertyType.TEXTAREA.getValue())); sparkProperties.put("zeppelin.spark.useHiveContext", new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue())); sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue())); + sparkProperties.put("zeppelin.spark.test", new InterpreterProperty("zeppelin.spark.test", "true", InterpreterPropertyType.TEXTAREA.getValue())); pySpark = true; sparkR = true; @@ -333,7 +333,6 @@ public abstract class AbstractTestRestApi { return sparkHome; } sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName()))); - System.out.println("SPARK HOME detected " + sparkHome); return sparkHome; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 6156755..f3a7099 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -167,8 +167,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType()); assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData()); } - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @Test @@ -470,7 +470,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { p1.setText("%pyspark\n" + "from pyspark.sql import SQLContext\n" + "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" + - ".load('"+ tmpFile.getAbsolutePath() +"').count())"); + ".load('" + tmpFile.getAbsolutePath() +"').count())"); p1.setAuthenticationInfo(anonymous); note.run(p1.getId()); @@ -576,6 +576,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { @Test public void testConfInterpreter() throws IOException { + ZeppelinServer.notebook.getInterpreterSettingManager().close(); Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS); Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); Map config = p.getConfig(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/zeppelin-zengine/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index ac75360..fade4dd 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -603,7 +603,7 @@ org.apache.zeppelin - zeppelin-spark_2.10 + spark-interpreter ${project.version} test