Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 18226200BFC for ; Sat, 14 Jan 2017 17:40:24 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id ED324160B35; Sat, 14 Jan 2017 16:40:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E6155160B2D for ; Sat, 14 Jan 2017 17:40:22 +0100 (CET) Received: (qmail 66318 invoked by uid 500); 14 Jan 2017 16:40:21 -0000 Mailing-List: contact commits-help@zeppelin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zeppelin.apache.org Delivered-To: mailing list commits@zeppelin.apache.org Received: (qmail 66309 invoked by uid 99); 14 Jan 2017 16:40:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 14 Jan 2017 16:40:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 30244DFC9D; Sat, 14 Jan 2017 16:40:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jongyoul@apache.org To: commits@zeppelin.apache.org Message-Id: <7d6e24cfee984bb5955bdc0e15281803@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: zeppelin git commit: ZEPPELIN-1293. Livy Interpreter: Automatically attach or create to a new session Date: Sat, 14 Jan 2017 16:40:21 +0000 (UTC) archived-at: Sat, 14 Jan 2017 16:40:24 -0000 Repository: zeppelin Updated Branches: refs/heads/master 69bc353d3 -> 00742ffdb ZEPPELIN-1293. Livy Interpreter: Automatically attach or create to a new session ### What is this PR for? By default, livy session will expire after one hour. 
This PR creates a new session automatically for the user if the session has expired, and also displays the session expiration information in the frontend. The expiration message is displayed only the first time the session is recreated; after that, it is not displayed again. ### What type of PR is it? [Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1293 ### How should this be tested? Tested manually. ![image](https://cloud.githubusercontent.com/assets/164491/21761175/2473c0c0-d68c-11e6-8f39-9e87333c6168.png) ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang Closes #1861 from zjffdu/ZEPPELIN-1293 and squashes the following commits: e174593 [Jeff Zhang] minor update on warning message 30c3569 [Jeff Zhang] address comments 88f0d9a [Jeff Zhang] ZEPPELIN-1293. Livy Interpreter: Automatically attach or create to a new session Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/00742ffd Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/00742ffd Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/00742ffd Branch: refs/heads/master Commit: 00742ffdb4cc349ef3ee99dced9e08e2b5a404f6 Parents: 69bc353 Author: Jeff Zhang Authored: Fri Jan 13 11:30:13 2017 +0800 Committer: Jongyoul Lee Committed: Sun Jan 15 01:40:12 2017 +0900 ---------------------------------------------------------------------- .../zeppelin/livy/BaseLivyInterprereter.java | 74 ++++++++++++++++---- .../zeppelin/livy/LivySparkSQLInterpreter.java | 65 +++++++++-------- 2 files changed, 97 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/00742ffd/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java 
---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index 0c8c8e2..8ed4622 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -21,10 +21,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterUtils; +import org.apache.zeppelin.interpreter.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpEntity; @@ -39,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; /** * Base class for livy interpreters. @@ -48,10 +46,11 @@ public abstract class BaseLivyInterprereter extends Interpreter { protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class); private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create(); - protected SessionInfo sessionInfo; + protected volatile SessionInfo sessionInfo; private String livyURL; private long sessionCreationTimeout; protected boolean displayAppInfo; + private AtomicBoolean sessionExpired = new AtomicBoolean(false); public BaseLivyInterprereter(Properties property) { super(property); @@ -90,16 +89,17 @@ public abstract class BaseLivyInterprereter extends Interpreter { // livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it // explicitly by ourselves. 
sessionInfo.appId = extractStatementResult( - interpret("sc.applicationId", false).message() + interpret("sc.applicationId", false, false).message() .get(0).getData()); } interpret( - "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", false); + "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", + false, false); if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) { sessionInfo.webUIAddress = extractStatementResult( interpret( - "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false) + "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false, false) .message().get(0).getData()); } else { sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl"); @@ -120,7 +120,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { } try { - return interpret(st, this.displayAppInfo); + return interpret(st, this.displayAppInfo, true); } catch (LivyException e) { LOGGER.error("Fail to interpret:" + st, e); return new InterpreterResult(InterpreterResult.Code.ERROR, @@ -206,9 +206,26 @@ public abstract class BaseLivyInterprereter extends Interpreter { return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET")); } - public InterpreterResult interpret(String code, boolean displayAppInfo) + public InterpreterResult interpret(String code, boolean displayAppInfo, + boolean appendSessionExpired) throws LivyException { - StatementInfo stmtInfo = executeStatement(new ExecuteRequest(code)); + StatementInfo stmtInfo = null; + boolean sessionExpired = false; + try { + stmtInfo = executeStatement(new ExecuteRequest(code)); + } catch (SessionNotFoundException e) { + LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id); + sessionExpired = true; + // we don't want to create multiple sessions because it is possible to have multiple thread + // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. 
So we need + // to check session status again in this sync block + synchronized (this) { + if (isSessionExpired()) { + initLivySession(); + } + } + stmtInfo = executeStatement(new ExecuteRequest(code)); + } // pull the statement status while (!stmtInfo.isAvailable()) { try { @@ -219,7 +236,38 @@ public abstract class BaseLivyInterprereter extends Interpreter { } stmtInfo = getStatementInfo(stmtInfo.id); } - return getResultFromStatementInfo(stmtInfo, displayAppInfo); + if (appendSessionExpired) { + return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo), + sessionExpired); + } else { + return getResultFromStatementInfo(stmtInfo, displayAppInfo); + } + } + + private boolean isSessionExpired() throws LivyException { + try { + getSessionInfo(sessionInfo.id); + return false; + } catch (SessionNotFoundException e) { + return true; + } catch (LivyException e) { + throw e; + } + } + + private InterpreterResult appendSessionExpire(InterpreterResult result, boolean sessionExpired) { + if (sessionExpired) { + InterpreterResult result2 = new InterpreterResult(result.code()); + result2.add(InterpreterResult.Type.HTML, + "Previous livy session is expired, new livy session is created. " + + "Paragraphs that depend on this paragraph need to be re-executed!" 
+ ""); + for (InterpreterResultMessage message : result.message()) { + result2.add(message.getType(), message.getData()); + } + return result2; + } else { + return result; + } } private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo, @@ -340,7 +388,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { || response.getStatusCode().value() == 201 || response.getStatusCode().value() == 404) { String responseBody = response.getBody(); - if (responseBody.matches("Session '\\d+' not found.")) { + if (responseBody.matches("\"Session '\\d+' not found.\"")) { throw new SessionNotFoundException(responseBody); } else { return responseBody; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/00742ffd/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index cdc8b5b..0e78860 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -51,7 +51,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { // As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession // to judge whether it is using spark2. 
try { - InterpreterResult result = sparkInterpreter.interpret("spark", false); + InterpreterResult result = sparkInterpreter.interpret("spark", false, false); if (result.code() == InterpreterResult.Code.SUCCESS && result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) { LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}", @@ -59,7 +59,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { isSpark2 = true; } else { // spark 1.x - result = sparkInterpreter.interpret("sqlContext", false); + result = sparkInterpreter.interpret("sqlContext", false, false); if (result.code() == InterpreterResult.Code.SUCCESS) { LOGGER.info("sqlContext is detected."); } else if (result.code() == InterpreterResult.Code.ERROR) { @@ -68,7 +68,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves"); result = sparkInterpreter.interpret( "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" - + "import sqlContext.implicits._", false); + + "import sqlContext.implicits._", false, false); if (result.code() == InterpreterResult.Code.ERROR) { throw new LivyException("Fail to create SQLContext," + result.message().get(0).getData()); @@ -113,37 +113,44 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { } else { sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")"; } - InterpreterResult res = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo); - - if (res.code() == InterpreterResult.Code.SUCCESS) { - StringBuilder resMsg = new StringBuilder(); - resMsg.append("%table "); - String[] rows = res.message().get(0).getData().split("\n"); - String[] headers = rows[1].split("\\|"); - for (int head = 1; head < headers.length; head++) { - resMsg.append(headers[head].trim()).append("\t"); - } - resMsg.append("\n"); - if (rows[3].indexOf("+") == 0) { - - } else { - for (int cols = 3; 
cols < rows.length - 1; cols++) { - String[] col = rows[cols].split("\\|"); - for (int data = 1; data < col.length; data++) { - resMsg.append(col[data].trim()).append("\t"); + InterpreterResult result = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo, true); + + if (result.code() == InterpreterResult.Code.SUCCESS) { + InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS); + for (InterpreterResultMessage message : result.message()) { + // convert Text type to Table type. We assume the text type must be the sql output. This + // assumption is correct for now. Ideally livy should return table type. We may do it in + // the future release of livy. + if (message.getType() == InterpreterResult.Type.TEXT) { + StringBuilder resMsg = new StringBuilder(); + String[] rows = message.getData().split("\n"); + String[] headers = rows[1].split("\\|"); + for (int head = 1; head < headers.length; head++) { + resMsg.append(headers[head].trim()).append("\t"); } resMsg.append("\n"); + if (rows[3].indexOf("+") == 0) { + + } else { + for (int cols = 3; cols < rows.length - 1; cols++) { + String[] col = rows[cols].split("\\|"); + for (int data = 1; data < col.length; data++) { + resMsg.append(col[data].trim()).append("\t"); + } + resMsg.append("\n"); + } + } + if (rows[rows.length - 1].indexOf("only") == 0) { + resMsg.append("" + rows[rows.length - 1] + "."); + } + result2.add(InterpreterResult.Type.TABLE, resMsg.toString()); + } else { + result2.add(message.getType(), message.getData()); } } - if (rows[rows.length - 1].indexOf("only") == 0) { - resMsg.append("" + rows[rows.length - 1] + "."); - } - - return new InterpreterResult(InterpreterResult.Code.SUCCESS, - resMsg.toString() - ); + return result2; } else { - return res; + return result; } } catch (Exception e) { LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);