zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prabhjyotsi...@apache.org
Subject zeppelin git commit: ZEPPELIN-1786. Refactor LivyHelper
Date Thu, 22 Dec 2016 16:44:43 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master 7f15b7b16 -> 125a42ca1


ZEPPELIN-1786. Refactor LivyHelper

### What is this PR for?

This PR continues the work of livy refactoring. Here are the main changes in this PR:
* Move the code from `LivyHelper` to `BaseLivyInterprereter`
* Define POJO for livy request and response instead of using Map (sometimes nested Map)
* Move the livy session initialization from `interpret` to `open`
* Add one more complicated test which use the spark basic tutorial note.
* Support livy.sql for spark2
* Use zeppelin.livy.create.session.timeout instead of retry count, as retry count is an internal implementation detail and users don't know what it means.
* Improve travis (wrap livy related work in `setupLivy.sh`)

### What type of PR is it?
[Improvement | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1786

### How should this be tested?

One more integration test is added

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes #1751 from zjffdu/ZEPPELIN-1786 and squashes the following commits:

30443f8 [Jeff Zhang] Fix string escape issue for livy.sql
d6fb35d [Jeff Zhang] address comments
6e5cbb8 [Jeff Zhang] ZEPPELIN-1786. Refactor LivyHelper


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/125a42ca
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/125a42ca
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/125a42ca

Branch: refs/heads/master
Commit: 125a42ca1f8194c4b3872230e58cbdb6ff35597d
Parents: 7f15b7b
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Tue Dec 20 12:29:52 2016 +0800
Committer: Prabhjyot Singh <prabhjyotsingh@gmail.com>
Committed: Thu Dec 22 22:14:36 2016 +0530

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 .../zeppelin/livy/BaseLivyInterprereter.java    | 415 ++++++++++++++++---
 .../org/apache/zeppelin/livy/LivyException.java |  43 ++
 .../org/apache/zeppelin/livy/LivyHelper.java    | 406 ------------------
 .../apache/zeppelin/livy/LivyOutputStream.java  |  84 ----
 .../zeppelin/livy/LivySparkSQLInterpreter.java  |  99 +++--
 .../zeppelin/livy/SessionNotFoundException.java |  28 ++
 .../src/main/resources/interpreter-setting.json |   9 +-
 .../apache/zeppelin/livy/LivyHelperTest.java    | 111 -----
 .../apache/zeppelin/livy/LivyInterpreterIT.java | 164 +++++---
 livy/src/test/resources/livy_tutorial_1.scala   |  24 ++
 livy/src/test/resources/livy_tutorial_2.sql     |   5 +
 testing/setupLivy.sh                            |  28 ++
 13 files changed, 662 insertions(+), 758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index acad5c9..f3e0088 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -90,10 +90,8 @@ install:
 
 before_script:
   - travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
-  - if [[ -n $LIVY_VER ]]; then travis_retry ./testing/downloadLivy.sh $LIVY_VER; fi
+  - ./testing/setupLivy.sh
   - echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
-  - if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-server-$LIVY_VER; fi
-  - if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
   - tail conf/zeppelin-env.sh
 
 script:

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index d0167c2..a8e3127 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -17,6 +17,9 @@
 
 package org.apache.zeppelin.livy;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -24,7 +27,17 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.security.kerberos.client.KerberosRestTemplate;
+import org.springframework.web.client.HttpClientErrorException;
+import org.springframework.web.client.RestTemplate;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -33,76 +46,83 @@ import java.util.Properties;
 public abstract class BaseLivyInterprereter extends Interpreter {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
+  private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create();
 
-  // -1 means session is not created yet, valid sessionId start from 0
-  protected int sessionId = -1;
-  protected String appId;
-  protected String webUIAddress;
+  protected SessionInfo sessionInfo;
+  private String livyURL;
+  private long sessionCreationTimeout;
   protected boolean displayAppInfo;
-  protected LivyOutputStream out;
-  protected LivyHelper livyHelper;
 
   public BaseLivyInterprereter(Properties property) {
     super(property);
-    this.out = new LivyOutputStream();
-    this.livyHelper = new LivyHelper(property);
+    this.livyURL = property.getProperty("zeppelin.livy.url");
+    this.sessionCreationTimeout = Long.parseLong(
+        property.getProperty("zeppelin.livy.create.session.timeout", 120 + ""));
   }
 
   public abstract String getSessionKind();
 
   @Override
   public void open() {
-    // TODO(zjffdu) move session creation here.
+    try {
+      initLivySession();
+    } catch (LivyException e) {
+      String msg = "Fail to create session, please check livy interpreter log and " +
+          "livy server log";
+      LOGGER.error(msg);
+      throw new RuntimeException(msg, e);
+    }
   }
 
   @Override
   public void close() {
-    if (sessionId != -1) {
-      livyHelper.closeSession(sessionId);
-      // reset sessionId to -1
-      sessionId = -1;
+    if (sessionInfo != null) {
+      closeSession(sessionInfo.id);
+      // reset sessionInfo to null so that we won't close it twice.
+      sessionInfo = null;
     }
   }
 
-  protected void createSession(InterpreterContext context) throws Exception {
-    sessionId = livyHelper.createSession(context, getSessionKind());
+  protected void initLivySession() throws LivyException {
+    this.sessionInfo = createSession(getUserName(), getSessionKind());
     if (displayAppInfo) {
-      this.appId = extractStatementResult(
-          livyHelper.interpret("sc.applicationId", context, sessionId).message().get(0).getData());
-      livyHelper.interpret(
-          "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
-          context, sessionId);
-      this.webUIAddress = extractStatementResult(
-          livyHelper.interpret(
-              "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
-              context, sessionId).message().get(0).getData());
-      LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
-          sessionId, appId, webUIAddress);
+      if (sessionInfo.appId == null) {
+        // livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
+        // explicitly by ourselves.
+        sessionInfo.appId = extractStatementResult(
+            interpret("sc.applicationId", false).message()
+                .get(0).getData());
+      }
+
+      interpret(
+          "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", false);
+      if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
+        sessionInfo.webUIAddress = extractStatementResult(
+            interpret(
+                "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false)
+                .message().get(0).getData());
+      } else {
+        sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
+      }
+      LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}",
+          sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress);
     }
   }
 
+  public SessionInfo getSessionInfo() {
+    return sessionInfo;
+  }
+
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
-    try {
-      // add synchronized, because LivySparkSQLInterperter will use ParallelScheduler
-      synchronized (this) {
-        if (sessionId == -1) {
-          try {
-            createSession(context);
-          } catch (Exception e) {
-            LOGGER.error("Exception while creating livy session", e);
-            return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
-          }
-        }
-      }
-      if (StringUtils.isEmpty(st)) {
-        return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
-      }
+    if (StringUtils.isEmpty(st)) {
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
+    }
 
-      return livyHelper.interpretInput(st, context, sessionId, out,
-          appId, webUIAddress, displayAppInfo);
-    } catch (Exception e) {
-      LOGGER.error("Exception in LivyInterpreter.", e);
+    try {
+      return interpret(st, this.displayAppInfo);
+    } catch (LivyException e) {
+      LOGGER.error("Fail to interpret:" + st, e);
       return new InterpreterResult(InterpreterResult.Code.ERROR,
           InterpreterUtils.getMostRelevantMessage(e));
     }
@@ -116,7 +136,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
    * @param result
    * @return
    */
-  private static String extractStatementResult(String result) {
+  private String extractStatementResult(String result) {
     int pos = -1;
     if ((pos = result.indexOf("=")) >= 0) {
       return result.substring(pos + 1).trim();
@@ -128,7 +148,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
 
   @Override
   public void cancel(InterpreterContext context) {
-    livyHelper.cancelHTTP(context.getParagraphId());
+    //TODO(zjffdu). Use livy cancel api which is available in livy 0.3
   }
 
   @Override
@@ -140,4 +160,305 @@ public abstract class BaseLivyInterprereter extends Interpreter {
   public int getProgress(InterpreterContext context) {
     return 0;
   }
+
+  private SessionInfo createSession(String user, String kind)
+      throws LivyException {
+    try {
+      Map<String, String> conf = new HashMap<>();
+      for (Map.Entry<Object, Object> entry : property.entrySet()) {
+        if (entry.getKey().toString().startsWith("livy.spark.") &&
+            !entry.getValue().toString().isEmpty())
+          conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
+      }
+
+      CreateSessionRequest request = new CreateSessionRequest(kind, user, conf);
+      SessionInfo sessionInfo = SessionInfo.fromJson(
+          callRestAPI("/sessions", "POST", request.toJson()));
+      long start = System.currentTimeMillis();
+      // pull the session status until it is idle or timeout
+      while (!sessionInfo.isReady()) {
+        LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state,
+            sessionInfo.appId);
+        if (sessionInfo.isFinished()) {
+          String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
+              + ", log: " + sessionInfo.log;
+          LOGGER.error(msg);
+          throw new LivyException(msg);
+        }
+        if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) {
+          String msg = "The creation of session " + sessionInfo.id + " is timeout within "
+              + sessionCreationTimeout + " seconds, appId: " + sessionInfo.appId
+              + ", log: " + sessionInfo.log;
+          LOGGER.error(msg);
+          throw new LivyException(msg);
+        }
+        Thread.sleep(1000);
+        sessionInfo = getSessionInfo(sessionInfo.id);
+      }
+      return sessionInfo;
+    } catch (Exception e) {
+      LOGGER.error("Error when creating livy session for user " + user, e);
+      throw new LivyException(e);
+    }
+  }
+
+  private SessionInfo getSessionInfo(int sessionId) throws LivyException {
+    return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
+  }
+
+  public InterpreterResult interpret(String code, boolean displayAppInfo)
+      throws LivyException {
+    StatementInfo stmtInfo = executeStatement(new ExecuteRequest(code));
+    // pull the statement status
+    while (!stmtInfo.isAvailable()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOGGER.error("InterruptedException when pulling statement status.", e);
+        throw new LivyException(e);
+      }
+      stmtInfo = getStatementInfo(stmtInfo.id);
+    }
+    return getResultFromStatementInfo(stmtInfo, displayAppInfo);
+  }
+
+  private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
+                                                       boolean displayAppInfo) {
+    if (stmtInfo.output.isError()) {
+      return new InterpreterResult(InterpreterResult.Code.ERROR, stmtInfo.output.evalue);
+    } else {
+      //TODO(zjffdu) support other types of data (like json, image and etc)
+      String result = stmtInfo.output.data.plain_text;
+      if (result != null) {
+        result = result.trim();
+        if (result.startsWith("<link")
+            || result.startsWith("<script")
+            || result.startsWith("<style")
+            || result.startsWith("<div")) {
+          result = "%html " + result;
+        }
+      }
+      if (displayAppInfo) {
+        //TODO(zjffdu), use multiple InterpreterResult to display appInfo
+        StringBuilder outputBuilder = new StringBuilder();
+        outputBuilder.append("%angular ");
+        outputBuilder.append("<pre><code>");
+        outputBuilder.append(result);
+        outputBuilder.append("</code></pre>");
+        outputBuilder.append("<hr/>");
+        outputBuilder.append("Spark Application Id:" + sessionInfo.appId + "<br/>");
+        outputBuilder.append("Spark WebUI: <a href=" + sessionInfo.webUIAddress + ">"
+            + sessionInfo.webUIAddress + "</a>");
+        return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputBuilder.toString());
+      } else {
+        return new InterpreterResult(InterpreterResult.Code.SUCCESS, result);
+      }
+    }
+  }
+
+  private StatementInfo executeStatement(ExecuteRequest executeRequest)
+      throws LivyException {
+    return StatementInfo.fromJson(callRestAPI("/sessions/" + sessionInfo.id + "/statements", "POST",
+        executeRequest.toJson()));
+  }
+
+  private StatementInfo getStatementInfo(int statementId)
+      throws LivyException {
+    return StatementInfo.fromJson(
+        callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId, "GET"));
+  }
+
+  private RestTemplate getRestTemplate() {
+    String keytabLocation = property.getProperty("zeppelin.livy.keytab");
+    String principal = property.getProperty("zeppelin.livy.principal");
+    if (StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal)) {
+      return new KerberosRestTemplate(keytabLocation, principal);
+    }
+    return new RestTemplate();
+  }
+
+  private String callRestAPI(String targetURL, String method) throws LivyException {
+    return callRestAPI(targetURL, method, "");
+  }
+
+  private String callRestAPI(String targetURL, String method, String jsonData)
+      throws LivyException {
+    targetURL = livyURL + targetURL;
+    LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
+    RestTemplate restTemplate = getRestTemplate();
+    HttpHeaders headers = new HttpHeaders();
+    headers.add("Content-Type", "application/json");
+    headers.add("X-Requested-By", "zeppelin");
+    ResponseEntity<String> response = null;
+    try {
+      if (method.equals("POST")) {
+        HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
+        response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
+      } else if (method.equals("GET")) {
+        HttpEntity<String> entity = new HttpEntity<>(headers);
+        response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
+      } else if (method.equals("DELETE")) {
+        HttpEntity<String> entity = new HttpEntity<>(headers);
+        response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
+      }
+    } catch (HttpClientErrorException e) {
+      response = new ResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
+      LOGGER.error(String.format("Error with %s StatusCode: %s",
+          response.getStatusCode().value(), e.getResponseBodyAsString()));
+    }
+    if (response == null) {
+      throw new LivyException("No http response returned");
+    }
+    LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(),
+        response.getBody());
+    if (response.getStatusCode().value() == 200
+        || response.getStatusCode().value() == 201
+        || response.getStatusCode().value() == 404) {
+      String responseBody = response.getBody();
+      if (responseBody.matches("Session '\\d+' not found.")) {
+        throw new SessionNotFoundException(responseBody);
+      } else {
+        return responseBody;
+      }
+    } else {
+      String responseString = response.getBody();
+      if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
+        return responseString;
+      }
+      LOGGER.error(String.format("Error with %s StatusCode: %s",
+          response.getStatusCode().value(), responseString));
+      throw new LivyException(String.format("Error with %s StatusCode: %s",
+          response.getStatusCode().value(), responseString));
+    }
+  }
+
+  private void closeSession(int sessionId) {
+    try {
+      callRestAPI("/sessions/" + sessionId, "DELETE");
+    } catch (Exception e) {
+      LOGGER.error(String.format("Error closing session for user with session ID: %s",
+          sessionId), e);
+    }
+  }
+
+  /*
+  * We create these POJO here to accommodate livy 0.3 which is not released yet. livy rest api has
+  * some changes from version to version. So we create these POJO in zeppelin side to accommodate
+  * incompatibility between versions. Later, when livy become more stable, we could just depend on
+  * livy client jar.
+  */
+  private static class CreateSessionRequest {
+    public final String kind;
+    @SerializedName("proxyUser")
+    public final String user;
+    public final Map<String, String> conf;
+
+    public CreateSessionRequest(String kind, String user, Map<String, String> conf) {
+      this.kind = kind;
+      this.user = user;
+      this.conf = conf;
+    }
+
+    public String toJson() {
+      return gson.toJson(this);
+    }
+  }
+
+  /**
+   *
+   */
+  public static class SessionInfo {
+
+    public final int id;
+    public String appId;
+    public String webUIAddress;
+    public final String owner;
+    public final String proxyUser;
+    public final String state;
+    public final String kind;
+    public final Map<String, String> appInfo;
+    public final List<String> log;
+
+    public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
+                       String kind, Map<String, String> appInfo, List<String> log) {
+      this.id = id;
+      this.appId = appId;
+      this.owner = owner;
+      this.proxyUser = proxyUser;
+      this.state = state;
+      this.kind = kind;
+      this.appInfo = appInfo;
+      this.log = log;
+    }
+
+    public boolean isReady() {
+      return state.equals("idle");
+    }
+
+    public boolean isFinished() {
+      return state.equals("error") || state.equals("dead") || state.equals("success");
+    }
+
+    public static SessionInfo fromJson(String json) {
+      return gson.fromJson(json, SessionInfo.class);
+    }
+  }
+
+  private static class ExecuteRequest {
+    public final String code;
+
+    public ExecuteRequest(String code) {
+      this.code = code;
+    }
+
+    public String toJson() {
+      return gson.toJson(this);
+    }
+  }
+
+  private static class StatementInfo {
+    public Integer id;
+    public String state;
+    public StatementOutput output;
+
+    public StatementInfo() {
+    }
+
+    public static StatementInfo fromJson(String json) {
+      return gson.fromJson(json, StatementInfo.class);
+    }
+
+    public boolean isAvailable() {
+      return state.equals("available");
+    }
+
+    private static class StatementOutput {
+      public String status;
+      public String execution_count;
+      public Data data;
+      public String ename;
+      public String evalue;
+      public Object traceback;
+
+      public boolean isError() {
+        return status.equals("error");
+      }
+
+      public String toJson() {
+        return gson.toJson(this);
+      }
+
+      private static class Data {
+        @SerializedName("text/plain")
+        public String plain_text;
+        @SerializedName("image/png")
+        public String image_png;
+        @SerializedName("application/json")
+        public String application_json;
+        @SerializedName("application/vnd.livy.table.v1+json")
+        public String application_livy_table_json;
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java
new file mode 100644
index 0000000..5adffd4
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+/**
+ * Livy api related exception
+ */
+public class LivyException extends Exception {
+  public LivyException() {
+  }
+
+  public LivyException(String message) {
+    super(message);
+  }
+
+  public LivyException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public LivyException(Throwable cause) {
+    super(cause);
+  }
+
+  public LivyException(String message, Throwable cause, boolean enableSuppression,
+                       boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
deleted file mode 100644
index 76f5167..0000000
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.livy;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
-import org.springframework.security.kerberos.client.KerberosRestTemplate;
-import org.springframework.web.client.HttpClientErrorException;
-import org.springframework.web.client.RestTemplate;
-
-import java.nio.charset.Charset;
-import java.util.*;
-import java.util.Map.Entry;
-
-
-/***
- * Livy helper class
- */
-public class LivyHelper {
-  Logger LOGGER = LoggerFactory.getLogger(LivyHelper.class);
-  Gson gson = new GsonBuilder().setPrettyPrinting().create();
-  HashMap<String, Object> paragraphHttpMap = new HashMap<>();
-  Properties property;
-
-  LivyHelper(Properties property) {
-    this.property = property;
-  }
-
-  public Integer createSession(InterpreterContext context, String kind) throws Exception {
-    try {
-      Map<String, String> conf = new HashMap<>();
-
-      Iterator<Entry<Object, Object>> it = property.entrySet().iterator();
-      while (it.hasNext()) {
-        Entry<Object, Object> pair = it.next();
-        if (pair.getKey().toString().startsWith("livy.spark.") &&
-            !pair.getValue().toString().isEmpty())
-          conf.put(pair.getKey().toString().substring(5), pair.getValue().toString());
-      }
-
-      String confData = gson.toJson(conf);
-      String user = context.getAuthenticationInfo().getUser();
-
-      String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions", "POST",
-          "{" +
-              "\"kind\": \"" + kind + "\", " +
-              "\"conf\": " + confData + ", " +
-              "\"proxyUser\": " + (StringUtils.isEmpty(user) ? null : "\"" + user + "\"") +
-              "}",
-          context.getParagraphId()
-      );
-
-      Map jsonMap = (Map<Object, Object>) gson.fromJson(json,
-          new TypeToken<Map<Object, Object>>() {
-          }.getType());
-      Integer sessionId = ((Double) jsonMap.get("id")).intValue();
-      if (!jsonMap.get("state").equals("idle")) {
-        Integer retryCount = 60;
-
-        try {
-          retryCount = Integer.valueOf(
-              property.getProperty("zeppelin.livy.create.session.retries"));
-        } catch (Exception e) {
-          LOGGER.info("zeppelin.livy.create.session.retries property is not configured." +
-              " Using default retry count.");
-        }
-
-        while (retryCount >= 0) {
-          LOGGER.error(String.format("sessionId:%s state is %s",
-              jsonMap.get("id"), jsonMap.get("state")));
-          Thread.sleep(1000);
-          json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
-              sessionId, "GET", null, context.getParagraphId());
-          jsonMap = (Map<Object, Object>) gson.fromJson(json,
-              new TypeToken<Map<Object, Object>>() {
-              }.getType());
-          if (jsonMap.get("state").equals("idle")) {
-            break;
-          } else if (jsonMap.get("state").equals("error") || jsonMap.get("state").equals("dead")) {
-            json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
-                    sessionId + "/log",
-                "GET", null,
-                context.getParagraphId());
-            jsonMap = (Map<Object, Object>) gson.fromJson(json,
-                new TypeToken<Map<Object, Object>>() {
-                }.getType());
-            String logs = StringUtils.join((ArrayList<String>) jsonMap.get("log"), '\n');
-            LOGGER.error(String.format("Cannot start  %s.\n%s", kind, logs));
-            throw new Exception(String.format("Cannot start  %s.\n%s", kind, logs));
-          }
-          retryCount--;
-        }
-        if (retryCount <= 0) {
-          LOGGER.error("Error getting session for user within the configured number of retries.");
-          throw new Exception(String.format("Cannot start  %s.", kind));
-        }
-      }
-      return sessionId;
-    } catch (Exception e) {
-      LOGGER.error("Error getting session for user", e);
-      throw e;
-    }
-  }
-
-  public InterpreterResult interpretInput(String stringLines,
-                                          final InterpreterContext context,
-                                          int sessionId,
-                                          LivyOutputStream out,
-                                          String appId,
-                                          String webUI,
-                                          boolean displayAppInfo) {
-    try {
-      out.setInterpreterOutput(context.out);
-      context.out.clear();
-      String incomplete = "";
-      boolean inComment = false;
-      String[] lines = stringLines.split("\n");
-      String[] linesToRun = new String[lines.length + 1];
-      for (int i = 0; i < lines.length; i++) {
-        linesToRun[i] = lines[i];
-      }
-      linesToRun[lines.length] = "print(\"\")";
-      Code r = null;
-      StringBuilder outputBuilder = new StringBuilder();
-      for (int l = 0; l < linesToRun.length; l++) {
-        String s = linesToRun[l];
-        // check if next line starts with "." (but not ".." or "./") it is treated as an invocation
-        //for spark
-        if (l + 1 < linesToRun.length) {
-          String nextLine = linesToRun[l + 1].trim();
-          boolean continuation = false;
-          if (nextLine.isEmpty()
-              || nextLine.startsWith("//")         // skip empty line or comment
-              || nextLine.startsWith("}")
-              || nextLine.startsWith("object")) {  // include "} object" for Scala companion object
-            continuation = true;
-          } else if (!inComment && nextLine.startsWith("/*")) {
-            inComment = true;
-            continuation = true;
-          } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
-            inComment = false;
-            continuation = true;
-          } else if (nextLine.length() > 1
-              && nextLine.charAt(0) == '.'
-              && nextLine.charAt(1) != '.'     // ".."
-              && nextLine.charAt(1) != '/') {  // "./"
-            continuation = true;
-          } else if (inComment) {
-            continuation = true;
-          }
-          if (continuation) {
-            incomplete += s + "\n";
-            continue;
-          }
-        }
-
-        InterpreterResult res;
-        try {
-          res = interpret(incomplete + s, context, sessionId);
-        } catch (Exception e) {
-          LOGGER.error("Interpreter exception", e);
-          return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
-        }
-
-        r = res.code();
-
-        if (r == Code.ERROR) {
-          out.setInterpreterOutput(null);
-          return res;
-        } else if (r == Code.INCOMPLETE) {
-          incomplete += s + "\n";
-        } else {
-          outputBuilder.append(res.message() + "\n");
-          incomplete = "";
-        }
-      }
-
-      if (r == Code.INCOMPLETE) {
-        out.setInterpreterOutput(null);
-        return new InterpreterResult(r, "Incomplete expression");
-      } else {
-        if (displayAppInfo) {
-          out.write("%angular ");
-          out.write("<pre><code>");
-          out.write(outputBuilder.toString());
-          out.write("</code></pre>");
-          out.write("<hr/>");
-          out.write("Spark Application Id:" + appId + "<br/>");
-          out.write("Spark WebUI: <a href=" + webUI + ">" + webUI + "</a>");
-        } else {
-          out.write(outputBuilder.toString());
-        }
-        out.setInterpreterOutput(null);
-        return new InterpreterResult(Code.SUCCESS);
-      }
-    } catch (Exception e) {
-      LOGGER.error("error in interpretInput", e);
-      return new InterpreterResult(Code.ERROR, e.getMessage());
-    }
-  }
-
-  public InterpreterResult interpret(String stringLines,
-                                     final InterpreterContext context,
-                                     int sessionId)
-      throws Exception {
-    if (stringLines.trim().equals("")) {
-      return new InterpreterResult(Code.SUCCESS, "");
-    }
-    Map jsonMap = executeCommand(stringLines, context, sessionId);
-    Integer id = ((Double) jsonMap.get("id")).intValue();
-    InterpreterResult res = getResultFromMap(jsonMap);
-    if (res != null) {
-      return res;
-    }
-
-    while (true) {
-      Thread.sleep(1000);
-      if (paragraphHttpMap.get(context.getParagraphId()) == null) {
-        return new InterpreterResult(Code.INCOMPLETE, "");
-      }
-      jsonMap = getStatusById(context, sessionId, id);
-      InterpreterResult interpreterResult = getResultFromMap(jsonMap);
-      if (interpreterResult != null) {
-        return interpreterResult;
-      }
-    }
-  }
-
-  private InterpreterResult getResultFromMap(Map jsonMap) {
-    if (jsonMap.get("state").equals("available")) {
-      if (((Map) jsonMap.get("output")).get("status").equals("error")) {
-        StringBuilder errorMessage = new StringBuilder((String) ((Map) jsonMap
-            .get("output")).get("evalue"));
-        if (errorMessage.toString().equals("incomplete statement")
-            || errorMessage.toString().contains("EOF")) {
-          return new InterpreterResult(Code.INCOMPLETE, "");
-        }
-        String traceback = gson.toJson(((Map) jsonMap.get("output")).get("traceback"));
-        if (!traceback.equals("[]")) {
-          errorMessage
-              .append("\n")
-              .append("traceback: \n")
-              .append(traceback);
-        }
-
-        return new InterpreterResult(Code.ERROR, errorMessage.toString());
-      }
-      if (((Map) jsonMap.get("output")).get("status").equals("ok")) {
-        String result = (String) ((Map) ((Map) jsonMap.get("output"))
-            .get("data")).get("text/plain");
-        if (result != null) {
-          result = result.trim();
-          if (result.startsWith("<link")
-              || result.startsWith("<script")
-              || result.startsWith("<style")
-              || result.startsWith("<div")) {
-            result = "%html " + result;
-          }
-        }
-        return new InterpreterResult(Code.SUCCESS, result);
-      }
-    }
-    return null;
-  }
-
-  private Map executeCommand(String lines, InterpreterContext context, int sessionId)
-      throws Exception {
-    String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/"
-            + sessionId + "/statements",
-        "POST",
-        "{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}",
-        context.getParagraphId());
-    if (json.matches("^(\")?Session (\'[0-9]\' )?not found(.?\"?)$")) {
-      throw new Exception("Exception: Session not found, Livy server would have restarted, " +
-          "or lost session.");
-    }
-    try {
-      Map jsonMap = gson.fromJson(json,
-          new TypeToken<Map>() {
-          }.getType());
-      return jsonMap;
-    } catch (Exception e) {
-      LOGGER.error("Error executeCommand", e);
-      throw e;
-    }
-  }
-
-  private Map getStatusById(InterpreterContext context,
-                            int sessionId, Integer id) throws Exception {
-    String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
-            + sessionId
-            + "/statements/" + id,
-        "GET", null, context.getParagraphId());
-    LOGGER.debug("statement {} response: {}", id, json);
-    try {
-      Map jsonMap = gson.fromJson(json,
-          new TypeToken<Map>() {
-          }.getType());
-      return jsonMap;
-    } catch (Exception e) {
-      LOGGER.error("Error getStatusById", e);
-      throw e;
-    }
-  }
-
-  private RestTemplate getRestTemplate() {
-    String keytabLocation = property.getProperty("zeppelin.livy.keytab");
-    String principal = property.getProperty("zeppelin.livy.principal");
-    if (StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal)) {
-      return new KerberosRestTemplate(keytabLocation, principal);
-    }
-    return new RestTemplate();
-  }
-
-  protected String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
-      throws Exception {
-    LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
-    RestTemplate restTemplate = getRestTemplate();
-    HttpHeaders headers = new HttpHeaders();
-    headers.add("Content-Type", "application/json");
-    headers.add("X-Requested-By", "zeppelin");
-    ResponseEntity<String> response = null;
-    try {
-      if (method.equals("POST")) {
-        HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
-
-        response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
-        paragraphHttpMap.put(paragraphId, response);
-      } else if (method.equals("GET")) {
-        HttpEntity<String> entity = new HttpEntity<>(headers);
-        response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
-        paragraphHttpMap.put(paragraphId, response);
-      } else if (method.equals("DELETE")) {
-        HttpEntity<String> entity = new HttpEntity<>(headers);
-        response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
-      }
-    } catch (HttpClientErrorException e) {
-      response = new ResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
-      LOGGER.error(String.format("Error with %s StatusCode: %s",
-          response.getStatusCode().value(), e.getResponseBodyAsString()));
-    }
-    if (response == null) {
-      return null;
-    }
-
-    if (response.getStatusCode().value() == 200
-            || response.getStatusCode().value() == 201
-            || response.getStatusCode().value() == 404) {
-      return response.getBody();
-    } else {
-      String responseString = response.getBody();
-      if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
-        return responseString;
-      }
-      LOGGER.error(String.format("Error with %s StatusCode: %s",
-              response.getStatusCode().value(), responseString));
-      throw new Exception(String.format("Error with %s StatusCode: %s",
-              response.getStatusCode().value(), responseString));
-    }
-  }
-
-  public void cancelHTTP(String paragraphId) {
-    // TODO(zjffdu), use cancel rest api of livy
-    paragraphHttpMap.put(paragraphId, null);
-  }
-
-  public void closeSession(int sessionId) {
-    try {
-      executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
-          "DELETE", null, null);
-    } catch (Exception e) {
-      LOGGER.error(String.format("Error closing session for user with session ID: %s",
-          sessionId), e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
deleted file mode 100644
index a11634e..0000000
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.livy;
-
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * InterpreterOutput can be attached / detached.
- */
-public class LivyOutputStream extends OutputStream {
-
-  private static Logger LOGGER = LoggerFactory.getLogger(LivyOutputStream.class);
-  InterpreterOutput interpreterOutput;
-
-  public LivyOutputStream() {
-  }
-
-  public InterpreterOutput getInterpreterOutput() {
-    return interpreterOutput;
-  }
-
-  public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
-    this.interpreterOutput = interpreterOutput;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    if (interpreterOutput != null) {
-      interpreterOutput.write(b);
-    }
-  }
-
-  @Override
-  public void write(byte[] b) throws IOException {
-    if (interpreterOutput != null) {
-      interpreterOutput.write(b);
-    }
-  }
-
-  public void write(String text) throws IOException {
-    LOGGER.debug("livy output:" + text);
-    write(text.getBytes("UTF-8"));
-  }
-
-  @Override
-  public void write(byte[] b, int offset, int len) throws IOException {
-    if (interpreterOutput != null) {
-      interpreterOutput.write(b, offset, len);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (interpreterOutput != null) {
-      interpreterOutput.close();
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    if (interpreterOutput != null) {
-      interpreterOutput.flush();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 089b264..cdc8b5b 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -19,16 +19,10 @@ package org.apache.zeppelin.livy;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
@@ -38,10 +32,12 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
 
   private LivySparkInterpreter sparkInterpreter;
 
-  private boolean sqlContextCreated = false;
+  private boolean isSpark2 = false;
+  private int maxResult = 1000;
 
   public LivySparkSQLInterpreter(Properties property) {
     super(property);
+    this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
   }
 
   @Override
@@ -51,10 +47,56 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
 
   @Override
   public void open() {
-    super.open();
-    this.sparkInterpreter =
-        (LivySparkInterpreter) getInterpreterInTheSameSessionByClassName(
-            LivySparkInterpreter.class.getName());
+    this.sparkInterpreter = getSparkInterpreter();
+    // As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
+    // to judge whether it is using spark2.
+    try {
+      InterpreterResult result = sparkInterpreter.interpret("spark", false);
+      if (result.code() == InterpreterResult.Code.SUCCESS &&
+          result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
+        LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
+            sparkInterpreter.getSessionInfo().id);
+        isSpark2 = true;
+      } else {
+        // spark 1.x
+        result = sparkInterpreter.interpret("sqlContext", false);
+        if (result.code() == InterpreterResult.Code.SUCCESS) {
+          LOGGER.info("sqlContext is detected.");
+        } else if (result.code() == InterpreterResult.Code.ERROR) {
+          // create SqlContext if it is not available, as in livy 0.2 sqlContext
+          // is not available.
+          LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves");
+          result = sparkInterpreter.interpret(
+              "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+                  + "import sqlContext.implicits._", false);
+          if (result.code() == InterpreterResult.Code.ERROR) {
+            throw new LivyException("Fail to create SQLContext," +
+                result.message().get(0).getData());
+          }
+        }
+      }
+    } catch (LivyException e) {
+      throw new RuntimeException("Fail to Detect SparkVersion", e);
+    }
+  }
+
+  private LivySparkInterpreter getSparkInterpreter() {
+    LazyOpenInterpreter lazy = null;
+    LivySparkInterpreter spark = null;
+    Interpreter p = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
+      }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    spark = (LivySparkInterpreter) p;
+
+    if (lazy != null) {
+      lazy.open();
+    }
+    return spark;
   }
 
   @Override
@@ -64,37 +106,19 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
         return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
       }
 
-      // create sqlContext implicitly if it is not available, as in livy 0.2 sqlContext
-      // is not available.
-      synchronized (this) {
-        if (!sqlContextCreated) {
-          InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
-          if (result.code() == InterpreterResult.Code.ERROR) {
-            result = sparkInterpreter.interpret(
-                "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
-                    + "import sqlContext.implicits._", context);
-            if (result.code() == InterpreterResult.Code.ERROR) {
-              return new InterpreterResult(InterpreterResult.Code.ERROR,
-                  "Fail to create sqlContext," + result.message());
-            }
-          }
-          sqlContextCreated = true;
-        }
+      // use triple quote so that we don't need to do string escape.
+      String sqlQuery = null;
+      if (isSpark2) {
+        sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+      } else {
+        sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
       }
-
-      // delegate the work to LivySparkInterpreter in the same session.
-      // TODO(zjffdu), we may create multiple session for the same user here. This can be fixed
-      // after we move session creation to open()
-      InterpreterResult res = sparkInterpreter.interpret("sqlContext.sql(\"" +
-          line.replaceAll("\"", "\\\\\"")
-              .replaceAll("\\n", " ")
-          + "\").show(" +
-          property.get("zeppelin.livy.spark.sql.maxResult") + ")", context);
+      InterpreterResult res = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo);
 
       if (res.code() == InterpreterResult.Code.SUCCESS) {
         StringBuilder resMsg = new StringBuilder();
         resMsg.append("%table ");
-        String[] rows = new String(context.out.toByteArray()).split("\n");
+        String[] rows = res.message().get(0).getData().split("\n");
         String[] headers = rows[1].split("\\|");
         for (int head = 1; head < headers.length; head++) {
           resMsg.append(headers[head].trim()).append("\t");
@@ -121,7 +145,6 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
       } else {
         return res;
       }
-
     } catch (Exception e) {
       LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
       return new InterpreterResult(InterpreterResult.Code.ERROR,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java b/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java
new file mode 100644
index 0000000..4547057
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+/**
+ *
+ */
+public class SessionNotFoundException extends LivyException {
+
+  public SessionNotFoundException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json
index fc74281..8a4e4d7 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -13,14 +13,9 @@
       },
       "zeppelin.livy.create.session.retries": {
         "envName": "ZEPPELIN_LIVY_CREATE_SESSION_RETRIES",
-        "propertyName": "zeppelin.livy.create.session.retries",
+        "propertyName": "zeppelin.livy.create.session.timeout",
         "defaultValue": "120",
-        "description": "Livy Server create session retry count."
-      },
-      "livy.spark.master": {
-        "propertyName": "livy.spark.master",
-        "defaultValue": "local[*]",
-        "description": "Spark master uri. ex) spark://masterhost:7077"
+        "description": "Livy Server create session timeout (seconds)."
       },
       "livy.spark.driver.cores": {
         "propertyName": "livy.spark.driver.cores",

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/test/java/org/apache/zeppelin/livy/LivyHelperTest.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyHelperTest.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyHelperTest.java
deleted file mode 100644
index e4d92e4..0000000
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyHelperTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.livy;
-
-import com.google.gson.GsonBuilder;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.hamcrest.CoreMatchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ErrorCollector;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Properties;
-
-import static org.mockito.Mockito.doReturn;
-
-/**
- * Created for org.apache.zeppelin.livy on 22/04/16.
- */
-
-@RunWith(MockitoJUnitRunner.class)
-public class LivyHelperTest {
-
-  @Rule
-  public ErrorCollector collector = new ErrorCollector();
-
-  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-  private static LivyPySparkInterpreter interpreter;
-
-  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-  private InterpreterContext interpreterContext;
-
-  @Mock(answer = Answers.CALLS_REAL_METHODS)
-  private LivyHelper livyHelper;
-
-  @Before
-  public void prepareContext() throws Exception {
-    Properties properties = new Properties();
-    properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
-    livyHelper.property = properties;
-    livyHelper.paragraphHttpMap = new HashMap<>();
-    livyHelper.gson = new GsonBuilder().setPrettyPrinting().create();
-    livyHelper.LOGGER = LoggerFactory.getLogger(LivyHelper.class);
-
-    doReturn("{\"id\":1,\"state\":\"idle\",\"kind\":\"spark\",\"proxyUser\":\"null\",\"log\":[]}")
-        .when(livyHelper)
-        .executeHTTP(
-            livyHelper.property.getProperty("zeppelin.livy.url") + "/sessions",
-            "POST",
-            "{\"kind\": \"spark\", \"conf\": {}, \"proxyUser\": null}",
-            null
-        );
-
-    doReturn("{\"id\":1,\"state\":\"available\",\"output\":{\"status\":\"ok\"," +
-        "\"execution_count\":1,\"data\":{\"text/plain\":\"1\"}}}")
-        .when(livyHelper)
-        .executeHTTP(
-            livyHelper.property.getProperty("zeppelin.livy.url") + "/sessions/1/statements",
-            "POST",
-            "{\"code\": \"print(1)\"}",
-            null
-        );
-
-  }
-
-
-  @Test
-  public void checkCreateSession() {
-    try {
-      Integer sessionId = livyHelper.createSession(interpreterContext, "spark");
-
-      collector.checkThat("check sessionId", 1, CoreMatchers.equalTo(sessionId));
-
-    } catch (Exception e) {
-      collector.addError(e);
-    }
-  }
-
-  @Test
-  public void checkInterpret() {
-    try {
-      InterpreterResult result = livyHelper.interpret("print(1)", interpreterContext, 1);
-      collector.checkThat("check sessionId", InterpreterResult.Code.SUCCESS,
-          CoreMatchers.equalTo(result.code()));
-    } catch (Exception e) {
-      collector.addError(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index e996694..0173f1d 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -20,6 +20,8 @@ package org.apache.zeppelin.livy;
 
 import com.cloudera.livy.test.framework.Cluster;
 import com.cloudera.livy.test.framework.Cluster$;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.*;
@@ -49,13 +51,14 @@ public class LivyInterpreterIT {
     LOGGER.info("Starting livy at {}", cluster.livyEndpoint());
     properties = new Properties();
     properties.setProperty("zeppelin.livy.url", cluster.livyEndpoint());
-    properties.setProperty("zeppelin.livy.create.session.retries", "120");
+    properties.setProperty("zeppelin.livy.create.session.timeout", "120");
     properties.setProperty("zeppelin.livy.spark.sql.maxResult", "100");
   }
 
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {
+      LOGGER.info("Shutting down livy at {}", cluster.livyEndpoint());
       cluster.cleanUp();
     }
   }
@@ -92,63 +95,63 @@ public class LivyInterpreterIT {
     try {
       InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData().contains("1.5.2"));
 
       // test RDD api
-      outputListener.reset();
       result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended().contains("Double = 55.0"));
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData().contains("Double = 55.0"));
 
       // single line comment
-      outputListener.reset();
-      String singleLineComment = "// my comment";
+      String singleLineComment = "println(1)// my comment";
       result = sparkInterpreter.interpret(singleLineComment, context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
+      assertEquals(1, result.message().size());
 
       // multiple line comment
-      outputListener.reset();
-      String multipleLineComment = "/* multiple \n" + "line \n" + "comment */";
+      String multipleLineComment = "println(1)/* multiple \n" + "line \n" + "comment */";
       result = sparkInterpreter.interpret(multipleLineComment, context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
+      assertEquals(1, result.message().size());
 
       // multi-line string
-      outputListener.reset();
       String multiLineString = "val str = \"\"\"multiple\n" +
           "line\"\"\"\n" +
           "println(str)";
       result = sparkInterpreter.interpret(multiLineString, context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended().contains("multiple\nline"));
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData().contains("multiple\nline"));
 
       // case class
-      outputListener.reset();
       String caseClassCode = "case class Person(id:Int, \n" +
           "name:String)\n" +
           "val p=Person(1, \"name_a\")";
       result = sparkInterpreter.interpret(caseClassCode, context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended().contains("defined class Person"));
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData().contains("p: Person = Person(1,name_a)"));
 
       // object class
-      outputListener.reset();
       String objectClassCode = "object Person {}";
       result = sparkInterpreter.interpret(objectClassCode, context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended().contains("defined module Person"));
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData().contains("defined module Person"));
 
       // error
       result = sparkInterpreter.interpret("println(a)", context);
       assertEquals(InterpreterResult.Code.ERROR, result.code());
       assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
       assertTrue(result.message().get(0).getData().contains("error: not found: value a"));
+
+      // incomplete code
+      result = sparkInterpreter.interpret("if(true){", context);
+      assertEquals(InterpreterResult.Code.ERROR, result.code());
+      assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+      assertTrue(result.message().get(0).getData().contains("incomplete statement"));
     } finally {
       sparkInterpreter.close();
     }
@@ -178,24 +181,46 @@ public class LivyInterpreterIT {
 
     try {
       // test DataFrame api
-      outputListener.reset();
       sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
           + "import sqlContext.implicits._", context);
-      InterpreterResult result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
+      InterpreterResult result = sparkInterpreter.interpret(
+          "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
           + "df.collect()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended()
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData()
           .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
       sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
 
       // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
-      outputListener.reset();
-      result = sqlInterpreter.interpret("select * from df", context);
+      result = sqlInterpreter.interpret("select * from df where col_1='hello'", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+      // TODO(zjffdu), \t at the end of each line is not necessary,
+      // it is a bug of LivySparkSQLInterpreter
+      assertEquals("col_1\tcol_2\t\nhello\t20\t\n", result.message().get(0).getData());
+      // double quotes
+      result = sqlInterpreter.interpret("select * from df where col_1=\"hello\"", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+      assertEquals("col_1\tcol_2\t\nhello\t20\t\n", result.message().get(0).getData());
+      // double quotes inside attribute value
+      // TODO(zjffdu). This test case would fail on spark-1.5; will uncomment it when upgrading
+      // to livy-0.3 and spark-1.6
+      // result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context);
+      // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      // assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+
+      // single quotes inside attribute value
+      result = sqlInterpreter.interpret("select * from df where col_1=\"he'llo\"", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
-      // TODO(zjffdu), \t at the end of each line is not necessary, it is a bug of LivySparkSQLInterpreter
-      assertEquals("_1\t_2\t\nhello\t20\t\n", result.message().get(0).getData());
+
+      // test sql with syntax error
+      result = sqlInterpreter.interpret("select * from df2", context);
+      assertEquals(InterpreterResult.Code.ERROR, result.code());
+      assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+      assertTrue(result.message().get(0).getData().contains("Table Not Found"));
     } finally {
       sparkInterpreter.close();
       sqlInterpreter.close();
@@ -209,10 +234,12 @@ public class LivyInterpreterIT {
     }
     InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
     interpreterGroup.put("session_1", new ArrayList<Interpreter>());
-    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
+    LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
+        new LivySparkInterpreter(properties));
     sparkInterpreter.setInterpreterGroup(interpreterGroup);
     interpreterGroup.get("session_1").add(sparkInterpreter);
-    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
+    LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
+        new LivySparkSQLInterpreter(properties));
     interpreterGroup.get("session_1").add(sqlInterpreter);
     sqlInterpreter.setInterpreterGroup(interpreterGroup);
     sqlInterpreter.open();
@@ -249,25 +276,23 @@ public class LivyInterpreterIT {
     try {
       InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData().contains("1.5.2"));
 
       // test RDD api
-      outputListener.reset();
       result = pysparkInterpreter.interpret("sc.range(1, 10).sum()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended().contains("45"));
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData().contains("45"));
 
       // test DataFrame api
-      outputListener.reset();
       pysparkInterpreter.interpret("from pyspark.sql import SQLContext\n"
           + "sqlContext = SQLContext(sc)", context);
       result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n"
           + "df.collect()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(0, result.message().size());
-      assertTrue(outputListener.getOutputAppended().contains("[Row(_1=u'hello', _2=20)]"));
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]"));
 
       // error
       result = pysparkInterpreter.interpret("print(a)", context);
@@ -287,37 +312,52 @@ public class LivyInterpreterIT {
   // TODO(zjffdu),  Livy's SparkRInterpreter has some issue, do it after livy-0.3 release.
   }
 
-  public static class MyInterpreterOutputListener implements InterpreterOutputListener {
-    private StringBuilder outputAppended = new StringBuilder();
-    private StringBuilder outputUpdated = new StringBuilder();
+  @Test
+  public void testLivyTutorialNote() throws IOException {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
+        new LivySparkInterpreter(properties));
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+    LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
+        new LivySparkSQLInterpreter(properties));
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
 
+    try {
+      AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+      MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+      InterpreterOutput output = new InterpreterOutput(outputListener);
+      InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
+          "title", "text", authInfo, null, null, null, null, null, output);
+
+      String p1 = IOUtils.toString(getClass().getResourceAsStream("/livy_tutorial_1.scala"));
+      InterpreterResult result = sparkInterpreter.interpret(p1, context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+      String p2 = IOUtils.toString(getClass().getResourceAsStream("/livy_tutorial_2.sql"));
+      result = sqlInterpreter.interpret(p2, context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
+
+  public static class MyInterpreterOutputListener implements InterpreterOutputListener {
     @Override
     public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
-      LOGGER.info("onAppend:" + new String(line));
-      outputAppended.append(new String(line));
     }
 
     @Override
     public void onUpdate(int index, InterpreterResultMessageOutput out) {
-      try {
-        LOGGER.info("onUpdate:" + new String(out.toByteArray()));
-        outputUpdated.append(new String(out.toByteArray()));
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    public String getOutputAppended() {
-      return outputAppended.toString();
-    }
-
-    public String getOutputUpdated() {
-      return outputUpdated.toString();
-    }
 
-    public void reset() {
-      outputAppended = new StringBuilder();
-      outputUpdated = new StringBuilder();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/test/resources/livy_tutorial_1.scala
----------------------------------------------------------------------
diff --git a/livy/src/test/resources/livy_tutorial_1.scala b/livy/src/test/resources/livy_tutorial_1.scala
new file mode 100644
index 0000000..430dbbf
--- /dev/null
+++ b/livy/src/test/resources/livy_tutorial_1.scala
@@ -0,0 +1,24 @@
+import org.apache.commons.io.IOUtils
+import java.net.URL
+import java.nio.charset.Charset
+
+// Zeppelin creates and injects sc (SparkContext) and sqlContext (HiveContext or SqlContext)
+// So you don't need to create them manually
+
+// load bank data
+val bankText = sc.parallelize(
+  IOUtils.toString(
+    new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"),
+    Charset.forName("utf8")).split("\n"))
+
+case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)
+
+val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
+  s => Bank(s(0).toInt,
+    s(1).replaceAll("\"", ""),
+    s(2).replaceAll("\"", ""),
+    s(3).replaceAll("\"", ""),
+    s(5).replaceAll("\"", "").toInt
+  )
+).toDF()
+bank.registerTempTable("bank")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/livy/src/test/resources/livy_tutorial_2.sql
----------------------------------------------------------------------
diff --git a/livy/src/test/resources/livy_tutorial_2.sql b/livy/src/test/resources/livy_tutorial_2.sql
new file mode 100644
index 0000000..6e0f0f6
--- /dev/null
+++ b/livy/src/test/resources/livy_tutorial_2.sql
@@ -0,0 +1,5 @@
+select age, count(1) value
+from bank
+where age < 30
+group by age
+order by age
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/125a42ca/testing/setupLivy.sh
----------------------------------------------------------------------
diff --git a/testing/setupLivy.sh b/testing/setupLivy.sh
new file mode 100755
index 0000000..d57b74d
--- /dev/null
+++ b/testing/setupLivy.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+set -xe
+
+if [[ -n $LIVY_VER ]]; then
+    ./testing/downloadLivy.sh
+    export LIVY_HOME=`pwd`/livy-server-$LIVY_VER
+    export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
+fi
+
+set +xe


Mime
View raw message