zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject zeppelin git commit: [ZEPPELIN-1746] Flaky test: PySparkInterpreterTest
Date Tue, 14 Feb 2017 09:19:02 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master 9b55f5860 -> 632947856


[ZEPPELIN-1746] Flaky test: PySparkInterpreterTest

### What is this PR for?
The PySparkInterpreter test (and PySparkInterpreterMatplotlibTest, SparkInterpreterTest, SparkSqlInterpreterTest)
does not terminate SparkInterpreter after the tests end.

Originally, keeping SparkInterpreter alive was intended to avoid creating and destroying SparkInterpreter
multiple times across the various tests, because doing so takes some time.

However, that approach is misbehaving and causing problems such as:

1. Starting another SparkInterpreter before the previous SparkInterpreter terminates.
2. Not terminating the PySparkInterpreter process explicitly causes the py4j client to create
a large number of connection attempts, which consumes all of the system's available open ports and makes
the next SparkInterpreter fail to start.

This PR terminates SparkInterpreter after each test class finishes, to prevent the above behavior.

### What type of PR is it?
Hot Fix

### Todos
* [x] - Terminate SparkInterpreter after each test class

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1746

### How should this be tested?
CI becomes green

### Questions:
* Do the license files need an update? No
* Is there breaking changes for older versions? no
* Does this need documentation? No

Author: Lee moon soo <moon@apache.org>

Closes #2017 from Leemoonsoo/ZEPPELIN-1746 and squashes the following commits:

e6ba4e1 [Lee moon soo] Try start and terminate spark context after each test class


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/63294785
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/63294785
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/63294785

Branch: refs/heads/master
Commit: 632947856f9195b9e74984facd584f0a41646c7c
Parents: 9b55f58
Author: Lee moon soo <moon@apache.org>
Authored: Tue Feb 14 02:48:59 2017 +0900
Committer: Lee moon soo <moon@apache.org>
Committed: Tue Feb 14 18:18:56 2017 +0900

----------------------------------------------------------------------
 .../spark/PySparkInterpreterMatplotlibTest.java | 48 +++++++-------
 .../zeppelin/spark/PySparkInterpreterTest.java  | 51 ++++++++-------
 .../zeppelin/spark/SparkInterpreterTest.java    | 55 ++++++++--------
 .../zeppelin/spark/SparkSqlInterpreterTest.java | 67 ++++++++++----------
 4 files changed, 110 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/63294785/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
