zeppelin-commits mailing list archives

From zjf...@apache.org
Subject [zeppelin] branch master updated: ZEPPELIN-4176. Remove old spark interpreter
Date Wed, 12 Jun 2019 02:34:20 GMT
This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new f5ee329  ZEPPELIN-4176. Remove old spark interpreter
f5ee329 is described below

commit f5ee329439212363493d6f5d4f92d6c9cb46dc58
Author: Jeff Zhang <zjffdu@apache.org>
AuthorDate: Tue Jun 4 22:21:44 2019 +0800

    ZEPPELIN-4176. Remove old spark interpreter
    
    ### What is this PR for?
    
    This PR removes the old spark interpreter. The old spark interpreter has several issues, and a new spark interpreter implementation was introduced in 0.8; this ticket removes the old one in 0.9. Here are the issues with the old spark interpreter:
    
    - It didn't use the native scala shell API.
    - Dependency management is not applied in yarn-cluster mode.
    - It cannot support scala 2.12 (a consequence of the first point).
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-4176
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjffdu@apache.org>
    
    Closes #3375 from zjffdu/ZEPPELIN-4176 and squashes the following commits:
    
    4efa61fa7 [Jeff Zhang] address comment
    0578f4aaa [Jeff Zhang] ZEPPELIN-4176. Remove old spark interpreter
---
 docs/interpreter/spark.md                          |   72 +-
 .../zeppelin/spark/AbstractSparkInterpreter.java   |   64 -
 .../org/apache/zeppelin/spark/DepInterpreter.java  |  354 ------
 .../apache/zeppelin/spark/NewSparkInterpreter.java |  259 ----
 .../apache/zeppelin/spark/OldSparkInterpreter.java | 1281 --------------------
 .../apache/zeppelin/spark/PySparkInterpreter.java  |   23 -
 .../apache/zeppelin/spark/SparkInterpreter.java    |  212 +++-
 .../zeppelin/spark/dep/SparkDependencyContext.java |  181 ---
 .../spark/dep/SparkDependencyResolver.java         |  351 ------
 .../src/main/resources/interpreter-setting.json    |   34 -
 .../apache/zeppelin/spark/DepInterpreterTest.java  |   84 --
 .../zeppelin/spark/IPySparkInterpreterTest.java    |    1 -
 .../zeppelin/spark/OldSparkInterpreterTest.java    |  316 -----
 .../zeppelin/spark/OldSparkSqlInterpreterTest.java |  192 ---
 .../zeppelin/spark/PySparkInterpreterTest.java     |    3 -
 ...erpreterTest.java => SparkInterpreterTest.java} |   56 +-
 .../zeppelin/spark/SparkRInterpreterTest.java      |    1 -
 ...reterTest.java => SparkSqlInterpreterTest.java} |    3 +-
 .../integration/ZeppelinSparkClusterTest.java      |   43 +-
 19 files changed, 159 insertions(+), 3371 deletions(-)

diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index c3c9fd7..04e0675 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -214,7 +214,7 @@ You can either specify them in `zeppelin-env.sh`, or in interpreter setting page
 in interpreter setting page means you can use multiple versions of `spark` & `hadoop` in one zeppelin instance.
 
 ### 4. New Version of SparkInterpreter
-There's one new version of SparkInterpreter with better spark support and code completion starting from Zeppelin 0.8.0. We enable it by default, but user can still use the old version of SparkInterpreter by setting `zeppelin.spark.useNew` as `false` in its interpreter setting.
+Starting from 0.9, the old spark interpreter implementation has been removed entirely, and the new spark interpreter is now the official (and only) spark interpreter.
 
 ## SparkContext, SQLContext, SparkSession, ZeppelinContext
 SparkContext, SQLContext and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext` and `z`, respectively, in Scala, Python and R environments.
@@ -232,14 +232,9 @@ There're 2 kinds of properties that would be passed to SparkConf
  * Non-standard spark property (prefix with `zeppelin.spark.`).  e.g. `zeppelin.spark.property_1`, `property_1` will be passed to `SparkConf`
 
 ## Dependency Management
-There are two ways to load external libraries in Spark interpreter. First is using interpreter setting menu and second is loading Spark properties.
 
-### 1. Setting Dependencies via Interpreter Setting
-Please see [Dependency Management](../usage/interpreter/dependency_management.html) for the details.
-
-### 2. Loading Spark Properties
-Once `SPARK_HOME` is set in `conf/zeppelin-env.sh`, Zeppelin uses `spark-submit` as spark interpreter runner. `spark-submit` supports two ways to load configurations.
-The first is command line options such as --master and Zeppelin can pass these options to `spark-submit` by exporting `SPARK_SUBMIT_OPTIONS` in `conf/zeppelin-env.sh`. Second is reading configuration options from `SPARK_HOME/conf/spark-defaults.conf`. Spark properties that user can set to distribute libraries are:
+For the spark interpreter, you should not use Zeppelin's [Dependency Management](../usage/interpreter/dependency_management.html) for managing
+third-party dependencies (`%spark.dep` is also no longer the recommended approach starting from Zeppelin 0.8). Instead, you should set the spark properties (`spark.jars`, `spark.files`, `spark.jars.packages`) in one of the following 2 ways.
 
 <table class="table-configuration">
   <tr>
@@ -264,15 +259,16 @@ The first is command line options such as --master and Zeppelin can pass these o
   </tr>
 </table>
 
-Here are few examples:
+### 1. Set spark properties on the Zeppelin side.
 
-* `SPARK_SUBMIT_OPTIONS` in `conf/zeppelin-env.sh`
+On the Zeppelin side, you can set them either in the spark interpreter setting page or via the [Generic ConfInterpreter](../usage/interpreter/overview.html).
+Setting them in `SPARK_SUBMIT_OPTIONS` is not recommended, because that variable is shared by all spark interpreters, so you cannot set different dependencies for different users.
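+
+For example, a [Generic ConfInterpreter](../usage/interpreter/overview.html) paragraph at the top of a note could look roughly like the sketch below (the package coordinate and file paths are placeholders, adjust them to your environment; a conf paragraph has to run before the spark interpreter is launched for the properties to take effect):
+
+```
+%spark.conf
+
+spark.jars.packages   com.databricks:spark-csv_2.10:1.2.0
+spark.jars            /path/mylib1.jar,/path/mylib2.jar
+spark.files           /path/mylib1.py,/path/mylib2.zip
+```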
 
-  ```bash
-    export SPARK_SUBMIT_OPTIONS="--packages com.databricks:spark-csv_2.10:1.2.0 --jars /path/mylib1.jar,/path/mylib2.jar --files /path/mylib1.py,/path/mylib2.zip,/path/mylib3.egg"
-  ```
+### 2. Set spark properties on the Spark side.
+
+On the Spark side, you can set them in `SPARK_HOME/conf/spark-defaults.conf`.
 
-* `SPARK_HOME/conf/spark-defaults.conf`
+e.g.
 
   ```
     spark.jars        /path/mylib1.jar,/path/mylib2.jar
@@ -280,54 +276,6 @@ Here are few examples:
     spark.files       /path/mylib1.py,/path/mylib2.egg,/path/mylib3.zip
   ```
 
-### 3. Dynamic Dependency Loading via %spark.dep interpreter
-> Note: `%spark.dep` interpreter loads libraries to `%spark` and `%spark.pyspark` but not to  `%spark.sql` interpreter. So we recommend you to use the first option instead.
-
-When your code requires external library, instead of doing download/copy/restart Zeppelin, you can easily do following jobs using `%spark.dep` interpreter.
-
- * Load libraries recursively from maven repository
- * Load libraries from local filesystem
- * Add additional maven repository
- * Automatically add libraries to SparkCluster (You can turn off)
-
-Dep interpreter leverages Scala environment. So you can write any Scala code here.
-Note that `%spark.dep` interpreter should be used before `%spark`, `%spark.pyspark`, `%spark.sql`.
-
-Here's usages.
-
-```scala
-%spark.dep
-z.reset() // clean up previously added artifact and repository
-
-// add maven repository
-z.addRepo("RepoName").url("RepoURL")
-
-// add maven snapshot repository
-z.addRepo("RepoName").url("RepoURL").snapshot()
-
-// add credentials for private maven repository
-z.addRepo("RepoName").url("RepoURL").username("username").password("password")
-
-// add artifact from filesystem
-z.load("/path/to.jar")
-
-// add artifact from maven repository, with no dependency
-z.load("groupId:artifactId:version").excludeAll()
-
-// add artifact recursively
-z.load("groupId:artifactId:version")
-
-// add artifact recursively except comma separated GroupID:ArtifactId list
-z.load("groupId:artifactId:version").exclude("groupId:artifactId,groupId:artifactId, ...")
-
-// exclude with pattern
-z.load("groupId:artifactId:version").exclude(*)
-z.load("groupId:artifactId:version").exclude("groupId:artifactId:*")
-z.load("groupId:artifactId:version").exclude("groupId:*")
-
-// local() skips adding artifact to spark clusters (skipping sc.addJar())
-z.load("groupId:artifactId:version").local()
-```
 
 ## ZeppelinContext
 Zeppelin automatically injects `ZeppelinContext` as variable `z` in your Scala/Python environment. `ZeppelinContext` provides some additional functions and utilities.
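
As a quick illustration of the `z` variable mentioned above (an editorial sketch, not part of this patch; the JSON path is a placeholder), a Scala paragraph might use `ZeppelinContext` like this:

```scala
%spark
// read some data with the regular Spark API
val df = spark.read.json("/path/people.json")

// render the DataFrame as a Zeppelin table
z.show(df)

// put the object into the resource pool so other paragraphs/interpreters can fetch it with z.get("people")
z.put("people", df)
```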
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java
deleted file mode 100644
index 91fa7de..0000000
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.AbstractInterpreter;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-
-import java.util.Properties;
-
-/**
- * Abstract class for SparkInterpreter. For the purpose of co-exist of NewSparkInterpreter
- * and OldSparkInterpreter
- */
-public abstract class AbstractSparkInterpreter extends AbstractInterpreter {
-
-  private SparkInterpreter parentSparkInterpreter;
-
-  public AbstractSparkInterpreter(Properties properties) {
-    super(properties);
-  }
-
-  public abstract SparkContext getSparkContext();
-
-  public abstract SQLContext getSQLContext();
-
-  public abstract Object getSparkSession();
-
-  public abstract boolean isSparkContextInitialized();
-
-  public abstract SparkVersion getSparkVersion();
-
-  public abstract JavaSparkContext getJavaSparkContext();
-
-  public abstract String getSparkUIUrl();
-
-  public abstract boolean isUnsupportedSparkVersion();
-
-  public void setParentSparkInterpreter(SparkInterpreter parentSparkInterpreter) {
-    this.parentSparkInterpreter = parentSparkInterpreter;
-  }
-
-  public SparkInterpreter getParentSparkInterpreter() {
-    return parentSparkInterpreter;
-  }
-}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
deleted file mode 100644
index d76b09e..0000000
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.repl.SparkILoop;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.resolution.ArtifactResolutionException;
-import org.sonatype.aether.resolution.DependencyResolutionException;
-
-import scala.Console;
-import scala.None;
-import scala.Some;
-import scala.collection.convert.WrapAsJava$;
-import scala.collection.JavaConversions;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.Completion.Candidates;
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
-
-
-/**
- * DepInterpreter downloads dependencies and pass them when SparkInterpreter initialized.
- * It extends SparkInterpreter but does not create sparkcontext
- *
- */
-public class DepInterpreter extends Interpreter {
-  /**
-   * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
-   * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11)
-   */
-  private Object intp;
-  private ByteArrayOutputStream out;
-  private SparkDependencyContext depc;
-  /**
-   * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
-   */
-  private Object completer;
-  private SparkILoop interpreter;
-  static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class);
-
-  public DepInterpreter(Properties property) {
-    super(property);
-  }
-
-  public SparkDependencyContext getDependencyContext() {
-    return depc;
-  }
-
-  public static String getSystemDefault(
-      String envName,
-      String propertyName,
-      String defaultValue) {
-
-    if (envName != null && !envName.isEmpty()) {
-      String envValue = System.getenv().get(envName);
-      if (envValue != null) {
-        return envValue;
-      }
-    }
-
-    if (propertyName != null && !propertyName.isEmpty()) {
-      String propValue = System.getProperty(propertyName);
-      if (propValue != null) {
-        return propValue;
-      }
-    }
-    return defaultValue;
-  }
-
-  @Override
-  public void close() {
-    if (intp != null) {
-      Utils.invokeMethod(intp, "close");
-    }
-  }
-
-  @Override
-  public void open() {
-    out = new ByteArrayOutputStream();
-    createIMain();
-  }
-
-
-  private void createIMain() {
-    Settings settings = new Settings();
-    URL[] urls = getClassloaderUrls();
-
-    // set classpath for scala compiler
-    PathSetting pathSettings = settings.classpath();
-    String classpath = "";
-    List<File> paths = currentClassPath();
-    for (File f : paths) {
-      if (classpath.length() > 0) {
-        classpath += File.pathSeparator;
-      }
-      classpath += f.getAbsolutePath();
-    }
-
-    if (urls != null) {
-      for (URL u : urls) {
-        if (classpath.length() > 0) {
-          classpath += File.pathSeparator;
-        }
-        classpath += u.getFile();
-      }
-    }
-
-    pathSettings.v_$eq(classpath);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
-    // set classloader for scala compiler
-    settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
-        .getContextClassLoader()));
-
-    BooleanSetting b = (BooleanSetting) settings.usejavacp();
-    b.v_$eq(true);
-    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
-    interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out));
-    interpreter.settings_$eq(settings);
-
-    interpreter.createInterpreter();
-
-
-    intp = Utils.invokeMethod(interpreter, "intp");
-
-    if (Utils.isScala2_10()) {
-      Utils.invokeMethod(intp, "setContextClassLoader");
-      Utils.invokeMethod(intp, "initializeSynchronous");
-    }
-
-    depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"),
-        getProperty("zeppelin.dep.additionalRemoteRepository"));
-    if (Utils.isScala2_10()) {
-      completer = Utils.instantiateClass(
-          "org.apache.spark.repl.SparkJLineCompletion",
-          new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")},
-          new Object[]{intp});
-    }
-    interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
-    Map<String, Object> binder;
-    if (Utils.isScala2_10()) {
-      binder = (Map<String, Object>) getValue("_binder");
-    } else {
-      binder = (Map<String, Object>) getLastObject();
-    }
-    binder.put("depc", depc);
-
-    interpret("@transient val z = "
-        + "_binder.get(\"depc\")"
-        + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]");
-
-  }
-
-  private Results.Result interpret(String line) {
-    return (Results.Result) Utils.invokeMethod(
-        intp,
-        "interpret",
-        new Class[] {String.class},
-        new Object[] {line});
-  }
-
-  public Object getValue(String name) {
-    Object ret = Utils.invokeMethod(
-        intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
-    if (ret instanceof None) {
-      return null;
-    } else if (ret instanceof Some) {
-      return ((Some) ret).get();
-    } else {
-      return ret;
-    }
-  }
-
-  public Object getLastObject() {
-    IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
-    Object obj = r.lineRep().call("$result",
-        JavaConversions.asScalaBuffer(new LinkedList<>()));
-    return obj;
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context)
-      throws InterpreterException {
-    PrintStream printStream = new PrintStream(out);
-    Console.setOut(printStream);
-    out.reset();
-
-    SparkInterpreter sparkInterpreter =
-        getInterpreterInTheSameSessionByClassName(SparkInterpreter.class, false);
-
-    if (sparkInterpreter != null && sparkInterpreter.getDelegation().isSparkContextInitialized()) {
-      return new InterpreterResult(Code.ERROR,
-          "Must be used before SparkInterpreter (%spark) initialized\n" +
-              "Hint: put this paragraph before any Spark code and " +
-              "restart Zeppelin/Interpreter" );
-    }
-
-    scala.tools.nsc.interpreter.Results.Result ret = interpret(st);
-    Code code = getResultCode(ret);
-
-    try {
-      depc.fetch();
-    } catch (MalformedURLException | DependencyResolutionException
-        | ArtifactResolutionException e) {
-      LOGGER.error("Exception in DepInterpreter while interpret ", e);
-      return new InterpreterResult(Code.ERROR, e.toString());
-    }
-
-    if (code == Code.INCOMPLETE) {
-      return new InterpreterResult(code, "Incomplete expression");
-    } else if (code == Code.ERROR) {
-      return new InterpreterResult(code, out.toString());
-    } else {
-      return new InterpreterResult(code, out.toString());
-    }
-  }
-
-  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
-    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
-      return Code.SUCCESS;
-    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
-      return Code.INCOMPLETE;
-    } else {
-      return Code.ERROR;
-    }
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-                                                InterpreterContext interpreterContext) {
-    if (Utils.isScala2_10()) {
-      ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
-      Candidates ret = c.complete(buf, cursor);
-
-      List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
-      List<InterpreterCompletion> completions = new LinkedList<>();
-
-      for (String candidate : candidates) {
-        completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
-      }
-
-      return completions;
-    } else {
-      return new LinkedList<>();
-    }
-  }
-
-  private List<File> currentClassPath() {
-    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
-    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
-    if (cps != null) {
-      for (String cp : cps) {
-        paths.add(new File(cp));
-      }
-    }
-    return paths;
-  }
-
-  private List<File> classPath(ClassLoader cl) {
-    List<File> paths = new LinkedList<>();
-    if (cl == null) {
-      return paths;
-    }
-
-    if (cl instanceof URLClassLoader) {
-      URLClassLoader ucl = (URLClassLoader) cl;
-      URL[] urls = ucl.getURLs();
-      if (urls != null) {
-        for (URL url : urls) {
-          paths.add(new File(url.getFile()));
-        }
-      }
-    }
-    return paths;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    try {
-      SparkInterpreter sparkInterpreter =
-          getInterpreterInTheSameSessionByClassName(SparkInterpreter.class, false);
-      if (sparkInterpreter != null) {
-        return sparkInterpreter.getScheduler();
-      } else {
-        return null;
-      }
-    } catch (InterpreterException e) {
-      return null;
-    }
-  }
-}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
deleted file mode 100644
index 4a39cfe..0000000
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.BaseZeppelinContext;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * SparkInterpreter of Java implementation. It is just wrapper of Spark211Interpreter
- * and Spark210Interpreter.
- */
-public class NewSparkInterpreter extends AbstractSparkInterpreter {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(NewSparkInterpreter.class);
-
-  private BaseSparkScalaInterpreter innerInterpreter;
-  private Map<String, String> innerInterpreterClassMap = new HashMap<>();
-  private SparkContext sc;
-  private JavaSparkContext jsc;
-  private SQLContext sqlContext;
-  private Object sparkSession;
-
-  private SparkZeppelinContext z;
-  private SparkVersion sparkVersion;
-  private boolean enableSupportedVersionCheck;
-  private String sparkUrl;
-  private SparkShims sparkShims;
-
-  private static InterpreterHookRegistry hooks;
-
-
-  public NewSparkInterpreter(Properties properties) {
-    super(properties);
-    this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
-        properties.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
-    innerInterpreterClassMap.put("2.10", "org.apache.zeppelin.spark.SparkScala210Interpreter");
-    innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter");
-  }
-
-  @Override
-  public void open() throws InterpreterException {
-    try {
-      String scalaVersion = extractScalaVersion();
-      LOGGER.info("Using Scala Version: " + scalaVersion);
-      SparkConf conf = new SparkConf();
-      for (Map.Entry<Object, Object> entry : getProperties().entrySet()) {
-        if (!StringUtils.isBlank(entry.getValue().toString())) {
-          conf.set(entry.getKey().toString(), entry.getValue().toString());
-        }
-        // zeppelin.spark.useHiveContext & zeppelin.spark.concurrentSQL are legacy zeppelin
-        // properties, convert them to spark properties here.
-        if (entry.getKey().toString().equals("zeppelin.spark.useHiveContext")) {
-          conf.set("spark.useHiveContext", entry.getValue().toString());
-        }
-        if (entry.getKey().toString().equals("zeppelin.spark.concurrentSQL")
-            && entry.getValue().toString().equals("true")) {
-          conf.set("spark.scheduler.mode", "FAIR");
-        }
-      }
-      // use local mode for embedded spark mode when spark.master is not found
-      conf.setIfMissing("spark.master", "local");
-
-      String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
-      Class clazz = Class.forName(innerIntpClassName);
-      this.innerInterpreter = (BaseSparkScalaInterpreter)
-          clazz.getConstructor(SparkConf.class, List.class, Boolean.class)
-              .newInstance(conf, getDependencyFiles(),
-                  Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput", "true")));
-      this.innerInterpreter.open();
-
-      sc = this.innerInterpreter.sc();
-      jsc = JavaSparkContext.fromSparkContext(sc);
-      sparkVersion = SparkVersion.fromVersionString(sc.version());
-      if (enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion()) {
-        throw new Exception("This is not officially supported spark version: " + sparkVersion
-            + "\nYou can set zeppelin.spark.enableSupportedVersionCheck to false if you really" +
-            " want to try this version of spark.");
-      }
-      sqlContext = this.innerInterpreter.sqlContext();
-      sparkSession = this.innerInterpreter.sparkSession();
-      hooks = getInterpreterGroup().getInterpreterHookRegistry();
-      sparkUrl = this.innerInterpreter.sparkUrl();
-      String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
-      if (!StringUtils.isBlank(sparkUrlProp)) {
-        sparkUrl = sparkUrlProp;
-      }
-      sparkShims = SparkShims.getInstance(sc.version(), getProperties());
-      sparkShims.setupSparkListener(sc.master(), sparkUrl, InterpreterContext.get());
-
-      z = new SparkZeppelinContext(sc, sparkShims, hooks,
-          Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
-      this.innerInterpreter.bind("z", z.getClass().getCanonicalName(), z,
-          Lists.newArrayList("@transient"));
-    } catch (Exception e) {
-      LOGGER.error("Fail to open SparkInterpreter", ExceptionUtils.getStackTrace(e));
-      throw new InterpreterException("Fail to open SparkInterpreter", e);
-    }
-  }
-
-  @Override
-  public void close() {
-    LOGGER.info("Close SparkInterpreter");
-    if (innerInterpreter != null) {
-      innerInterpreter.close();
-      innerInterpreter = null;
-    }
-  }
-
-  @Override
-  public InterpreterResult internalInterpret(String st, InterpreterContext context) {
-    sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
-    // set spark.scheduler.pool to null to clear the pool assosiated with this paragraph
-    // sc.setLocalProperty("spark.scheduler.pool", null) will clean the pool
-    sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
-
-    return innerInterpreter.interpret(st, context);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    sc.cancelJobGroup(Utils.buildJobGroupId(context));
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf,
-                                                int cursor,
-                                                InterpreterContext interpreterContext) {
-    LOGGER.debug("buf: " + buf + ", cursor:" + cursor);
-    return innerInterpreter.completion(buf, cursor, interpreterContext);
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context);
-  }
-
-  public SparkZeppelinContext getZeppelinContext() {
-    return this.z;
-  }
-
-  public SparkContext getSparkContext() {
-    return this.sc;
-  }
-
-  @Override
-  public SQLContext getSQLContext() {
-    return sqlContext;
-  }
-
-  public JavaSparkContext getJavaSparkContext() {
-    return this.jsc;
-  }
-
-  public Object getSparkSession() {
-    return sparkSession;
-  }
-
-  public SparkVersion getSparkVersion() {
-    return sparkVersion;
-  }
-
-  private String extractScalaVersion() throws IOException, InterruptedException {
-    String scalaVersionString = scala.util.Properties.versionString();
-    if (scalaVersionString.contains("version 2.10")) {
-      return "2.10";
-    } else {
-      return "2.11";
-    }
-  }
-
-  public boolean isSparkContextInitialized() {
-    return this.sc != null;
-  }
-
-  private List<String> getDependencyFiles() throws InterpreterException {
-    List<String> depFiles = new ArrayList<>();
-    // add jar from DepInterpreter
-    DepInterpreter depInterpreter = getParentSparkInterpreter().
-        getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
-    if (depInterpreter != null) {
-      SparkDependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFilesDist();
-        if (files != null) {
-          for (File f : files) {
-            depFiles.add(f.getAbsolutePath());
-          }
-        }
-      }
-    }
-
-    // add jar from local repo
-    String localRepo = getProperty("zeppelin.interpreter.localRepo");
-    if (localRepo != null) {
-      File localRepoDir = new File(localRepo);
-      if (localRepoDir.exists()) {
-        File[] files = localRepoDir.listFiles();
-        if (files != null) {
-          for (File f : files) {
-            depFiles.add(f.getAbsolutePath());
-          }
-        }
-      }
-    }
-    return depFiles;
-  }
-
-  @Override
-  public String getSparkUIUrl() {
-    return sparkUrl;
-  }
-
-  @Override
-  public boolean isUnsupportedSparkVersion() {
-    return enableSupportedVersionCheck  && sparkVersion.isUnsupportedVersion();
-  }
-}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
deleted file mode 100644
index 8eb3959..0000000
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
+++ /dev/null
@@ -1,1281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.spark.JobProgressUtil;
-import org.apache.spark.SecurityManager;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.repl.SparkILoop;
-import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.ui.SparkUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.WellKnownResourceName;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Console;
-import scala.Enumeration.Value;
-import scala.None;
-import scala.Option;
-import scala.Some;
-import scala.collection.JavaConversions;
-import scala.collection.convert.WrapAsJava$;
-import scala.reflect.io.AbstractFile;
-import scala.tools.nsc.Global;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.Completion.Candidates;
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Spark interpreter for Zeppelin.
- *
- */
-public class OldSparkInterpreter extends AbstractSparkInterpreter {
-  public static Logger logger = LoggerFactory.getLogger(OldSparkInterpreter.class);
-
-  private SparkZeppelinContext z;
-  private SparkILoop interpreter;
-  /**
-   * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
-   * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11)
-   */
-  private Object intp;
-  private SparkConf conf;
-  private static SparkContext sc;
-  private static SQLContext sqlc;
-  private static InterpreterHookRegistry hooks;
-  private static SparkEnv env;
-  private static Object sparkSession;    // spark 2.x
-  private static SparkListener sparkListener;
-  private static AbstractFile classOutputDir;
-  private static Integer sharedInterpreterLock = new Integer(0);
-  private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
-
-  private InterpreterOutputStream out;
-  private SparkDependencyResolver dep;
-  private static String sparkUrl;
-
-  /**
-   * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
-   */
-  private Object completer = null;
-
-  private Map<String, Object> binder;
-  private SparkVersion sparkVersion;
-  private static File outputDir;          // class outputdir for scala 2.11
-  private Object classServer;      // classserver for scala 2.11
-  private JavaSparkContext jsc;
-  private boolean enableSupportedVersionCheck;
-
-  private SparkShims sparkShims;
-
-  public OldSparkInterpreter(Properties property) {
-    super(property);
-    out = new InterpreterOutputStream(logger);
-  }
-
-  public OldSparkInterpreter(Properties property, SparkContext sc) {
-    this(property);
-    this.sc = sc;
-    env = SparkEnv.get();
-  }
-
-  public SparkContext getSparkContext() {
-    synchronized (sharedInterpreterLock) {
-      if (sc == null) {
-        sc = createSparkContext();
-        env = SparkEnv.get();
-      }
-      return sc;
-    }
-  }
-
-  public JavaSparkContext getJavaSparkContext() {
-    synchronized (sharedInterpreterLock) {
-      if (jsc == null) {
-        jsc = JavaSparkContext.fromSparkContext(sc);
-      }
-      return jsc;
-    }
-  }
-
-  public boolean isSparkContextInitialized() {
-    synchronized (sharedInterpreterLock) {
-      return sc != null;
-    }
-  }
-
-  private boolean useHiveContext() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
-  }
-
-  /**
-   * See org.apache.spark.sql.SparkSession.hiveClassesArePresent
-   * @return
-   */
-  private boolean hiveClassesArePresent() {
-    try {
-      this.getClass().forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable");
-      this.getClass().forName("org.apache.hadoop.hive.conf.HiveConf");
-      return true;
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return false;
-    }
-  }
-
-  private boolean importImplicit() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.importImplicit"));
-  }
-
-  public Object getSparkSession() {
-    synchronized (sharedInterpreterLock) {
-      if (sparkSession == null) {
-        createSparkSession();
-      }
-      return sparkSession;
-    }
-  }
-
-  public SQLContext getSQLContext() {
-    synchronized (sharedInterpreterLock) {
-      if (Utils.isSpark2()) {
-        return getSQLContext_2();
-      } else {
-        return getSQLContext_1();
-      }
-    }
-  }
-
-  /**
-   * Get SQLContext for spark 2.x
-   */
-  private SQLContext getSQLContext_2() {
-    if (sqlc == null) {
-      sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext");
-    }
-    return sqlc;
-  }
-
-  public SQLContext getSQLContext_1() {
-    if (sqlc == null) {
-      if (useHiveContext()) {
-        String name = "org.apache.spark.sql.hive.HiveContext";
-        Constructor<?> hc;
-        try {
-          hc = getClass().getClassLoader().loadClass(name)
-              .getConstructor(SparkContext.class);
-          sqlc = (SQLContext) hc.newInstance(getSparkContext());
-        } catch (NoSuchMethodException | SecurityException
-            | ClassNotFoundException | InstantiationException
-            | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException e) {
-          logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
-          // when hive dependency is not loaded, it'll fail.
-          // in this case SQLContext can be used.
-          sqlc = new SQLContext(getSparkContext());
-        }
-      } else {
-        sqlc = new SQLContext(getSparkContext());
-      }
-    }
-    return sqlc;
-  }
-
-
-  public SparkDependencyResolver getDependencyResolver() {
-    if (dep == null) {
-      dep = new SparkDependencyResolver(
-          (Global) Utils.invokeMethod(intp, "global"),
-          (ClassLoader) Utils.invokeMethod(Utils.invokeMethod(intp, "classLoader"), "getParent"),
-          sc,
-          getProperty("zeppelin.dep.localrepo"),
-          getProperty("zeppelin.dep.additionalRemoteRepository"));
-    }
-    return dep;
-  }
-
-  public boolean isYarnMode() {
-    String master = getProperty("master");
-    if (master == null) {
-      master = getProperty("spark.master", "local[*]");
-    }
-    return master.startsWith("yarn");
-  }
-
-  /**
-   * Spark 2.x
-   * Create SparkSession
-   */
-  public Object createSparkSession() {
-    // use local mode for embedded spark mode when spark.master is not found
-    conf.setIfMissing("spark.master", "local");
-    logger.info("------ Create new SparkSession {} -------", conf.get("spark.master"));
-    String execUri = System.getenv("SPARK_EXECUTOR_URI");
-    if (outputDir != null) {
-      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
-    }
-
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri);
-    }
-    conf.set("spark.scheduler.mode", "FAIR");
-
-    Properties intpProperty = getProperties();
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      String val = toString(intpProperty.get(key));
-      if (!val.trim().isEmpty()) {
-        if (key.startsWith("spark.")) {
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
-          conf.set(key, val);
-        }
-        if (key.startsWith("zeppelin.spark.")) {
-          String sparkPropertyKey = key.substring("zeppelin.spark.".length());
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
-          conf.set(sparkPropertyKey, val);
-        }
-      }
-    }
-
-    Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
-    Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
-    Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
-
-    if (useHiveContext()) {
-      if (hiveClassesArePresent()) {
-        Utils.invokeMethod(builder, "enableHiveSupport");
-        sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-        logger.info("Created Spark session with Hive support");
-      } else {
-        Utils.invokeMethod(builder, "config",
-            new Class[]{ String.class, String.class},
-            new Object[]{ "spark.sql.catalogImplementation", "in-memory"});
-        sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-        logger.info("Created Spark session with Hive support use in-memory catalogImplementation");
-      }
-    } else {
-      sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-      logger.info("Created Spark session");
-    }
-
-    return sparkSession;
-  }
-
-  public SparkContext createSparkContext() {
-    if (Utils.isSpark2()) {
-      return createSparkContext_2();
-    } else {
-      return createSparkContext_1();
-    }
-  }
-
-  /**
-   * Create SparkContext for spark 2.x
-   * @return
-   */
-  private SparkContext createSparkContext_2() {
-    return (SparkContext) Utils.invokeMethod(sparkSession, "sparkContext");
-  }
-
-  public SparkContext createSparkContext_1() {
-    // use local mode for embedded spark mode when spark.master is not found
-    if (!conf.contains("spark.master")) {
-      conf.setMaster("local");
-    }
-    logger.info("------ Create new SparkContext {} -------", conf.get("spark.master"));
-
-    String execUri = System.getenv("SPARK_EXECUTOR_URI");
-    String[] jars = null;
-
-    if (Utils.isScala2_10()) {
-      jars = (String[]) Utils.invokeStaticMethod(SparkILoop.class, "getAddedJars");
-    } else {
-      jars = (String[]) Utils.invokeStaticMethod(
-          Utils.findClass("org.apache.spark.repl.Main"), "getAddedJars");
-    }
-
-    String classServerUri = null;
-    String replClassOutputDirectory = null;
-
-    try { // in case of spark 1.1x, spark 1.2x
-      Method classServer = intp.getClass().getMethod("classServer");
-      Object httpServer = classServer.invoke(intp);
-      classServerUri = (String) Utils.invokeMethod(httpServer, "uri");
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      // continue
-    }
-
-    if (classServerUri == null) {
-      try { // for spark 1.3x
-        Method classServer = intp.getClass().getMethod("classServerUri");
-        classServerUri = (String) classServer.invoke(intp);
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-          | IllegalArgumentException | InvocationTargetException e) {
-        // continue instead of: throw new InterpreterException(e);
-        // Newer Spark versions (like the patched CDH5.7.0 one) don't contain this method
-        logger.warn(String.format("Spark method classServerUri not available due to: [%s]",
-            e.getMessage()));
-      }
-    }
-
-    if (classServerUri == null) {
-      try { // for RcpEnv
-        Method getClassOutputDirectory = intp.getClass().getMethod("getClassOutputDirectory");
-        File classOutputDirectory = (File) getClassOutputDirectory.invoke(intp);
-        replClassOutputDirectory = classOutputDirectory.getAbsolutePath();
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-          | IllegalArgumentException | InvocationTargetException e) {
-        // continue
-      }
-    }
-
-    if (Utils.isScala2_11()) {
-      classServer = createHttpServer(outputDir);
-      Utils.invokeMethod(classServer, "start");
-      classServerUri = (String) Utils.invokeMethod(classServer, "uri");
-    }
-
-    if (classServerUri != null) {
-      conf.set("spark.repl.class.uri", classServerUri);
-    }
-
-    if (replClassOutputDirectory != null) {
-      conf.set("spark.repl.class.outputDir", replClassOutputDirectory);
-    }
-
-    if (jars.length > 0) {
-      conf.setJars(jars);
-    }
-
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri);
-    }
-    conf.set("spark.scheduler.mode", "FAIR");
-
-    Properties intpProperty = getProperties();
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      String val = toString(intpProperty.get(key));
-      if (!val.trim().isEmpty()) {
-        if (key.startsWith("spark.")) {
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
-          conf.set(key, val);
-        }
-
-        if (key.startsWith("zeppelin.spark.")) {
-          String sparkPropertyKey = key.substring("zeppelin.spark.".length());
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
-          conf.set(sparkPropertyKey, val);
-        }
-      }
-    }
-    SparkContext sparkContext = new SparkContext(conf);
-    return sparkContext;
-  }
-
-  static final String toString(Object o) {
-    return (o instanceof String) ? (String) o : "";
-  }
-
-  public static boolean useSparkSubmit() {
-    return null != System.getenv("SPARK_SUBMIT");
-  }
-
-  public boolean printREPLOutput() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput"));
-  }
-
-  @Override
-  public void open() throws InterpreterException {
-    this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
-        getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
-
-    // set properties and do login before creating any spark stuff for secured cluster
-    if (isYarnMode()) {
-      System.setProperty("SPARK_YARN_MODE", "true");
-    }
-    if (getProperties().containsKey("spark.yarn.keytab") &&
-        getProperties().containsKey("spark.yarn.principal")) {
-      try {
-        String keytab = getProperties().getProperty("spark.yarn.keytab");
-        String principal = getProperties().getProperty("spark.yarn.principal");
-        UserGroupInformation.loginUserFromKeytab(principal, keytab);
-      } catch (IOException e) {
-        throw new RuntimeException("Can not pass kerberos authentication", e);
-      }
-    }
-
-    conf = new SparkConf();
-    URL[] urls = getClassloaderUrls();
-
-    // Very nice discussion about how scala compiler handle classpath
-    // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
-
-    /*
-     * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
-     * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
-     * nsc.Settings.classpath.
-     *
-     * >> val settings = new Settings() >> settings.usejavacp.value = true >>
-     * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
-     * val in = new Interpreter(settings) { >> override protected def parentClassLoader =
-     * getClass.getClassLoader >> } >> in.setContextClassLoader()
-     */
-    Settings settings = new Settings();
-
-    // process args
-    String args = getProperty("args");
-    if (args == null) {
-      args = "";
-    }
-
-    String[] argsArray = args.split(" ");
-    LinkedList<String> argList = new LinkedList<>();
-    for (String arg : argsArray) {
-      argList.add(arg);
-    }
-
-    DepInterpreter depInterpreter = getParentSparkInterpreter().
-        getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
-    String depInterpreterClasspath = "";
-    if (depInterpreter != null) {
-      SparkDependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFiles();
-        if (files != null) {
-          for (File f : files) {
-            if (depInterpreterClasspath.length() > 0) {
-              depInterpreterClasspath += File.pathSeparator;
-            }
-            depInterpreterClasspath += f.getAbsolutePath();
-          }
-        }
-      }
-    }
-
-
-    if (Utils.isScala2_10()) {
-      scala.collection.immutable.List<String> list =
-          JavaConversions.asScalaBuffer(argList).toList();
-
-      Object sparkCommandLine = Utils.instantiateClass(
-          "org.apache.spark.repl.SparkCommandLine",
-          new Class[]{ scala.collection.immutable.List.class },
-          new Object[]{ list });
-
-      settings = (Settings) Utils.invokeMethod(sparkCommandLine, "settings");
-    } else {
-      String sparkReplClassDir = getProperty("spark.repl.classdir");
-      if (sparkReplClassDir == null) {
-        sparkReplClassDir = System.getProperty("spark.repl.classdir");
-      }
-      if (sparkReplClassDir == null) {
-        sparkReplClassDir = System.getProperty("java.io.tmpdir");
-      }
-
-      synchronized (sharedInterpreterLock) {
-        if (outputDir == null) {
-          outputDir = createTempDir(sparkReplClassDir);
-        }
-      }
-      argList.add("-Yrepl-class-based");
-      argList.add("-Yrepl-outdir");
-      argList.add(outputDir.getAbsolutePath());
-
-      String classpath = "";
-      if (conf.contains("spark.jars")) {
-        classpath = StringUtils.join(conf.get("spark.jars").split(","), File.separator);
-      }
-
-      if (!depInterpreterClasspath.isEmpty()) {
-        if (!classpath.isEmpty()) {
-          classpath += File.separator;
-        }
-        classpath += depInterpreterClasspath;
-      }
-
-      if (!classpath.isEmpty()) {
-        argList.add("-classpath");
-        argList.add(classpath);
-      }
-
-      scala.collection.immutable.List<String> list =
-          JavaConversions.asScalaBuffer(argList).toList();
-
-      settings.processArguments(list, true);
-    }
-
-    // set classpath for scala compiler
-    PathSetting pathSettings = settings.classpath();
-    String classpath = "";
-
-    List<File> paths = currentClassPath();
-    for (File f : paths) {
-      if (classpath.length() > 0) {
-        classpath += File.pathSeparator;
-      }
-      classpath += f.getAbsolutePath();
-    }
-
-    if (urls != null) {
-      for (URL u : urls) {
-        if (classpath.length() > 0) {
-          classpath += File.pathSeparator;
-        }
-        classpath += u.getFile();
-      }
-    }
-
-    // add dependency from DepInterpreter
-    if (classpath.length() > 0) {
-      classpath += File.pathSeparator;
-    }
-    classpath += depInterpreterClasspath;
-
-    // add dependency from local repo
-    String localRepo = getProperty("zeppelin.interpreter.localRepo");
-    if (localRepo != null) {
-      File localRepoDir = new File(localRepo);
-      if (localRepoDir.exists()) {
-        File[] files = localRepoDir.listFiles();
-        if (files != null) {
-          for (File f : files) {
-            if (classpath.length() > 0) {
-              classpath += File.pathSeparator;
-            }
-            classpath += f.getAbsolutePath();
-          }
-        }
-      }
-    }
-
-    pathSettings.v_$eq(classpath);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
-    // set classloader for scala compiler
-    settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
-        .getContextClassLoader()));
-    BooleanSetting b = (BooleanSetting) settings.usejavacp();
-    b.v_$eq(true);
-    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
-    /* Required for scoped mode.
-     * In scoped mode multiple scala compiler (repl) generates class in the same directory.
-     * Class names is not randomly generated and look like '$line12.$read$$iw$$iw'
-     * Therefore it's possible to generated class conflict(overwrite) with other repl generated
-     * class.
-     *
-     * To prevent generated class name conflict,
-     * change prefix of generated class name from each scala compiler (repl) instance.
-     *
-     * In Spark 2.x, REPL generated wrapper class name should compatible with the pattern
-     * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
-     *
-     * As hashCode() can return a negative integer value and the minus character '-' is invalid
-     * in a package name we change it to a numeric value '0' which still conforms to the regexp.
-     *
-     */
-    System.setProperty("scala.repl.name.line", ("$line" + this.hashCode()).replace('-', '0'));
-
-    // To prevent 'File name too long' error on some file system.
-    MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
-    numClassFileSetting.v_$eq(128);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq(
-        numClassFileSetting);
-
-    synchronized (sharedInterpreterLock) {
-      /* create scala repl */
-      if (printREPLOutput()) {
-        this.interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out));
-      } else {
-        this.interpreter = new SparkILoop((java.io.BufferedReader) null,
-            new PrintWriter(Console.out(), false));
-      }
-
-      interpreter.settings_$eq(settings);
-
-      interpreter.createInterpreter();
-
-      intp = Utils.invokeMethod(interpreter, "intp");
-      Utils.invokeMethod(intp, "setContextClassLoader");
-      Utils.invokeMethod(intp, "initializeSynchronous");
-
-      if (Utils.isScala2_10()) {
-        if (classOutputDir == null) {
-          classOutputDir = settings.outputDirs().getSingleOutput().get();
-        } else {
-          // change SparkIMain class output dir
-          settings.outputDirs().setSingleOutput(classOutputDir);
-          ClassLoader cl = (ClassLoader) Utils.invokeMethod(intp, "classLoader");
-          try {
-            Field rootField = cl.getClass().getSuperclass().getDeclaredField("root");
-            rootField.setAccessible(true);
-            rootField.set(cl, classOutputDir);
-          } catch (NoSuchFieldException | IllegalAccessException e) {
-            logger.error(e.getMessage(), e);
-          }
-        }
-      }
-
-      if (Utils.findClass("org.apache.spark.repl.SparkJLineCompletion", true) != null) {
-        completer = Utils.instantiateClass(
-            "org.apache.spark.repl.SparkJLineCompletion",
-            new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")},
-            new Object[]{intp});
-      } else if (Utils.findClass(
-          "scala.tools.nsc.interpreter.PresentationCompilerCompleter", true) != null) {
-        completer = Utils.instantiateClass(
-            "scala.tools.nsc.interpreter.PresentationCompilerCompleter",
-            new Class[]{ IMain.class },
-            new Object[]{ intp });
-      } else if (Utils.findClass(
-          "scala.tools.nsc.interpreter.JLineCompletion", true) != null) {
-        completer = Utils.instantiateClass(
-            "scala.tools.nsc.interpreter.JLineCompletion",
-            new Class[]{ IMain.class },
-            new Object[]{ intp });
-      }
-
-      if (Utils.isSpark2()) {
-        sparkSession = getSparkSession();
-      }
-      sc = getSparkContext();
-      if (sc.getPoolForName("fair").isEmpty()) {
-        Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
-        int minimumShare = 0;
-        int weight = 1;
-        Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
-        sc.taskScheduler().rootPool().addSchedulable(pool);
-      }
-
-      sparkVersion = SparkVersion.fromVersionString(sc.version());
-      sqlc = getSQLContext();
-      dep = getDependencyResolver();
-      hooks = getInterpreterGroup().getInterpreterHookRegistry();
-      sparkUrl = getSparkUIUrl();
-      sparkShims = SparkShims.getInstance(sc.version(), getProperties());
-      sparkShims.setupSparkListener(sc.master(), sparkUrl, InterpreterContext.get());
-      numReferenceOfSparkContext.incrementAndGet();
-
-      z = new SparkZeppelinContext(sc, sparkShims, hooks,
-          Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
-
-      interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
-      Map<String, Object> binder;
-      if (Utils.isScala2_10()) {
-        binder = (Map<String, Object>) getValue("_binder");
-      } else {
-        binder = (Map<String, Object>) getLastObject();
-      }
-      binder.put("sc", sc);
-      binder.put("sqlc", sqlc);
-      binder.put("z", z);
-
-      if (Utils.isSpark2()) {
-        binder.put("spark", sparkSession);
-      }
-
-      interpret("@transient val z = "
-          + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.SparkZeppelinContext]");
-      interpret("@transient val sc = "
-          + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
-      interpret("@transient val sqlc = "
-          + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-      interpret("@transient val sqlContext = "
-          + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-
-      if (Utils.isSpark2()) {
-        interpret("@transient val spark = "
-            + "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]");
-      }
-
-      interpret("import org.apache.spark.SparkContext._");
-
-      if (importImplicit()) {
-        if (Utils.isSpark2()) {
-          interpret("import spark.implicits._");
-          interpret("import spark.sql");
-          interpret("import org.apache.spark.sql.functions._");
-        } else {
-          interpret("import sqlContext.implicits._");
-          interpret("import sqlContext.sql");
-          interpret("import org.apache.spark.sql.functions._");
-        }
-      }
-    }
-
-    /* Temporarily disabling DisplayUtils. See https://issues.apache.org/jira/browse/ZEPPELIN-127
-     *
-    // Utility functions for display
-    intp.interpret("import org.apache.zeppelin.spark.utils.DisplayUtils._");
-
-    // Scala implicit value for spark.maxResult
-    intp.interpret("import org.apache.zeppelin.spark.utils.SparkMaxResult");
-    intp.interpret("implicit val sparkMaxResult = new SparkMaxResult(" +
-            Integer.parseInt(getProperty("zeppelin.spark.maxResult")) + ")");
-     */
-
-    if (Utils.isScala2_10()) {
-      try {
-        Method loadFiles = this.interpreter.getClass().getMethod(
-            "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
-        loadFiles.invoke(this.interpreter, settings);
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-          | IllegalArgumentException | InvocationTargetException e) {
-        throw new InterpreterException(e);
-      }
-    }
-
-    // add jar from DepInterpreter
-    if (depInterpreter != null) {
-      SparkDependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFilesDist();
-        if (files != null) {
-          for (File f : files) {
-            if (f.getName().toLowerCase().endsWith(".jar")) {
-              sc.addJar(f.getAbsolutePath());
-              logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
-            } else {
-              sc.addFile(f.getAbsolutePath());
-              logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
-            }
-          }
-        }
-      }
-    }
-
-    // add jar from local repo
-    if (localRepo != null) {
-      File localRepoDir = new File(localRepo);
-      if (localRepoDir.exists()) {
-        File[] files = localRepoDir.listFiles();
-        if (files != null) {
-          for (File f : files) {
-            if (f.getName().toLowerCase().endsWith(".jar")) {
-              sc.addJar(f.getAbsolutePath());
-              logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
-            } else {
-              sc.addFile(f.getAbsolutePath());
-              logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
-            }
-          }
-        }
-      }
-    }
-
-  }
-
-  public String getSparkUIUrl() {
-    if (sparkUrl != null) {
-      return sparkUrl;
-    }
-
-    String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
-    if (!StringUtils.isBlank(sparkUrlProp)) {
-      return sparkUrlProp;
-    }
-
-    if (sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0)) {
-      Option<String> uiWebUrlOption = (Option<String>) Utils.invokeMethod(sc, "uiWebUrl");
-      if (uiWebUrlOption.isDefined()) {
-        return uiWebUrlOption.get();
-      }
-    } else {
-      Option<SparkUI> sparkUIOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
-      if (sparkUIOption.isDefined()) {
-        return (String) Utils.invokeMethod(sparkUIOption.get(), "appUIAddress");
-      }
-    }
-    return null;
-  }
-
-  private Results.Result interpret(String line) {
-    out.ignoreLeadingNewLinesFromScalaReporter();
-    return (Results.Result) Utils.invokeMethod(
-        intp,
-        "interpret",
-        new Class[] {String.class},
-        new Object[] {line});
-  }
-
-  private List<File> currentClassPath() {
-    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
-    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
-    if (cps != null) {
-      for (String cp : cps) {
-        paths.add(new File(cp));
-      }
-    }
-    return paths;
-  }
-
-  private List<File> classPath(ClassLoader cl) {
-    List<File> paths = new LinkedList<>();
-    if (cl == null) {
-      return paths;
-    }
-
-    if (cl instanceof URLClassLoader) {
-      URLClassLoader ucl = (URLClassLoader) cl;
-      URL[] urls = ucl.getURLs();
-      if (urls != null) {
-        for (URL url : urls) {
-          paths.add(new File(url.getFile()));
-        }
-      }
-    }
-    return paths;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-                                                InterpreterContext interpreterContext) {
-    if (completer == null) {
-      logger.warn("Can't find completer");
-      return new LinkedList<>();
-    }
-
-    if (buf.length() < cursor) {
-      cursor = buf.length();
-    }
-
-    ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
-
-    if (Utils.isScala2_10() || !Utils.isCompilerAboveScala2_11_7()) {
-      String singleToken = getCompletionTargetString(buf, cursor);
-      Candidates ret = c.complete(singleToken, singleToken.length());
-
-      List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
-      List<InterpreterCompletion> completions = new LinkedList<>();
-
-      for (String candidate : candidates) {
-        completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
-      }
-
-      return completions;
-    } else {
-      Candidates ret = c.complete(buf, cursor);
-
-      List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
-      List<InterpreterCompletion> completions = new LinkedList<>();
-
-      for (String candidate : candidates) {
-        completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
-      }
-
-      return completions;
-    }
-  }
-
-  private String getCompletionTargetString(String text, int cursor) {
-    String[] completionSeqCharaters = {" ", "\n", "\t"};
-    int completionEndPosition = cursor;
-    int completionStartPosition = cursor;
-    int indexOfReverseSeqPostion = cursor;
-
-    String resultCompletionText = "";
-    String completionScriptText = "";
-    try {
-      completionScriptText = text.substring(0, cursor);
-    }
-    catch (Exception e) {
-      logger.error(e.toString());
-      return null;
-    }
-    completionEndPosition = completionScriptText.length();
-
-    String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString();
-
-    for (String seqCharacter : completionSeqCharaters) {
-      indexOfReverseSeqPostion = tempReverseCompletionText.indexOf(seqCharacter);
-
-      if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) {
-        completionStartPosition = indexOfReverseSeqPostion;
-      }
-
-    }
-
-    if (completionStartPosition == completionEndPosition) {
-      completionStartPosition = 0;
-    }
-    else
-    {
-      completionStartPosition = completionEndPosition - completionStartPosition;
-    }
-    resultCompletionText = completionScriptText.substring(
-        completionStartPosition , completionEndPosition);
-
-    return resultCompletionText;
-  }
-
-  /*
-   * This method doesn't work in Scala 2.11:
-   * somehow intp.valueOfTerm always returns scala.None with the -Yrepl-class-based option.
-   */
-  public Object getValue(String name) {
-    Object ret = Utils.invokeMethod(
-        intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
-
-    if (ret instanceof None || ret instanceof scala.None$) {
-      return null;
-    } else if (ret instanceof Some) {
-      return ((Some) ret).get();
-    } else {
-      return ret;
-    }
-  }
-
-  public Object getLastObject() {
-    IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
-    if (r == null || r.lineRep() == null) {
-      return null;
-    }
-    Object obj = r.lineRep().call("$result",
-        JavaConversions.asScalaBuffer(new LinkedList<>()));
-    return obj;
-  }
-
-  public boolean isUnsupportedSparkVersion() {
-    return enableSupportedVersionCheck  && sparkVersion.isUnsupportedVersion();
-  }
-
-  /**
-   * Interpret a single line.
-   */
-  @Override
-  public InterpreterResult internalInterpret(String line, InterpreterContext context) {
-    if (isUnsupportedSparkVersion()) {
-      return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
-          + " is not supported");
-    }
-    if (line == null || line.trim().length() == 0) {
-      return new InterpreterResult(Code.SUCCESS);
-    }
-    return interpret(line.split("\n"), context);
-  }
-
-  public InterpreterResult interpret(String[] lines, InterpreterContext context) {
-    synchronized (this) {
-      z.setGui(context.getGui());
-      z.setNoteGui(context.getNoteGui());
-      sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
-      InterpreterResult r = interpretInput(lines, context);
-      sc.clearJobGroup();
-      return r;
-    }
-  }
-
-  public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
-    SparkEnv.set(env);
-
-    String[] linesToRun = new String[lines.length];
-    for (int i = 0; i < lines.length; i++) {
-      linesToRun[i] = lines[i];
-    }
-
-    Console.setOut(context.out);
-    out.setInterpreterOutput(context.out);
-    context.out.clear();
-    Code r = null;
-    String incomplete = "";
-    boolean inComment = false;
-
-    for (int l = 0; l < linesToRun.length; l++) {
-      String s = linesToRun[l];
-      // if the next line starts with "." (but not ".." or "./"), treat it as a continuation (method invocation)
-      if (l + 1 < linesToRun.length) {
-        String nextLine = linesToRun[l + 1].trim();
-        boolean continuation = false;
-        if (nextLine.isEmpty()
-            || nextLine.startsWith("//")         // skip empty line or comment
-            || nextLine.startsWith("}")
-            || nextLine.startsWith("object")) {  // include "} object" for Scala companion object
-          continuation = true;
-        } else if (!inComment && nextLine.startsWith("/*")) {
-          inComment = true;
-          continuation = true;
-        } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
-          inComment = false;
-          continuation = true;
-        } else if (nextLine.length() > 1
-            && nextLine.charAt(0) == '.'
-            && nextLine.charAt(1) != '.'     // ".."
-            && nextLine.charAt(1) != '/') {  // "./"
-          continuation = true;
-        } else if (inComment) {
-          continuation = true;
-        }
-        if (continuation) {
-          incomplete += s + "\n";
-          continue;
-        }
-      }
-
-      scala.tools.nsc.interpreter.Results.Result res = null;
-      try {
-        res = interpret(incomplete + s);
-      } catch (Exception e) {
-        sc.clearJobGroup();
-        out.setInterpreterOutput(null);
-        logger.info("Interpreter exception", e);
-        return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
-      }
-
-      r = getResultCode(res);
-
-      if (r == Code.ERROR) {
-        sc.clearJobGroup();
-        out.setInterpreterOutput(null);
-        return new InterpreterResult(r, "");
-      } else if (r == Code.INCOMPLETE) {
-        incomplete += s + "\n";
-      } else {
-        incomplete = "";
-      }
-    }
-
-    // make sure the code does not end with a comment
-    if (r == Code.INCOMPLETE) {
-      scala.tools.nsc.interpreter.Results.Result res = null;
-      res = interpret(incomplete + "\nprint(\"\")");
-      r = getResultCode(res);
-    }
-
-    if (r == Code.INCOMPLETE) {
-      sc.clearJobGroup();
-      out.setInterpreterOutput(null);
-      return new InterpreterResult(r, "Incomplete expression");
-    } else {
-      sc.clearJobGroup();
-      putLatestVarInResourcePool(context);
-      out.setInterpreterOutput(null);
-      return new InterpreterResult(Code.SUCCESS);
-    }
-  }
-
-  private void putLatestVarInResourcePool(InterpreterContext context) {
-    String varName = (String) Utils.invokeMethod(intp, "mostRecentVar");
-    if (varName == null || varName.isEmpty()) {
-      return;
-    }
-    Object lastObj = null;
-    try {
-      if (Utils.isScala2_10()) {
-        lastObj = getValue(varName);
-      } else {
-        lastObj = getLastObject();
-      }
-    } catch (NullPointerException e) {
-      // In some cases, scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call throws an NPE
-      logger.error(e.getMessage(), e);
-    }
-
-    if (lastObj != null) {
-      ResourcePool resourcePool = context.getResourcePool();
-      resourcePool.put(context.getNoteId(), context.getParagraphId(),
-          WellKnownResourceName.ZeppelinReplResult.toString(), lastObj);
-    }
-  };
-
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    sc.cancelJobGroup(Utils.buildJobGroupId(context));
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    String jobGroup = Utils.buildJobGroupId(context);
-    return JobProgressUtil.progress(sc, jobGroup);
-  }
-
-  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
-    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
-      return Code.SUCCESS;
-    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
-      return Code.INCOMPLETE;
-    } else {
-      return Code.ERROR;
-    }
-  }
-
-  @Override
-  public void close() {
-    logger.info("Close interpreter");
-
-    if (numReferenceOfSparkContext.decrementAndGet() == 0) {
-      if (sparkSession != null) {
-        Utils.invokeMethod(sparkSession, "stop");
-      } else if (sc != null){
-        sc.stop();
-      }
-      sparkSession = null;
-      sc = null;
-      jsc = null;
-      if (classServer != null) {
-        Utils.invokeMethod(classServer, "stop");
-        classServer = null;
-      }
-    }
-
-    Utils.invokeMethod(intp, "close");
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-        OldSparkInterpreter.class.getName() + this.hashCode());
-  }
-
-  public SparkZeppelinContext getZeppelinContext() {
-    return z;
-  }
-
-  public SparkVersion getSparkVersion() {
-    return sparkVersion;
-  }
-
-  private File createTempDir(String dir) {
-    File file = null;
-
-    // try Utils.createTempDir()
-    file = (File) Utils.invokeStaticMethod(
-        Utils.findClass("org.apache.spark.util.Utils"),
-        "createTempDir",
-        new Class[]{String.class, String.class},
-        new Object[]{dir, "spark"});
-
-    // fallback to old method
-    if (file == null) {
-      file = (File) Utils.invokeStaticMethod(
-          Utils.findClass("org.apache.spark.util.Utils"),
-          "createTempDir",
-          new Class[]{String.class},
-          new Object[]{dir});
-    }
-
-    return file;
-  }
-
-  private Object createHttpServer(File outputDir) {
-    SparkConf conf = new SparkConf();
-    try {
-      // try to create HttpServer
-      Constructor<?> constructor = getClass().getClassLoader()
-          .loadClass("org.apache.spark.HttpServer")
-          .getConstructor(new Class[]{
-            SparkConf.class, File.class, SecurityManager.class, int.class, String.class});
-
-      Object securityManager = createSecurityManager(conf);
-      return constructor.newInstance(new Object[]{
-        conf, outputDir, securityManager, 0, "HTTP Server"});
-
-    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
-        InstantiationException | InvocationTargetException e) {
-      // fallback to old constructor
-      Constructor<?> constructor = null;
-      try {
-        constructor = getClass().getClassLoader()
-            .loadClass("org.apache.spark.HttpServer")
-            .getConstructor(new Class[]{
-              File.class, SecurityManager.class, int.class, String.class});
-        return constructor.newInstance(new Object[] {
-          outputDir, createSecurityManager(conf), 0, "HTTP Server"});
-      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
-          InstantiationException | InvocationTargetException e1) {
-        logger.error(e1.getMessage(), e1);
-        return null;
-      }
-    }
-  }
-
-  /**
-   * The constructor signature of SecurityManager changed in Spark 2.1.0, so we use this method
-   * to create the SecurityManager properly for different versions of Spark.
-   *
-   * @param conf
-   * @return
-   * @throws ClassNotFoundException
-   * @throws NoSuchMethodException
-   * @throws IllegalAccessException
-   * @throws InvocationTargetException
-   * @throws InstantiationException
-   */
-  private Object createSecurityManager(SparkConf conf) throws ClassNotFoundException,
-      NoSuchMethodException, IllegalAccessException, InvocationTargetException,
-      InstantiationException {
-    Object securityManager = null;
-    try {
-      Constructor<?> smConstructor = getClass().getClassLoader()
-          .loadClass("org.apache.spark.SecurityManager")
-          .getConstructor(new Class[]{ SparkConf.class, scala.Option.class });
-      securityManager = smConstructor.newInstance(conf, null);
-    } catch (NoSuchMethodException e) {
-      Constructor<?> smConstructor = getClass().getClassLoader()
-          .loadClass("org.apache.spark.SecurityManager")
-          .getConstructor(new Class[]{ SparkConf.class });
-      securityManager = smConstructor.newInstance(conf);
-    }
-    return securityManager;
-  }
-}
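
Note on the scoped-mode workaround removed above: OldSparkInterpreter renamed the REPL-generated wrapper classes per interpreter instance so that several REPLs compiling into one output directory could not overwrite each other's classes. A minimal sketch of that idea in plain Java (the helper name is illustrative; only the scala.repl.name.line property and the Spark 2.x name pattern come from the removed code):

    // Derive a per-instance prefix for REPL-generated wrapper classes.
    // Spark 2.x expects names matching ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$,
    // so the prefix must stay of the form "$line<digits>".
    static String replClassNamePrefix(Object replInstance) {
      // hashCode() may be negative and '-' is illegal in a package name,
      // so map it to '0', which still matches the expected pattern.
      return ("$line" + replInstance.hashCode()).replace('-', '0');
    }

    // Applied before the SparkILoop is created:
    // System.setProperty("scala.repl.name.line", replClassNamePrefix(this));
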
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index c365345..d1433e8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -27,7 +27,6 @@ import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.apache.zeppelin.python.PythonInterpreter;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,30 +58,8 @@ public class PySparkInterpreter extends PythonInterpreter {
   @Override
   public void open() throws InterpreterException {
     setProperty("zeppelin.python.useIPython", getProperty("zeppelin.pyspark.useIPython", "true"));
-
-    // create SparkInterpreter in JVM side TODO(zjffdu) move to SparkInterpreter
-    DepInterpreter depInterpreter =
-        getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
-    // load libraries from Dependency Interpreter
     URL [] urls = new URL[0];
     List<URL> urlList = new LinkedList<>();
-
-    if (depInterpreter != null) {
-      SparkDependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFiles();
-        if (files != null) {
-          for (File f : files) {
-            try {
-              urlList.add(f.toURI().toURL());
-            } catch (MalformedURLException e) {
-              LOGGER.error("Error", e);
-            }
-          }
-        }
-      }
-    }
-
     String localRepo = getProperty("zeppelin.interpreter.localRepo");
     if (localRepo != null) {
       File localRepoDir = new File(localRepo);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index ec0be5f..1b5b9f6 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -17,31 +17,52 @@
 
 package org.apache.zeppelin.spark;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.AbstractInterpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
- * It is the Wrapper of OldSparkInterpreter & NewSparkInterpreter.
- * Property zeppelin.spark.useNew control which one to use.
+ * Java implementation of SparkInterpreter. It is just a wrapper of SparkScala210Interpreter
+ * and SparkScala211Interpreter.
  */
-public class SparkInterpreter extends AbstractSparkInterpreter {
+public class SparkInterpreter extends AbstractInterpreter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreter.class);
 
-  // either OldSparkInterpreter or NewSparkInterpreter
-  private AbstractSparkInterpreter delegation;
+  private BaseSparkScalaInterpreter innerInterpreter;
+  private Map<String, String> innerInterpreterClassMap = new HashMap<>();
+  private SparkContext sc;
+  private JavaSparkContext jsc;
+  private SQLContext sqlContext;
+  private Object sparkSession;
+
+  private SparkZeppelinContext z;
+  private SparkVersion sparkVersion;
+  private boolean enableSupportedVersionCheck;
+  private String sparkUrl;
+  private SparkShims sparkShims;
+
+  private static InterpreterHookRegistry hooks;
 
 
   public SparkInterpreter(Properties properties) {
@@ -50,46 +71,103 @@ public class SparkInterpreter extends AbstractSparkInterpreter {
     if (Boolean.parseBoolean(properties.getProperty("zeppelin.spark.scala.color", "true"))) {
       System.setProperty("scala.color", "true");
     }
-    if (Boolean.parseBoolean(properties.getProperty("zeppelin.spark.useNew", "false"))) {
-      delegation = new NewSparkInterpreter(properties);
-    } else {
-      delegation = new OldSparkInterpreter(properties);
-    }
-    delegation.setParentSparkInterpreter(this);
+    this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
+        properties.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
+    innerInterpreterClassMap.put("2.10", "org.apache.zeppelin.spark.SparkScala210Interpreter");
+    innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter");
   }
 
   @Override
   public void open() throws InterpreterException {
-    delegation.setInterpreterGroup(getInterpreterGroup());
-    delegation.setUserName(getUserName());
-    delegation.setClassloaderUrls(getClassloaderUrls());
-
-    delegation.open();
+    try {
+      String scalaVersion = extractScalaVersion();
+      LOGGER.info("Using Scala Version: " + scalaVersion);
+      SparkConf conf = new SparkConf();
+      for (Map.Entry<Object, Object> entry : getProperties().entrySet()) {
+        if (!StringUtils.isBlank(entry.getValue().toString())) {
+          conf.set(entry.getKey().toString(), entry.getValue().toString());
+        }
+        // zeppelin.spark.useHiveContext & zeppelin.spark.concurrentSQL are legacy zeppelin
+        // properties; convert them to spark properties here.
+        if (entry.getKey().toString().equals("zeppelin.spark.useHiveContext")) {
+          conf.set("spark.useHiveContext", entry.getValue().toString());
+        }
+        if (entry.getKey().toString().equals("zeppelin.spark.concurrentSQL")
+            && entry.getValue().toString().equals("true")) {
+          conf.set("spark.scheduler.mode", "FAIR");
+        }
+      }
+      // use local mode for embedded spark mode when spark.master is not found
+      conf.setIfMissing("spark.master", "local");
+
+      String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
+      Class clazz = Class.forName(innerIntpClassName);
+      this.innerInterpreter = (BaseSparkScalaInterpreter)
+          clazz.getConstructor(SparkConf.class, List.class, Boolean.class)
+              .newInstance(conf, getDependencyFiles(),
+                  Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput", "true")));
+      this.innerInterpreter.open();
+
+      sc = this.innerInterpreter.sc();
+      jsc = JavaSparkContext.fromSparkContext(sc);
+      sparkVersion = SparkVersion.fromVersionString(sc.version());
+      if (enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion()) {
+        throw new Exception("This is not officially supported spark version: " + sparkVersion
+            + "\nYou can set zeppelin.spark.enableSupportedVersionCheck to false if you really" +
+            " want to try this version of spark.");
+      }
+      sqlContext = this.innerInterpreter.sqlContext();
+      sparkSession = this.innerInterpreter.sparkSession();
+      hooks = getInterpreterGroup().getInterpreterHookRegistry();
+      sparkUrl = this.innerInterpreter.sparkUrl();
+      String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
+      if (!StringUtils.isBlank(sparkUrlProp)) {
+        sparkUrl = sparkUrlProp;
+      }
+      sparkShims = SparkShims.getInstance(sc.version(), getProperties());
+      sparkShims.setupSparkListener(sc.master(), sparkUrl, InterpreterContext.get());
+
+      z = new SparkZeppelinContext(sc, sparkShims, hooks,
+          Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
+      this.innerInterpreter.bind("z", z.getClass().getCanonicalName(), z,
+          Lists.newArrayList("@transient"));
+    } catch (Exception e) {
+      LOGGER.error("Fail to open SparkInterpreter", e);
+      throw new InterpreterException("Fail to open SparkInterpreter", e);
+    }
   }
 
   @Override
-  public void close() throws InterpreterException {
-    delegation.close();
+  public void close() {
+    LOGGER.info("Close SparkInterpreter");
+    if (innerInterpreter != null) {
+      innerInterpreter.close();
+      innerInterpreter = null;
+    }
   }
 
   @Override
-  public InterpreterResult internalInterpret(String st, InterpreterContext context)
-      throws InterpreterException {
-    Utils.printDeprecateMessage(delegation.getSparkVersion(), context, properties);
-    return delegation.interpret(st, context);
+  public InterpreterResult internalInterpret(String st, InterpreterContext context) {
+    context.out.clear();
+    sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
+    // set spark.scheduler.pool to the pool associated with this paragraph;
+    // sc.setLocalProperty("spark.scheduler.pool", null) clears the pool when none is set
+    sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
+
+    return innerInterpreter.interpret(st, context);
   }
 
   @Override
-  public void cancel(InterpreterContext context) throws InterpreterException {
-    delegation.cancel(context);
+  public void cancel(InterpreterContext context) {
+    sc.cancelJobGroup(Utils.buildJobGroupId(context));
   }
 
   @Override
   public List<InterpreterCompletion> completion(String buf,
                                                 int cursor,
-                                                InterpreterContext interpreterContext)
-      throws InterpreterException {
-    return delegation.completion(buf, cursor, interpreterContext);
+                                                InterpreterContext interpreterContext) {
+    LOGGER.debug("buf: " + buf + ", cursor:" + cursor);
+    return innerInterpreter.completion(buf, cursor, interpreterContext);
   }
 
   @Override
@@ -98,68 +176,70 @@ public class SparkInterpreter extends AbstractSparkInterpreter {
   }
 
   @Override
-  public int getProgress(InterpreterContext context) throws InterpreterException {
-    return delegation.getProgress(context);
+  public int getProgress(InterpreterContext context) {
+    return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context);
   }
 
-  public AbstractSparkInterpreter getDelegation() {
-    return delegation;
+  public SparkZeppelinContext getZeppelinContext() {
+    return this.z;
   }
 
-
-  @Override
   public SparkContext getSparkContext() {
-    return delegation.getSparkContext();
+    return this.sc;
   }
 
-  @Override
   public SQLContext getSQLContext() {
-    return delegation.getSQLContext();
+    return sqlContext;
   }
 
-  @Override
-  public Object getSparkSession() {
-    return delegation.getSparkSession();
+  public JavaSparkContext getJavaSparkContext() {
+    return this.jsc;
   }
 
-  @Override
-  public boolean isSparkContextInitialized() {
-    return delegation.isSparkContextInitialized();
+  public Object getSparkSession() {
+    return sparkSession;
   }
 
-  @Override
   public SparkVersion getSparkVersion() {
-    return delegation.getSparkVersion();
+    return sparkVersion;
   }
 
-  @Override
-  public JavaSparkContext getJavaSparkContext() {
-    return delegation.getJavaSparkContext();
+  private String extractScalaVersion() throws IOException, InterruptedException {
+    String scalaVersionString = scala.util.Properties.versionString();
+    if (scalaVersionString.contains("version 2.10")) {
+      return "2.10";
+    } else {
+      return "2.11";
+    }
   }
 
-  @Override
-  public BaseZeppelinContext getZeppelinContext() {
-    return delegation.getZeppelinContext();
+  public boolean isSparkContextInitialized() {
+    return this.sc != null;
+  }
+
+  private List<String> getDependencyFiles() throws InterpreterException {
+    List<String> depFiles = new ArrayList<>();
+    // add jar from local repo
+    String localRepo = getProperty("zeppelin.interpreter.localRepo");
+    if (localRepo != null) {
+      File localRepoDir = new File(localRepo);
+      if (localRepoDir.exists()) {
+        File[] files = localRepoDir.listFiles();
+        if (files != null) {
+          for (File f : files) {
+            depFiles.add(f.getAbsolutePath());
+          }
+        }
+      }
+    }
+    return depFiles;
   }
 
-  @Override
   public String getSparkUIUrl() {
-    return delegation.getSparkUIUrl();
+    return sparkUrl;
   }
 
   public boolean isUnsupportedSparkVersion() {
-    return delegation.isUnsupportedSparkVersion();
-  }
-
-  public boolean isYarnMode() {
-    String master = getProperty("master");
-    if (master == null) {
-      master = getProperty("spark.master", "local[*]");
-    }
-    return master.startsWith("yarn");
-  }
-
-  public static boolean useSparkSubmit() {
-    return null != System.getenv("SPARK_SUBMIT");
+    return enableSupportedVersionCheck  && sparkVersion.isUnsupportedVersion();
   }
 }
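
The new SparkInterpreter picks the Scala-side implementation by reflection, keyed on the Scala version it is running under. Condensed from SparkInterpreter.open() above, assuming scala-library and the interpreter classes referenced by this change are on the classpath (error handling omitted for brevity):

    import org.apache.spark.SparkConf;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: instantiate the scala-version-specific interpreter at runtime.
    public class ScalaVersionDispatch {
      public static Object createInnerInterpreter(SparkConf conf, List<String> depFiles,
                                                  boolean printREPLOutput) throws Exception {
        Map<String, String> classMap = new HashMap<>();
        classMap.put("2.10", "org.apache.zeppelin.spark.SparkScala210Interpreter");
        classMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter");

        // scala.util.Properties.versionString() returns e.g. "version 2.11.12"
        String scalaVersion =
            scala.util.Properties.versionString().contains("version 2.10") ? "2.10" : "2.11";

        Class<?> clazz = Class.forName(classMap.get(scalaVersion));
        // Constructor shape taken from the diff above: (SparkConf, List, Boolean)
        return clazz.getConstructor(SparkConf.class, List.class, Boolean.class)
            .newInstance(conf, depFiles, printREPLOutput);
      }
    }

The same open() path also translates the legacy zeppelin.spark.useHiveContext and zeppelin.spark.concurrentSQL settings into spark.useHiveContext and spark.scheduler.mode=FAIR before the SparkConf is handed to the inner interpreter.
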
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java
deleted file mode 100644
index 0235fc6..0000000
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.zeppelin.dep.Booter;
-import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.dep.Repository;
-
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.RepositorySystemSession;
-import org.sonatype.aether.artifact.Artifact;
-import org.sonatype.aether.collection.CollectRequest;
-import org.sonatype.aether.graph.DependencyFilter;
-import org.sonatype.aether.repository.RemoteRepository;
-import org.sonatype.aether.repository.Authentication;
-import org.sonatype.aether.resolution.ArtifactResolutionException;
-import org.sonatype.aether.resolution.ArtifactResult;
-import org.sonatype.aether.resolution.DependencyRequest;
-import org.sonatype.aether.resolution.DependencyResolutionException;
-import org.sonatype.aether.util.artifact.DefaultArtifact;
-import org.sonatype.aether.util.artifact.JavaScopes;
-import org.sonatype.aether.util.filter.DependencyFilterUtils;
-import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
-
-
-/**
- *
- */
-public class SparkDependencyContext {
-  List<Dependency> dependencies = new LinkedList<>();
-  List<Repository> repositories = new LinkedList<>();
-
-  List<File> files = new LinkedList<>();
-  List<File> filesDist = new LinkedList<>();
-  private RepositorySystem system = Booter.newRepositorySystem();
-  private RepositorySystemSession session;
-  private RemoteRepository mavenCentral = Booter.newCentralRepository();
-  private RemoteRepository mavenLocal = Booter.newLocalRepository();
-  private List<RemoteRepository> additionalRepos = new LinkedList<>();
-
-  public SparkDependencyContext(String localRepoPath, String additionalRemoteRepository) {
-    session =  Booter.newRepositorySystemSession(system, localRepoPath);
-    addRepoFromProperty(additionalRemoteRepository);
-  }
-
-  public Dependency load(String lib) {
-    Dependency dep = new Dependency(lib);
-
-    if (dependencies.contains(dep)) {
-      dependencies.remove(dep);
-    }
-    dependencies.add(dep);
-    return dep;
-  }
-
-  public Repository addRepo(String name) {
-    Repository rep = new Repository(name);
-    repositories.add(rep);
-    return rep;
-  }
-
-  public void reset() {
-    dependencies = new LinkedList<>();
-    repositories = new LinkedList<>();
-
-    files = new LinkedList<>();
-    filesDist = new LinkedList<>();
-  }
-
-  private void addRepoFromProperty(String listOfRepo) {
-    if (listOfRepo != null) {
-      String[] repos = listOfRepo.split(";");
-      for (String repo : repos) {
-        String[] parts = repo.split(",");
-        if (parts.length == 3) {
-          String id = parts[0].trim();
-          String url = parts[1].trim();
-          boolean isSnapshot = Boolean.parseBoolean(parts[2].trim());
-          if (id.length() > 1 && url.length() > 1) {
-            RemoteRepository rr = new RemoteRepository(id, "default", url);
-            rr.setPolicy(isSnapshot, null);
-            additionalRepos.add(rr);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * fetch all artifacts
-   * @return
-   * @throws MalformedURLException
-   * @throws ArtifactResolutionException
-   * @throws DependencyResolutionException
-   */
-  public List<File> fetch() throws MalformedURLException,
-      DependencyResolutionException, ArtifactResolutionException {
-
-    for (Dependency dep : dependencies) {
-      if (!dep.isLocalFsArtifact()) {
-        List<ArtifactResult> artifacts = fetchArtifactWithDep(dep);
-        for (ArtifactResult artifact : artifacts) {
-          if (dep.isDist()) {
-            filesDist.add(artifact.getArtifact().getFile());
-          }
-          files.add(artifact.getArtifact().getFile());
-        }
-      } else {
-        if (dep.isDist()) {
-          filesDist.add(new File(dep.getGroupArtifactVersion()));
-        }
-        files.add(new File(dep.getGroupArtifactVersion()));
-      }
-    }
-
-    return files;
-  }
-
-  private List<ArtifactResult> fetchArtifactWithDep(Dependency dep)
-      throws DependencyResolutionException, ArtifactResolutionException {
-    Artifact artifact = new DefaultArtifact(
-        SparkDependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion()));
-
-    DependencyFilter classpathFlter = DependencyFilterUtils
-        .classpathFilter(JavaScopes.COMPILE);
-    PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter(
-        SparkDependencyResolver.inferScalaVersion(dep.getExclusions()));
-
-    CollectRequest collectRequest = new CollectRequest();
-    collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact,
-        JavaScopes.COMPILE));
-
-    collectRequest.addRepository(mavenCentral);
-    collectRequest.addRepository(mavenLocal);
-    for (RemoteRepository repo : additionalRepos) {
-      collectRequest.addRepository(repo);
-    }
-    for (Repository repo : repositories) {
-      RemoteRepository rr = new RemoteRepository(repo.getId(), "default", repo.getUrl());
-      rr.setPolicy(repo.isSnapshot(), null);
-      Authentication auth = repo.getAuthentication();
-      if (auth != null) {
-        rr.setAuthentication(auth);
-      }
-      collectRequest.addRepository(rr);
-    }
-
-    DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
-        DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
-
-    return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
-  }
-
-  public List<File> getFiles() {
-    return files;
-  }
-
-  public List<File> getFilesDist() {
-    return filesDist;
-  }
-}
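
The removed SparkDependencyContext resolved dependencies loaded through the removed DepInterpreter with Aether against Maven Central, the local repository, and any repositories configured through zeppelin.dep.additionalRemoteRepository, which it parsed as a semicolon-separated list of 'id,url,isSnapshot' entries (the default value removed from interpreter-setting.json below uses the same format). A small stand-alone sketch of that parsing step only (the class name and return layout are illustrative, not Zeppelin API):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: parse "id,url,isSnapshot;" entries as addRepoFromProperty() did.
    public class RepoListParser {
      public static List<String[]> parse(String listOfRepo) {
        List<String[]> repos = new ArrayList<>();
        if (listOfRepo == null) {
          return repos;
        }
        for (String repo : listOfRepo.split(";")) {
          String[] parts = repo.split(",");
          if (parts.length == 3 && parts[0].trim().length() > 1 && parts[1].trim().length() > 1) {
            repos.add(new String[]{parts[0].trim(), parts[1].trim(),
                String.valueOf(Boolean.parseBoolean(parts[2].trim()))});
          }
        }
        return repos;
      }
    }

    // parse("spark-packages,http://dl.bintray.com/spark-packages/maven,false;")
    //   -> one entry: ["spark-packages", "http://dl.bintray.com/spark-packages/maven", "false"]
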
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java
deleted file mode 100644
index 46224a8..0000000
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark.dep;
-
-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkContext;
-import org.apache.zeppelin.dep.AbstractDependencyResolver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.artifact.Artifact;
-import org.sonatype.aether.collection.CollectRequest;
-import org.sonatype.aether.graph.Dependency;
-import org.sonatype.aether.graph.DependencyFilter;
-import org.sonatype.aether.repository.RemoteRepository;
-import org.sonatype.aether.resolution.ArtifactResult;
-import org.sonatype.aether.resolution.DependencyRequest;
-import org.sonatype.aether.util.artifact.DefaultArtifact;
-import org.sonatype.aether.util.artifact.JavaScopes;
-import org.sonatype.aether.util.filter.DependencyFilterUtils;
-import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
-
-import scala.Some;
-import scala.collection.IndexedSeq;
-import scala.reflect.io.AbstractFile;
-import scala.tools.nsc.Global;
-import scala.tools.nsc.backend.JavaPlatform;
-import scala.tools.nsc.util.ClassPath;
-import scala.tools.nsc.util.MergedClassPath;
-
-/**
- * Deps resolver.
- * Add new dependencies from mvn repo (at runtime) to Spark interpreter group.
- */
-public class SparkDependencyResolver extends AbstractDependencyResolver {
-  Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class);
-  private Global global;
-  private ClassLoader runtimeClassLoader;
-  private SparkContext sc;
-
-  private final String[] exclusions = new String[] {"org.scala-lang:scala-library",
-                                                    "org.scala-lang:scala-compiler",
-                                                    "org.scala-lang:scala-reflect",
-                                                    "org.scala-lang:scalap",
-                                                    "org.apache.zeppelin:zeppelin-zengine",
-                                                    "org.apache.zeppelin:zeppelin-spark",
-                                                    "org.apache.zeppelin:zeppelin-server"};
-
-  public SparkDependencyResolver(Global global,
-                                 ClassLoader runtimeClassLoader,
-                                 SparkContext sc,
-                                 String localRepoPath,
-                                 String additionalRemoteRepository) {
-    super(localRepoPath);
-    this.global = global;
-    this.runtimeClassLoader = runtimeClassLoader;
-    this.sc = sc;
-    addRepoFromProperty(additionalRemoteRepository);
-  }
-
-  private void addRepoFromProperty(String listOfRepo) {
-    if (listOfRepo != null) {
-      String[] repos = listOfRepo.split(";");
-      for (String repo : repos) {
-        String[] parts = repo.split(",");
-        if (parts.length == 3) {
-          String id = parts[0].trim();
-          String url = parts[1].trim();
-          boolean isSnapshot = Boolean.parseBoolean(parts[2].trim());
-          if (id.length() > 1 && url.length() > 1) {
-            addRepo(id, url, isSnapshot);
-          }
-        }
-      }
-    }
-  }
-
-  private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException,
-      IllegalArgumentException, InvocationTargetException {
-
-    JavaPlatform platform = (JavaPlatform) global.platform();
-    MergedClassPath<AbstractFile> newClassPath = mergeUrlsIntoClassPath(platform, urls);
-
-    Method[] methods = platform.getClass().getMethods();
-    for (Method m : methods) {
-      if (m.getName().endsWith("currentClassPath_$eq")) {
-        m.invoke(platform, new Some(newClassPath));
-        break;
-      }
-    }
-
-    // NOTE: Must use reflection until this is exposed/fixed upstream in Scala
-    List<String> classPaths = new LinkedList<>();
-    for (URL url : urls) {
-      classPaths.add(url.getPath());
-    }
-
-    // Reload all jars specified into our compiler
-    global.invalidateClassPathEntries(scala.collection.JavaConversions.asScalaBuffer(classPaths)
-        .toList());
-  }
-
-  // Until spark 1.1.x
-  // check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7
-  private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException,
-      IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException, NoSuchMethodException {
-    Method addURL;
-    addURL = runtimeClassLoader.getClass().getDeclaredMethod("addURL", new Class[] {URL.class});
-    addURL.setAccessible(true);
-    for (URL url : urls) {
-      addURL.invoke(runtimeClassLoader, url);
-    }
-  }
-
-  private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException,
-      IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException, NoSuchMethodException {
-    Method addURL;
-    addURL = runtimeClassLoader.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class});
-    addURL.setAccessible(true);
-    for (URL url : urls) {
-      addURL.invoke(runtimeClassLoader, url);
-    }
-  }
-
-  private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
-    IndexedSeq<ClassPath<AbstractFile>> entries =
-        ((MergedClassPath<AbstractFile>) platform.classPath()).entries();
-    List<ClassPath<AbstractFile>> cp = new LinkedList<>();
-
-    for (int i = 0; i < entries.size(); i++) {
-      cp.add(entries.apply(i));
-    }
-
-    for (URL url : urls) {
-      AbstractFile file;
-      if ("file".equals(url.getProtocol())) {
-        File f = new File(url.getPath());
-        if (f.isDirectory()) {
-          file = AbstractFile.getDirectory(scala.reflect.io.File.jfile2path(f));
-        } else {
-          file = AbstractFile.getFile(scala.reflect.io.File.jfile2path(f));
-        }
-      } else {
-        file = AbstractFile.getURL(url);
-      }
-
-      ClassPath<AbstractFile> newcp = platform.classPath().context().newClassPath(file);
-
-      // distinct
-      if (cp.contains(newcp) == false) {
-        cp.add(newcp);
-      }
-    }
-
-    return new MergedClassPath(scala.collection.JavaConversions.asScalaBuffer(cp).toIndexedSeq(),
-        platform.classPath().context());
-  }
-
-  public List<String> load(String artifact,
-      boolean addSparkContext) throws Exception {
-    return load(artifact, new LinkedList<String>(), addSparkContext);
-  }
-
-  public List<String> load(String artifact, Collection<String> excludes,
-      boolean addSparkContext) throws Exception {
-    if (StringUtils.isBlank(artifact)) {
-      // Should throw here
-      throw new RuntimeException("Invalid artifact to load");
-    }
-
-    // <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
-    int numSplits = artifact.split(":").length;
-    if (numSplits >= 3 && numSplits <= 6) {
-      return loadFromMvn(artifact, excludes, addSparkContext);
-    } else {
-      loadFromFs(artifact, addSparkContext);
-      LinkedList<String> libs = new LinkedList<>();
-      libs.add(artifact);
-      return libs;
-    }
-  }
-
-  private void loadFromFs(String artifact, boolean addSparkContext) throws Exception {
-    File jarFile = new File(artifact);
-
-    global.new Run();
-
-    if (sc.version().startsWith("1.1")) {
-      updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()});
-    } else {
-      updateRuntimeClassPath_2_x(new URL[] {jarFile.toURI().toURL()});
-    }
-
-    if (addSparkContext) {
-      sc.addJar(jarFile.getAbsolutePath());
-    }
-  }
-
-  private List<String> loadFromMvn(String artifact, Collection<String> excludes,
-      boolean addSparkContext) throws Exception {
-    List<String> loadedLibs = new LinkedList<>();
-    Collection<String> allExclusions = new LinkedList<>();
-    allExclusions.addAll(excludes);
-    allExclusions.addAll(Arrays.asList(exclusions));
-
-    List<ArtifactResult> listOfArtifact;
-    listOfArtifact = getArtifactsWithDep(artifact, allExclusions);
-
-    Iterator<ArtifactResult> it = listOfArtifact.iterator();
-    while (it.hasNext()) {
-      Artifact a = it.next().getArtifact();
-      String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion();
-      for (String exclude : allExclusions) {
-        if (gav.startsWith(exclude)) {
-          it.remove();
-          break;
-        }
-      }
-    }
-
-    List<URL> newClassPathList = new LinkedList<>();
-    List<File> files = new LinkedList<>();
-    for (ArtifactResult artifactResult : listOfArtifact) {
-      logger.info("Load " + artifactResult.getArtifact().getGroupId() + ":"
-          + artifactResult.getArtifact().getArtifactId() + ":"
-          + artifactResult.getArtifact().getVersion());
-      newClassPathList.add(artifactResult.getArtifact().getFile().toURI().toURL());
-      files.add(artifactResult.getArtifact().getFile());
-      loadedLibs.add(artifactResult.getArtifact().getGroupId() + ":"
-          + artifactResult.getArtifact().getArtifactId() + ":"
-          + artifactResult.getArtifact().getVersion());
-    }
-
-    global.new Run();
-    if (sc.version().startsWith("1.1")) {
-      updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0]));
-    } else {
-      updateRuntimeClassPath_2_x(newClassPathList.toArray(new URL[0]));
-    }
-    updateCompilerClassPath(newClassPathList.toArray(new URL[0]));
-
-    if (addSparkContext) {
-      for (File f : files) {
-        sc.addJar(f.getAbsolutePath());
-      }
-    }
-
-    return loadedLibs;
-  }
-
-  /**
-   * @param dependency
-   * @param excludes list of exclusion patterns of the form groupId:artifactId
-   * @return
-   * @throws Exception
-   */
-  @Override
-  public List<ArtifactResult> getArtifactsWithDep(String dependency,
-      Collection<String> excludes) throws Exception {
-    Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency));
-    DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE);
-    PatternExclusionsDependencyFilter exclusionFilter =
-        new PatternExclusionsDependencyFilter(inferScalaVersion(excludes));
-
-    CollectRequest collectRequest = new CollectRequest();
-    collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE));
-
-    synchronized (repos) {
-      for (RemoteRepository repo : repos) {
-        collectRequest.addRepository(repo);
-      }
-    }
-    DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
-        DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter));
-    return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
-  }
-
-  public static Collection<String> inferScalaVersion(Collection<String> artifact) {
-    List<String> list = new LinkedList<>();
-    for (String a : artifact) {
-      list.add(inferScalaVersion(a));
-    }
-    return list;
-  }
-
-  public static String inferScalaVersion(String artifact) {
-    int pos = artifact.indexOf(":");
-    if (pos < 0 || pos + 2 >= artifact.length()) {
-      // failed to infer
-      return artifact;
-    }
-
-    if (':' == artifact.charAt(pos + 1)) {
-      String restOfthem = "";
-      String versionSep = ":";
-
-      String groupId = artifact.substring(0, pos);
-      int nextPos = artifact.indexOf(":", pos + 2);
-      if (nextPos < 0) {
-        if (artifact.charAt(artifact.length() - 1) == '*') {
-          nextPos = artifact.length() - 1;
-          versionSep = "";
-          restOfthem = "*";
-        } else {
-          versionSep = "";
-          nextPos = artifact.length();
-        }
-      }
-
-      String artifactId = artifact.substring(pos + 2, nextPos);
-      if (nextPos < artifact.length()) {
-        if (!restOfthem.equals("*")) {
-          restOfthem = artifact.substring(nextPos + 1);
-        }
-      }
-
-      String [] version = scala.util.Properties.versionNumberString().split("[.]");
-      String scalaVersion = version[0] + "." + version[1];
-
-      return groupId + ":" + artifactId + "_" + scalaVersion + versionSep + restOfthem;
-    } else {
-      return artifact;
-    }
-  }
-}
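
The removed resolver supported an sbt-style '::' shortcut in inferScalaVersion(): a double colon after the groupId appended the running Scala binary version to the artifactId before resolution. A simplified sketch of just that rule (it ignores the wildcard and missing-version cases the removed code also handled; the example coordinate is illustrative):

    // Sketch: append the Scala binary version when the coordinate uses "::".
    static String withScalaSuffix(String coordinate) {
      if (!coordinate.contains("::")) {
        return coordinate;                          // plain coordinates pass through unchanged
      }
      String[] v = scala.util.Properties.versionNumberString().split("[.]");
      String scalaBinaryVersion = v[0] + "." + v[1];          // e.g. "2.11"
      String[] parts = coordinate.split(":+");                // [group, artifact, version]
      return parts[0] + ":" + parts[1] + "_" + scalaBinaryVersion + ":" + parts[2];
    }

    // With Scala 2.11 on the classpath:
    //   withScalaSuffix("com.databricks::spark-csv:1.5.0")
    //     -> "com.databricks:spark-csv_2.11:1.5.0"
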
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index 341beda..e85509c 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -75,13 +75,6 @@
         "description": "Override Spark UI default URL",
         "type": "string"
       },
-      "zeppelin.spark.useNew": {
-        "envName": null,
-        "propertyName": "zeppelin.spark.useNew",
-        "defaultValue": true,
-        "description": "Whether use new spark interpreter implementation",
-        "type": "checkbox"
-      },
       "zeppelin.spark.ui.hidden": {
         "envName": null,
         "propertyName": "zeppelin.spark.ui.hidden",
@@ -168,33 +161,6 @@
   },
   {
     "group": "spark",
-    "name": "dep",
-    "className": "org.apache.zeppelin.spark.DepInterpreter",
-    "properties": {
-      "zeppelin.dep.localrepo": {
-        "envName": "ZEPPELIN_DEP_LOCALREPO",
-        "propertyName": null,
-        "defaultValue": "local-repo",
-        "description": "local repository for dependency loader",
-        "type": "string"
-      },
-      "zeppelin.dep.additionalRemoteRepository": {
-        "envName": null,
-        "propertyName": null,
-        "defaultValue": "spark-packages,http://dl.bintray.com/spark-packages/maven,false;",
-        "description": "A list of 'id,remote-repository-URL,is-snapshot;' for each remote repository.",
-        "type": "textarea"
-      }
-    },
-    "editor": {
-      "language": "scala",
-      "editOnDblClick": false,
-      "completionKey": "TAB",
-      "completionSupport": true
-    }
-  },
-  {
-    "group": "spark",
     "name": "pyspark",
     "className": "org.apache.zeppelin.spark.PySparkInterpreter",
     "properties": {
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
deleted file mode 100644
index 8c73b88..0000000
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-public class DepInterpreterTest {
-
-  @Rule
-  public TemporaryFolder tmpDir = new TemporaryFolder();
-
-  private DepInterpreter dep;
-  private InterpreterContext context;
-
-  private Properties getTestProperties() throws IOException {
-    Properties p = new Properties();
-    p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
-    p.setProperty("zeppelin.dep.additionalRemoteRepository", "spark-packages,https://dl.bintray.com/spark-packages/maven,false;");
-    return p;
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    Properties p = getTestProperties();
-
-    dep = new DepInterpreter(p);
-    dep.open();
-
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(new SparkInterpreter(p));
-    intpGroup.get("note").add(dep);
-    dep.setInterpreterGroup(intpGroup);
-
-    context = InterpreterContext.builder()
-        .build();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    dep.close();
-  }
-
-  @Test
-  public void testDefault() throws InterpreterException {
-    dep.getDependencyContext().reset();
-    InterpreterResult ret = dep.interpret("z.load(\"org.apache.commons:commons-csv:1.1\")", context);
-    assertEquals(Code.SUCCESS, ret.code());
-
-    assertEquals(1, dep.getDependencyContext().getFiles().size());
-    assertEquals(1, dep.getDependencyContext().getFilesDist().size());
-  }
-}
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index b4abe60..39cf566 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -62,7 +62,6 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     p.setProperty("zeppelin.spark.useHiveContext", "false");
     p.setProperty("zeppelin.spark.maxResult", "3");
     p.setProperty("zeppelin.spark.importImplicit", "true");
-    p.setProperty("zeppelin.spark.useNew", "true");
     p.setProperty("zeppelin.pyspark.python", "python");
     p.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath());
     p.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
deleted file mode 100644
index c2a1bb0..0000000
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.spark.SparkConf;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.resource.WellKnownResourceName;
-import org.junit.AfterClass;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runners.MethodSorters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class OldSparkInterpreterTest {
-
-  @ClassRule
-  public static TemporaryFolder tmpDir = new TemporaryFolder();
-
-  static SparkInterpreter repl;
-  static InterpreterGroup intpGroup;
-  static InterpreterContext context;
-  static Logger LOGGER = LoggerFactory.getLogger(OldSparkInterpreterTest.class);
-
-  /**
-   * Get spark version number as a numerical value.
-   * e.g. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
-   */
-  public static int getSparkVersionNumber(SparkInterpreter repl) {
-    if (repl == null) {
-      return 0;
-    }
-
-    String[] split = repl.getSparkContext().version().split("\\.");
-    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
-    return version;
-  }
-
-  public static Properties getSparkTestProperties(TemporaryFolder tmpDir) throws IOException {
-    Properties p = new Properties();
-    p.setProperty("master", "local[*]");
-    p.setProperty("spark.app.name", "Zeppelin Test");
-    p.setProperty("zeppelin.spark.useHiveContext", "true");
-    p.setProperty("zeppelin.spark.maxResult", "1000");
-    p.setProperty("zeppelin.spark.importImplicit", "true");
-    p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
-    p.setProperty("zeppelin.spark.property_1", "value_1");
-    // disable color output for easy testing
-    p.setProperty("zeppelin.spark.scala.color", "false");
-    p.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
-    return p;
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    intpGroup = new InterpreterGroup();
-    context = InterpreterContext.builder()
-        .setNoteId("noteId")
-        .setParagraphId("paragraphId")
-        .setParagraphTitle("title")
-        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
-        .setResourcePool(new LocalResourcePool("id"))
-        .setInterpreterOut(new InterpreterOutput(null))
-        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
-        .build();
-    InterpreterContext.set(context);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    repl = new SparkInterpreter(getSparkTestProperties(tmpDir));
-    repl.setInterpreterGroup(intpGroup);
-    intpGroup.get("note").add(repl);
-    repl.open();
-    // The first paragraph interpreted will set the event client wrapper
-    //SparkInterpreter.interpret(String, InterpreterContext) ->
-    //SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
-    //ZeppelinContext.setEventClient(RemoteEventClientWrapper)
-    //running a dummy to ensure that we don't have any race conditions among tests
-    repl.interpret("sc", context);
-  }
-
-  @AfterClass
-  public static void tearDown() throws InterpreterException {
-    repl.close();
-  }
-
-  @Test
-  public void testBasicIntp() throws InterpreterException {
-    assertEquals(InterpreterResult.Code.SUCCESS,
-        repl.interpret("val a = 1\nval b = 2", context).code());
-
-    // when interpret incomplete expression
-    InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
-    assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
-    // expecting some error message
-    assertTrue(incomplete.message().get(0).getData().length() > 0);
-
-    /*
-     * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b"));
-     * repl.interpret("val ver = sc.version");
-     * assertNotNull(repl.getValue("ver")); assertEquals("HELLO\n",
-     * repl.interpret("println(\"HELLO\")").message());
-     */
-  }
-
-  @Test
-  public void testNonStandardSparkProperties() throws IOException, InterpreterException {
-    // throw NoSuchElementException if no such property is found
-    InterpreterResult result = repl.interpret("sc.getConf.get(\"property_1\")", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-  }
-
-  @Test
-  public void testNextLineInvocation() throws InterpreterException {
-    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
-  }
-
-  @Test
-  public void testNextLineComments() throws InterpreterException {
-    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
-  }
-
-  @Test
-  public void testNextLineCompanionObject() throws InterpreterException {
-    String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n def apply(x: Long) = new Counter()\n}";
-    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, context).code());
-  }
-
-  @Test
-  public void testEndWithComment() throws InterpreterException {
-    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
-  }
-
-  @Test
-  public void testCreateDataFrame() throws InterpreterException {
-    if (getSparkVersionNumber(repl) >= 13) {
-      repl.interpret("case class Person(name:String, age:Int)\n", context);
-      repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
-      repl.interpret("people.toDF.count", context);
-      assertEquals(new Long(4), context.getResourcePool().get(
-          context.getNoteId(),
-          context.getParagraphId(),
-          WellKnownResourceName.ZeppelinReplResult.toString()).get());
-    }
-  }
-
-  @Test
-  public void testZShow() throws InterpreterException {
-    String code = "";
-    repl.interpret("case class Person(name:String, age:Int)\n", context);
-    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
-    if (getSparkVersionNumber(repl) < 13) {
-      repl.interpret("people.registerTempTable(\"people\")", context);
-      code = "z.show(sqlc.sql(\"select * from people\"))";
-    } else {
-      code = "z.show(people.toDF)";
-    }
-      assertEquals(Code.SUCCESS, repl.interpret(code, context).code());
-  }
-
-  @Test
-  public void testSparkSql() throws IOException, InterpreterException {
-    repl.interpret("case class Person(name:String, age:Int)\n", context);
-    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
-    assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
-
-
-    if (getSparkVersionNumber(repl) <= 11) { // spark 1.2 or later does not allow creating
-      // multiple SparkContexts in the same JVM by default.
-      // create new interpreter
-      SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir));
-      repl2.setInterpreterGroup(intpGroup);
-      intpGroup.get("note").add(repl2);
-      repl2.open();
-
-      repl2.interpret("case class Man(name:String, age:Int)", context);
-      repl2.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
-      assertEquals(Code.SUCCESS, repl2.interpret("man.take(3)", context).code());
-      repl2.close();
-    }
-  }
-
-  @Test
-  public void testReferencingUndefinedVal() throws InterpreterException {
-    InterpreterResult result = repl.interpret("def category(min: Int) = {"
-        + "    if (0 <= value) \"error\"" + "}", context);
-    assertEquals(Code.ERROR, result.code());
-  }
-
-  @Test
-  public void emptyConfigurationVariablesOnlyForNonSparkProperties() {
-    Properties intpProperty = repl.getProperties();
-    SparkConf sparkConf = repl.getSparkContext().getConf();
-    for (Object oKey : intpProperty.keySet()) {
-      String key = (String) oKey;
-      String value = (String) intpProperty.get(key);
-      LOGGER.debug(String.format("[%s]: [%s]", key, value));
-      if (key.startsWith("spark.") && value.isEmpty()) {
-        assertTrue(String.format("configuration starting with 'spark.' should not be empty. [%s]", key), !sparkConf.contains(key) || !sparkConf.get(key).isEmpty());
-      }
-    }
-  }
-
-  @Test
-  public void shareSingleSparkContext() throws InterruptedException, IOException, InterpreterException {
-    // create another SparkInterpreter
-    SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir));
-    repl2.setInterpreterGroup(intpGroup);
-    intpGroup.get("note").add(repl2);
-    repl2.open();
-
-    assertEquals(Code.SUCCESS,
-        repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
-    assertEquals(Code.SUCCESS,
-        repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
-
-    repl2.close();
-  }
-
-  @Test
-  public void testEnableImplicitImport() throws IOException, InterpreterException {
-    if (getSparkVersionNumber(repl) >= 13) {
-      // Set option of importing implicits to "true", and initialize new Spark repl
-      Properties p = getSparkTestProperties(tmpDir);
-      p.setProperty("zeppelin.spark.importImplicit", "true");
-      SparkInterpreter repl2 = new SparkInterpreter(p);
-      repl2.setInterpreterGroup(intpGroup);
-      intpGroup.get("note").add(repl2);
-
-      repl2.open();
-      String ddl = "val df = Seq((1, true), (2, false)).toDF(\"num\", \"bool\")";
-      assertEquals(Code.SUCCESS, repl2.interpret(ddl, context).code());
-      repl2.close();
-    }
-  }
-
-  @Test
-  public void testDisableImplicitImport() throws IOException, InterpreterException {
-    if (getSparkVersionNumber(repl) >= 13) {
-      // Set option of importing implicits to "false", and initialize new Spark repl
-      // this test should return error status when creating DataFrame from sequence
-      Properties p = getSparkTestProperties(tmpDir);
-      p.setProperty("zeppelin.spark.importImplicit", "false");
-      SparkInterpreter repl2 = new SparkInterpreter(p);
-      repl2.setInterpreterGroup(intpGroup);
-      intpGroup.get("note").add(repl2);
-
-      repl2.open();
-      String ddl = "val df = Seq((1, true), (2, false)).toDF(\"num\", \"bool\")";
-      assertEquals(Code.ERROR, repl2.interpret(ddl, context).code());
-      repl2.close();
-    }
-  }
-
-  @Test
-  public void testCompletion() throws InterpreterException {
-    List<InterpreterCompletion> completions = repl.completion("sc.", "sc.".length(), null);
-    assertTrue(completions.size() > 0);
-  }
-
-  @Test
-  public void testMultilineCompletion() throws InterpreterException {
-    String buf = "val x = 1\nsc.";
-    List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
-    assertTrue(completions.size() > 0);
-  }
-
-  @Test
-  public void testMultilineCompletionNewVar() throws InterpreterException {
-    Assume.assumeFalse("this feature does not work with scala 2.10", Utils.isScala2_10());
-    Assume.assumeTrue("This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7());
-    String buf = "val x = sc\nx.";
-    List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
-    assertTrue(completions.size() > 0);
-  }
-
-}
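Several of the removed cases gated behavior on a numeric Spark version derived from `sc.version` (e.g. `1.3.x` => 13). A minimal, self-contained sketch of that parsing, using a hypothetical version string:

```java
// Sketch of the version parsing the removed getSparkVersionNumber helper relied on.
public class SparkVersionNumberExample {

  static int toVersionNumber(String sparkVersion) {
    // "2.4.3" -> 24, "1.6.2" -> 16
    String[] split = sparkVersion.split("\\.");
    return Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
  }

  public static void main(String[] args) {
    System.out.println(toVersionNumber("2.4.3")); // prints 24
  }
}
```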
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
deleted file mode 100644
index 425651c..0000000
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.LinkedList;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class OldSparkSqlInterpreterTest {
-
-  @ClassRule
-  public static TemporaryFolder tmpDir = new TemporaryFolder();
-
-  static SparkSqlInterpreter sql;
-  static SparkInterpreter repl;
-  static InterpreterContext context;
-  static InterpreterGroup intpGroup;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    Properties p = new Properties();
-    p.putAll(OldSparkInterpreterTest.getSparkTestProperties(tmpDir));
-    p.setProperty("zeppelin.spark.maxResult", "10");
-    p.setProperty("zeppelin.spark.concurrentSQL", "false");
-    p.setProperty("zeppelin.spark.sql.stacktrace", "false");
-    p.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
-
-    repl = new SparkInterpreter(p);
-    intpGroup = new InterpreterGroup();
-    repl.setInterpreterGroup(intpGroup);
-    repl.open();
-    OldSparkInterpreterTest.repl = repl;
-    OldSparkInterpreterTest.intpGroup = intpGroup;
-
-    sql = new SparkSqlInterpreter(p);
-
-    intpGroup = new InterpreterGroup();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(repl);
-    intpGroup.get("note").add(sql);
-    sql.setInterpreterGroup(intpGroup);
-    sql.open();
-
-    context = InterpreterContext.builder()
-        .setNoteId("noteId")
-        .setParagraphId("paragraphId")
-        .setParagraphTitle("title")
-        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
-        .setResourcePool(new LocalResourcePool("id"))
-        .setInterpreterOut(new InterpreterOutput(null))
-        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
-        .build();
-  }
-
-  @AfterClass
-  public static void tearDown() throws InterpreterException {
-    sql.close();
-    repl.close();
-  }
-
-  boolean isDataFrameSupported() {
-    return OldSparkInterpreterTest.getSparkVersionNumber(repl) >= 13;
-  }
-
-  @Test
-  public void test() throws InterpreterException {
-    repl.interpret("case class Test(name:String, age:Int)", context);
-    repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
-    if (isDataFrameSupported()) {
-      repl.interpret("test.toDF.registerTempTable(\"test\")", context);
-    } else {
-      repl.interpret("test.registerTempTable(\"test\")", context);
-    }
-
-    InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(Type.TABLE, ret.message().get(0).getType());
-    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
-
-    ret = sql.interpret("select wrong syntax", context);
-    assertEquals(InterpreterResult.Code.ERROR, ret.code());
-    assertTrue(ret.message().get(0).getData().length() > 0);
-
-    assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
-  }
-
-  @Test
-  public void testStruct() throws InterpreterException {
-    repl.interpret("case class Person(name:String, age:Int)", context);
-    repl.interpret("case class People(group:String, person:Person)", context);
-    repl.interpret(
-        "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))",
-        context);
-    if (isDataFrameSupported()) {
-      repl.interpret("gr.toDF.registerTempTable(\"gr\")", context);
-    } else {
-      repl.interpret("gr.registerTempTable(\"gr\")", context);
-    }
-
-    InterpreterResult ret = sql.interpret("select * from gr", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-  }
-
-  @Test
-  public void test_null_value_in_row() throws InterpreterException {
-    repl.interpret("import org.apache.spark.sql._", context);
-    if (isDataFrameSupported()) {
-      repl.interpret(
-          "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}",
-          context);
-    }
-    repl.interpret(
-        "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}",
-        context);
-    repl.interpret(
-        "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
-        context);
-    repl.interpret(
-        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
-        context);
-    repl.interpret(
-        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
-        context);
-    if (isDataFrameSupported()) {
-      repl.interpret("val people = sqlContext.createDataFrame(raw, schema)",
-          context);
-      repl.interpret("people.toDF.registerTempTable(\"people\")", context);
-    } else {
-      repl.interpret("val people = sqlContext.applySchema(raw, schema)",
-          context);
-      repl.interpret("people.registerTempTable(\"people\")", context);
-    }
-
-    InterpreterResult ret = sql.interpret(
-        "select name, age from people where name = 'gates'", context);
-    System.err.println("RET=" + ret.message());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(Type.TABLE, ret.message().get(0).getType());
-    assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData());
-  }
-
-  @Test
-  public void testMaxResults() throws InterpreterException {
-    repl.interpret("case class P(age:Int)", context);
-    repl.interpret(
-        "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))",
-        context);
-    if (isDataFrameSupported()) {
-      repl.interpret("gr.toDF.registerTempTable(\"gr\")", context);
-    } else {
-      repl.interpret("gr.registerTempTable(\"gr\")", context);
-    }
-
-    InterpreterResult ret = sql.interpret("select * from gr", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertTrue(ret.message().get(1).getData().contains("alert-warning"));
-  }
-}
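The `test_null_value_in_row` case above built rows by hand so that an unparsable age became a SQL NULL. The same idea in the Java DataFrame API, as a condensed sketch assuming a Spark 2.x `SparkSession` (it puts the null directly instead of going through a `toInt` helper):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class NullValueRowSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("null-row-sketch").getOrCreate();

    // Nullable Integer column: an unparsable age is stored as null,
    // which SQL then reports as NULL (the behavior the removed test asserted).
    StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("name", DataTypes.StringType, false),
        DataTypes.createStructField("age", DataTypes.IntegerType, true)));

    List<Row> rows = Arrays.asList(
        RowFactory.create("jobs", 51),
        RowFactory.create("gates", null));

    Dataset<Row> people = spark.createDataFrame(rows, schema);
    people.createOrReplaceTempView("people");
    spark.sql("select name, age from people where name = 'gates'").show();
    spark.stop();
  }
}
```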
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 2e1567d..2445cce 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -27,7 +27,6 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
-import org.apache.zeppelin.python.PythonInterpreter;
 import org.apache.zeppelin.python.PythonInterpreterTest;
 import org.junit.Test;
 
@@ -55,7 +54,6 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
     properties.setProperty("zeppelin.pyspark.python", "python");
     properties.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath());
     properties.setProperty("zeppelin.pyspark.useIPython", "false");
-    properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.spark.test", "true");
     properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
@@ -107,7 +105,6 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
     intpGroup = new InterpreterGroup();
 
     Properties properties = new Properties();
-    properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("spark.app.name", "Zeppelin Test");
     properties.setProperty("spark.pyspark.python", "invalid_python");
     properties.setProperty("zeppelin.python.useIPython", "false");
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
similarity index 89%
rename from spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
rename to spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 7faae2c..34b24c7 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -38,12 +38,7 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.net.URL;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -54,14 +49,12 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 
 
-public class NewSparkInterpreterTest {
+public class SparkInterpreterTest {
 
   private SparkInterpreter interpreter;
-  private DepInterpreter depInterpreter;
 
   // catch the streaming output in onAppend
   private volatile String output = "";
@@ -82,7 +75,6 @@ public class NewSparkInterpreterTest {
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
     properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
@@ -96,7 +88,6 @@ public class NewSparkInterpreterTest {
     InterpreterContext.set(context);
 
     interpreter = new SparkInterpreter(properties);
-    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     interpreter.open();
 
@@ -378,39 +369,6 @@ public class NewSparkInterpreterTest {
     interpretThread.join();
   }
 
-  //TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed.
-  @Ignore
-  public void testDepInterpreter() throws InterpreterException {
-    Properties properties = new Properties();
-    properties.setProperty("spark.master", "local");
-    properties.setProperty("spark.app.name", "test");
-    properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.useNew", "true");
-    properties.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath());
-    // disable color output for easy testing
-    properties.setProperty("zeppelin.spark.scala.color", "false");
-    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
-
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    interpreter = new SparkInterpreter(properties);
-    depInterpreter = new DepInterpreter(properties);
-    interpreter.setInterpreterGroup(intpGroup);
-    depInterpreter.setInterpreterGroup(intpGroup);
-    intpGroup.put("session_1", new ArrayList<Interpreter>());
-    intpGroup.get("session_1").add(interpreter);
-    intpGroup.get("session_1").add(depInterpreter);
-
-    depInterpreter.open();
-    InterpreterResult result =
-        depInterpreter.interpret("z.load(\"com.databricks:spark-avro_2.11:3.2.0\")", getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-
-    interpreter.open();
-    result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-  }
-
   @Test
   public void testDisableReplOutput() throws InterpreterException {
     Properties properties = new Properties();
@@ -418,7 +376,6 @@ public class NewSparkInterpreterTest {
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
     properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.spark.printREPLOutput", "false");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
@@ -426,7 +383,6 @@ public class NewSparkInterpreterTest {
 
     InterpreterContext.set(getInterpreterContext());
     interpreter = new SparkInterpreter(properties);
-    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     interpreter.open();
 
@@ -448,14 +404,12 @@ public class NewSparkInterpreterTest {
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
     properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("spark.scheduler.mode", "FAIR");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
     interpreter = new SparkInterpreter(properties);
-    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     InterpreterContext.set(getInterpreterContext());
     interpreter.open();
@@ -480,14 +434,12 @@ public class NewSparkInterpreterTest {
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
     properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("spark.ui.enabled", "false");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
     interpreter = new SparkInterpreter(properties);
-    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     InterpreterContext.set(getInterpreterContext());
     interpreter.open();
@@ -508,14 +460,12 @@ public class NewSparkInterpreterTest {
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
     properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.spark.ui.hidden", "true");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
     interpreter = new SparkInterpreter(properties);
-    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     InterpreterContext.set(getInterpreterContext());
     interpreter.open();
@@ -535,7 +485,6 @@ public class NewSparkInterpreterTest {
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
     properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.useNew", "true");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
@@ -575,9 +524,6 @@ public class NewSparkInterpreterTest {
     if (this.interpreter != null) {
       this.interpreter.close();
     }
-    if (this.depInterpreter != null) {
-      this.depInterpreter.close();
-    }
     SparkShims.reset();
   }
 
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index ae48cbb..3838053 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -53,7 +53,6 @@ public class SparkRInterpreterTest {
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
     properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.R.knitr", "true");
     properties.setProperty("spark.r.backendConnectionTimeout", "10");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
similarity index 99%
rename from spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
rename to spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index ac5c9e1..cab5b1b 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class NewSparkSqlInterpreterTest {
+public class SparkSqlInterpreterTest {
 
   private static SparkSqlInterpreter sqlInterpreter;
   private static SparkInterpreter sparkInterpreter;
@@ -53,7 +53,6 @@ public class NewSparkSqlInterpreterTest {
     p.setProperty("zeppelin.spark.maxResult", "10");
     p.setProperty("zeppelin.spark.concurrentSQL", "true");
     p.setProperty("zeppelin.spark.sql.stacktrace", "true");
-    p.setProperty("zeppelin.spark.useNew", "true");
     p.setProperty("zeppelin.spark.useHiveContext", "true");
     p.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index defd5ab..321b94f 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -583,48 +583,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       }
     }
   }
-
-  @Test
-  public void pySparkDepLoaderTest() throws IOException {
-    Note note = null;
-    try {
-      note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
-
-      // restart spark interpreter to make dep loader work
-      TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().close();
-
-      // load dep
-      Paragraph p0 = note.addNewParagraph(anonymous);
-      p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")");
-      note.run(p0.getId(), true);
-      assertEquals(Status.FINISHED, p0.getStatus());
-
-      // write test csv file
-      File tmpFile = File.createTempFile("test", "csv");
-      FileUtils.write(tmpFile, "a,b\n1,2");
-
-      // load data using libraries from dep loader
-      Paragraph p1 = note.addNewParagraph(anonymous);
-
-      String sqlContextName = "sqlContext";
-      if (isSpark2()) {
-        sqlContextName = "spark";
-      }
-      p1.setText("%pyspark\n" +
-          "from pyspark.sql import SQLContext\n" +
-          "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" +
-          ".load('file://" + tmpFile.getAbsolutePath() + "').count())");
-      note.run(p1.getId(), true);
-
-      assertEquals(Status.FINISHED, p1.getStatus());
-      assertEquals("2\n", p1.getReturn().message().get(0).getData());
-    } finally {
-      if (null != note) {
-        TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous);
-      }
-    }
-  }
-
+  
   private void verifySparkVersionNumber() throws IOException {
     Note note = null;
     try {
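The removed `pySparkDepLoaderTest` combined a `%dep z.load(...)` paragraph with a spark-csv read. Without `%dep`, the same end-to-end flow can be expressed purely through Spark configuration. A hedged Java sketch (the file path is a placeholder for the temp file the removed test wrote, and on Spark 2.x the built-in csv reader makes the extra package unnecessary):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CsvWithoutDepSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("csv-without-dep")
        // Equivalent of the removed %dep paragraph: resolve the package at startup.
        // Optional on Spark 2.x, where csv support is built in.
        .config("spark.jars.packages", "com.databricks:spark-csv_2.11:1.2.0")
        .getOrCreate();

    // Placeholder path standing in for the temp file the removed test created.
    Dataset<Row> df = spark.read().option("header", "true").csv("/tmp/test.csv");
    System.out.println(df.count());
    spark.stop();
  }
}
```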