index 17b2128..7fe8b5e 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -39,14 +39,14 @@ import static org.junit.Assert.*;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class PySparkInterpreterMatplotlibTest {
 
-  @Rule
-  public TemporaryFolder tmpDir = new TemporaryFolder();
-
-  public static SparkInterpreter sparkInterpreter;
-  public static PySparkInterpreter pyspark;
-  public static InterpreterGroup intpGroup;
-  public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
-  private InterpreterContext context;
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+  static SparkInterpreter sparkInterpreter;
+  static PySparkInterpreter pyspark;
+  static InterpreterGroup intpGroup;
+  static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
+  static InterpreterContext context;
   
   public static class AltPySparkInterpreter extends PySparkInterpreter {
     /**
@@ -80,7 +80,7 @@ public class PySparkInterpreterMatplotlibTest {
     }
   }
 
-  private Properties getPySparkTestProperties() throws IOException {
+  private static Properties getPySparkTestProperties() throws IOException {
     Properties p = new Properties();
     p.setProperty("master", "local[*]");
     p.setProperty("spark.app.name", "Zeppelin Test");
@@ -106,24 +106,20 @@ public class PySparkInterpreterMatplotlibTest {
     return version;
   }
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUp() throws Exception {
     intpGroup = new InterpreterGroup();
     intpGroup.put("note", new LinkedList<Interpreter>());
 
-    if (sparkInterpreter == null) {
-      sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
-      intpGroup.get("note").add(sparkInterpreter);
-      sparkInterpreter.setInterpreterGroup(intpGroup);
-      sparkInterpreter.open();
-    }
+    sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
+    intpGroup.get("note").add(sparkInterpreter);
+    sparkInterpreter.setInterpreterGroup(intpGroup);
+    sparkInterpreter.open();
 
-    if (pyspark == null) {
-      pyspark = new AltPySparkInterpreter(getPySparkTestProperties());
-      intpGroup.get("note").add(pyspark);
-      pyspark.setInterpreterGroup(intpGroup);
-      pyspark.open();
-    }
+    pyspark = new AltPySparkInterpreter(getPySparkTestProperties());
+    intpGroup.get("note").add(pyspark);
+    pyspark.setInterpreterGroup(intpGroup);
+    pyspark.open();
 
     context = new InterpreterContext("note", "id", null, "title", "text",
       new AuthenticationInfo(),
@@ -135,6 +131,12 @@ public class PySparkInterpreterMatplotlibTest {
       new InterpreterOutput(null));
   }
 
+  @AfterClass
+  public static void tearDown() {
+    pyspark.close();
+    sparkInterpreter.close();
+  }
+
   @Test
   public void dependenciesAreInstalled() {
     // matplotlib

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/63294785/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 35b876d..60e40d7 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -23,10 +23,7 @@ import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.Before;
-import org.junit.FixMethodOrder;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runners.MethodSorters;
 import org.slf4j.Logger;
@@ -42,16 +39,16 @@ import static org.junit.Assert.*;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class PySparkInterpreterTest {
 
-  @Rule
-  public TemporaryFolder tmpDir = new TemporaryFolder();
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
 
-  public static SparkInterpreter sparkInterpreter;
-  public static PySparkInterpreter pySparkInterpreter;
-  public static InterpreterGroup intpGroup;
-  public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
-  private InterpreterContext context;
+  static SparkInterpreter sparkInterpreter;
+  static PySparkInterpreter pySparkInterpreter;
+  static InterpreterGroup intpGroup;
+  static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
+  static InterpreterContext context;
 
-  private Properties getPySparkTestProperties() throws IOException {
+  private static Properties getPySparkTestProperties() throws IOException {
     Properties p = new Properties();
     p.setProperty("master", "local[*]");
     p.setProperty("spark.app.name", "Zeppelin Test");
@@ -77,24 +74,20 @@ public class PySparkInterpreterTest {
     return version;
   }
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUp() throws Exception {
     intpGroup = new InterpreterGroup();
     intpGroup.put("note", new LinkedList<Interpreter>());
 
-    if (sparkInterpreter == null) {
-      sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
-      intpGroup.get("note").add(sparkInterpreter);
-      sparkInterpreter.setInterpreterGroup(intpGroup);
-      sparkInterpreter.open();
-    }
+    sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
+    intpGroup.get("note").add(sparkInterpreter);
+    sparkInterpreter.setInterpreterGroup(intpGroup);
+    sparkInterpreter.open();
 
-    if (pySparkInterpreter == null) {
-      pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties());
-      intpGroup.get("note").add(pySparkInterpreter);
-      pySparkInterpreter.setInterpreterGroup(intpGroup);
-      pySparkInterpreter.open();
-    }
+    pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties());
+    intpGroup.get("note").add(pySparkInterpreter);
+    pySparkInterpreter.setInterpreterGroup(intpGroup);
+    pySparkInterpreter.open();
 
     context = new InterpreterContext("note", "id", null, "title", "text",
       new AuthenticationInfo(),
@@ -106,6 +99,12 @@ public class PySparkInterpreterTest {
       new InterpreterOutput(null));
   }
 
+  @AfterClass
+  public static void tearDown() {
+    pySparkInterpreter.close();
+    sparkInterpreter.close();
+  }
+
   @Test
   public void testBasicIntp() {
     if (getSparkVersionNumber() > 11) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/63294785/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 8c78b66..ba5feea 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -37,10 +37,7 @@ import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.junit.Before;
-import org.junit.FixMethodOrder;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runners.MethodSorters;
 import org.slf4j.Logger;
@@ -49,21 +46,21 @@ import org.slf4j.LoggerFactory;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class SparkInterpreterTest {
 
-  @Rule
-  public TemporaryFolder tmpDir = new TemporaryFolder();
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
 
-  public static SparkInterpreter repl;
-  public static InterpreterGroup intpGroup;
-  private InterpreterContext context;
-  public static Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterTest.class);
-  private  static Map<String, Map<String, String>> paraIdToInfosMap =
+  static SparkInterpreter repl;
+  static InterpreterGroup intpGroup;
+  static InterpreterContext context;
+  static Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterTest.class);
+  static Map<String, Map<String, String>> paraIdToInfosMap =
       new HashMap<>();
 
   /**
    * Get spark version number as a numerical value.
    * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
    */
-  public static int getSparkVersionNumber() {
+  public static int getSparkVersionNumber(SparkInterpreter repl) {
     if (repl == null) {
       return 0;
     }
@@ -85,16 +82,14 @@ public class SparkInterpreterTest {
     return p;
   }
 
-  @Before
-  public void setUp() throws Exception {
-    if (repl == null) {
-      intpGroup = new InterpreterGroup();
-      intpGroup.put("note", new LinkedList<Interpreter>());
-      repl = new SparkInterpreter(getSparkTestProperties(tmpDir));
-      repl.setInterpreterGroup(intpGroup);
-      intpGroup.get("note").add(repl);
-      repl.open();
-    }
+  @BeforeClass
+  public static void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    repl = new SparkInterpreter(getSparkTestProperties(tmpDir));
+    repl.setInterpreterGroup(intpGroup);
+    intpGroup.get("note").add(repl);
+    repl.open();
 
     final RemoteEventClientWrapper remoteEventClientWrapper = new RemoteEventClientWrapper()
{
 
@@ -132,6 +127,11 @@ public class SparkInterpreterTest {
     repl.interpret("sc", context);
   }
 
+  @AfterClass
+  public static void tearDown() {
+    repl.close();
+  }
+
   @Test
   public void testBasicIntp() {
     assertEquals(InterpreterResult.Code.SUCCESS,
@@ -180,7 +180,7 @@ public class SparkInterpreterTest {
 
   @Test
   public void testCreateDataFrame() {
-    if (getSparkVersionNumber() >= 13) {
+    if (getSparkVersionNumber(repl) >= 13) {
       repl.interpret("case class Person(name:String, age:Int)\n", context);
       repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\",
51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
       repl.interpret("people.toDF.count", context);
@@ -196,7 +196,7 @@ public class SparkInterpreterTest {
     String code = "";
     repl.interpret("case class Person(name:String, age:Int)\n", context);
     repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\",
51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
-    if (getSparkVersionNumber() < 13) {
+    if (getSparkVersionNumber(repl) < 13) {
       repl.interpret("people.registerTempTable(\"people\")", context);
       code = "z.show(sqlc.sql(\"select * from people\"))";
     } else {
@@ -212,7 +212,8 @@ public class SparkInterpreterTest {
     assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
 
 
-    if (getSparkVersionNumber() <= 11) { // spark 1.2 or later does not allow create multiple
SparkContext in the same jvm by default.
+    if (getSparkVersionNumber(repl) <= 11) { // spark 1.2 or later does not allow create
multiple
+      // SparkContext in the same jvm by default.
       // create new interpreter
       SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir));
       repl2.setInterpreterGroup(intpGroup);
@@ -265,7 +266,7 @@ public class SparkInterpreterTest {
 
   @Test
   public void testEnableImplicitImport() throws IOException {
-    if (getSparkVersionNumber() >= 13) {
+    if (getSparkVersionNumber(repl) >= 13) {
       // Set option of importing implicits to "true", and initialize new Spark repl
       Properties p = getSparkTestProperties(tmpDir);
       p.setProperty("zeppelin.spark.importImplicit", "true");
@@ -282,7 +283,7 @@ public class SparkInterpreterTest {
 
   @Test
   public void testDisableImplicitImport() throws IOException {
-    if (getSparkVersionNumber() >= 13) {
+    if (getSparkVersionNumber(repl) >= 13) {
       // Set option of importing implicits to "false", and initialize new Spark repl
       // this test should return error status when creating DataFrame from sequence
       Properties p = getSparkTestProperties(tmpDir);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/63294785/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 89cd712..5984645 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -27,9 +27,7 @@ import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.assertEquals;
@@ -37,45 +35,38 @@ import static org.junit.Assert.assertTrue;
 
 public class SparkSqlInterpreterTest {
 
-  @Rule
-  public TemporaryFolder tmpDir = new TemporaryFolder();
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
 
-  private SparkSqlInterpreter sql;
-  private SparkInterpreter repl;
-  private InterpreterContext context;
-  private InterpreterGroup intpGroup;
+  static SparkSqlInterpreter sql;
+  static SparkInterpreter repl;
+  static InterpreterContext context;
+  static InterpreterGroup intpGroup;
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUp() throws Exception {
     Properties p = new Properties();
     p.putAll(SparkInterpreterTest.getSparkTestProperties(tmpDir));
     p.setProperty("zeppelin.spark.maxResult", "1000");
     p.setProperty("zeppelin.spark.concurrentSQL", "false");
     p.setProperty("zeppelin.spark.sql.stacktrace", "false");
 
-    if (repl == null) {
-
-      if (SparkInterpreterTest.repl == null) {
-        repl = new SparkInterpreter(p);
-        intpGroup = new InterpreterGroup();
-        repl.setInterpreterGroup(intpGroup);
-        repl.open();
-        SparkInterpreterTest.repl = repl;
-        SparkInterpreterTest.intpGroup = intpGroup;
-      } else {
-        repl = SparkInterpreterTest.repl;
-        intpGroup = SparkInterpreterTest.intpGroup;
-      }
-
-      sql = new SparkSqlInterpreter(p);
-
-      intpGroup = new InterpreterGroup();
-      intpGroup.put("note", new LinkedList<Interpreter>());
-      intpGroup.get("note").add(repl);
-      intpGroup.get("note").add(sql);
-      sql.setInterpreterGroup(intpGroup);
-      sql.open();
-    }
+    repl = new SparkInterpreter(p);
+    intpGroup = new InterpreterGroup();
+    repl.setInterpreterGroup(intpGroup);
+    repl.open();
+    SparkInterpreterTest.repl = repl;
+    SparkInterpreterTest.intpGroup = intpGroup;
+
+    sql = new SparkSqlInterpreter(p);
+
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(repl);
+    intpGroup.get("note").add(sql);
+    sql.setInterpreterGroup(intpGroup);
+    sql.open();
+
     context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
         new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
@@ -83,8 +74,14 @@ public class SparkSqlInterpreterTest {
         new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
   }
 
+  @AfterClass
+  public static void tearDown() {
+    sql.close();
+    repl.close();
+  }
+
   boolean isDataFrameSupported() {
-    return SparkInterpreterTest.getSparkVersionNumber() >= 13;
+    return SparkInterpreterTest.getSparkVersionNumber(repl) >= 13;
   }
 
   @Test


Mime
View raw message