carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject carbondata git commit: [CARBONDATA-2752][CARBONSTORE] Carbon provide Zeppelin support
Date Mon, 23 Jul 2018 02:11:02 GMT
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 9ac55a5a6 -> 7ad2fd951


[CARBONDATA-2752][CARBONSTORE] Carbon provide Zeppelin support

This closes #2522


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7ad2fd95
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7ad2fd95
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7ad2fd95

Branch: refs/heads/carbonstore
Commit: 7ad2fd951126df0873e6610eb0a63935a0aa4789
Parents: 9ac55a5
Author: Ajith <ajith2489@gmail.com>
Authored: Wed Jul 18 16:48:54 2018 +0530
Committer: QiangCai <qiangcai@qq.com>
Committed: Mon Jul 23 10:09:55 2018 +0800

----------------------------------------------------------------------
 integration/zeppelin/README.md                  |  44 +++++
 integration/zeppelin/assembly/assembly.xml      |  37 ++++
 .../zeppelin/misc/interpreter-setting.json      |  22 +++
 integration/zeppelin/pom.xml                    |  99 ++++++++++
 .../carbondata/zeppelin/CarbonInterpreter.java  | 186 +++++++++++++++++++
 .../zeppelin/response/CarbonResponse.java       | 144 ++++++++++++++
 .../zeppelin/TestCarbonInterpreter.java         | 105 +++++++++++
 .../carbondata/zeppelin/TestCarbonResponse.java |  94 ++++++++++
 pom.xml                                         |   6 +
 store/sql/pom.xml                               |   4 +-
 .../rest/controller/SqlHorizonController.java   |  26 ++-
 11 files changed, 755 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/README.md
----------------------------------------------------------------------
diff --git a/integration/zeppelin/README.md b/integration/zeppelin/README.md
new file mode 100644
index 0000000..35b27bc
--- /dev/null
+++ b/integration/zeppelin/README.md
@@ -0,0 +1,44 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+### Please follow below steps to integrate with zeppelin  
+1. run ```mvn package -Pzeppelin```
+	This will generate _carbondata-zeppelin-*.tar.gz_ under target folder
+2. Extract the tar content to _ZEPPELIN_INSTALL_HOME/interpreter/_
+3. Add _org.apache.carbonndata.zeppelin.CarbonInterpreter_ to list of interpreters mentioned
by _zeppelin.interpreters_ @ _ZEPPELIN_INSTALL_HOME/conf/zeppelin-site.xml_ (create if not
exists)
+	Example:
+```xml
+	<property>
+	  <name>zeppelin.interpreters</name>
+<value>org.apache.zeppelin.spark.SparkInterpreter,.....,org.apache.carbonndata.zeppelin.CarbonInterpreter</value>
+	  <description>Comma separated interpreter configurations. First interpreter become
a default</description>
+	</property>
+```
+4. Add carbon to list of interpreters mentioned by zeppelin.interpreter.order @ ZEPPELIN_INSTALL_HOME/conf/zeppelin-site.xml
+	Example:
+```xml
+	<property>
+	  <name>zeppelin.interpreter.group.order</name>
+	  <value>spark,..,carbon</value>
+	  <description></description>
+	</property>
+```
+
+5. Restart Zeppelin server and add new interpreter with name _carbon_ from zeppelin interpreter
page
+    Refer : https://zeppelin.apache.org/docs/0.8.0/usage/interpreter/overview.html#what-is-zeppelin-interpreter
+6. Configure ```carbon.query.api.url``` in interpreter setting from zeppelin interpreter
page and click save
+7. Now can use notebook with interpreter ```%carbon```

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/integration/zeppelin/assembly/assembly.xml b/integration/zeppelin/assembly/assembly.xml
new file mode 100644
index 0000000..6d09ad3
--- /dev/null
+++ b/integration/zeppelin/assembly/assembly.xml
@@ -0,0 +1,37 @@
+<assembly>
+<id>compress</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>/carbon</outputDirectory>
+	  <useTransitiveDependencies>false</useTransitiveDependencies>
+      <includes>
+          <include>com.fasterxml.jackson.core:*:jar</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+
+  <fileSets>
+  
+    <fileSet>
+      <directory>misc/</directory>
+	  <outputDirectory>carbon</outputDirectory>
+      <includes>
+        <include>*.json</include>
+      </includes>
+    </fileSet>
+
+    <fileSet>
+      <directory>target</directory>
+      <outputDirectory>carbon</outputDirectory>
+	  <includes>
+        <include>carbondata-zeppelin-*.jar</include>
+      </includes>
+    </fileSet>
+    
+  </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/misc/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/integration/zeppelin/misc/interpreter-setting.json b/integration/zeppelin/misc/interpreter-setting.json
new file mode 100644
index 0000000..ce39804
--- /dev/null
+++ b/integration/zeppelin/misc/interpreter-setting.json
@@ -0,0 +1,22 @@
+[
+  {
+    "group": "carbon",
+    "name": "carbon",
+    "className": "org.apache.carbondata.zeppelin.CarbonInterpreter",
+    "properties": {
+      "carbon.query.api.url": {
+        "envName": null,
+        "propertyName": "carbon.query.api.url",
+        "defaultValue": "",
+        "description": "API URL for request",
+        "type": "string"
+      }
+    },
+    "editor": {
+      "language": "sql",
+      "editOnDblClick": false,
+      "completionKey": "TAB",
+      "completionSupport": true
+    }
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/pom.xml
----------------------------------------------------------------------
diff --git a/integration/zeppelin/pom.xml b/integration/zeppelin/pom.xml
new file mode 100644
index 0000000..c381c48
--- /dev/null
+++ b/integration/zeppelin/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-zeppelin</artifactId>
+  <name>Apache CarbonData :: Zeppelin</name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-interpreter</artifactId>
+      <version>0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.8.11.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.8.10</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+	  
+	  <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptor>assembly/assembly.xml</descriptor>
+          <finalName>carbondata-zeppelin-${version}</finalName>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+     </plugin>
+	 
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java
----------------------------------------------------------------------
diff --git a/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java
b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java
new file mode 100644
index 0000000..f5dfa0e
--- /dev/null
+++ b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.zeppelin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.zeppelin.response.CarbonResponse;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Carbon based interpreter for zeppelin
+ */
+public class CarbonInterpreter extends Interpreter {
+
+  public static final Logger logger = LoggerFactory.getLogger(CarbonInterpreter.class);
+
+  static final char TAB = '\t';
+
+  static final String LF = "\n";
+
+  /**
+   * Property which can be set in zeppelin to carbon REST API server
+   */
+  public static final String CARBON_QUERY_API_URL = "carbon.query.api.url";
+
+  /**
+   * These are the queries which need Table like output format
+   */
+  private static final String[] SEARCH_QUERIES = {"select", "list", "show", "desc"};
+
+  public CarbonInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+  }
+
+  @Override
+  public void cancel(InterpreterContext interpreterContext) throws InterpreterException {
+  }
+
+  @Override
+  public int getProgress(InterpreterContext interpreterContext) throws InterpreterException
{
+    return 0;
+  }
+
+  @Override
+  public FormType getFormType() throws InterpreterException {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public InterpreterResult interpret(String sql, InterpreterContext interpreterContext)
+          throws InterpreterException {
+    try {
+      return executeQuery.apply(sql);
+    } catch (RuntimeException e) {
+      logger.error("failed to query data in carbon ", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+    }
+  }
+
+  /**
+   * This will execute the given sql Query by sending a post request on CARBON_QUERY_API_URL
+   */
+  private Function<String, HttpResponse> doPost = sql -> {
+    // prepare the post body
+    String postContent = new StringBuilder("{\"sqlStatement\":")
+            .append("\"").append(sql).append("\"" + "}").toString();
+    logger.debug("post:" + postContent);
+
+    // prepare entity and set content type
+    StringEntity entity = new StringEntity(postContent, "UTF-8");
+    entity.setContentType("application/json; charset=UTF-8");
+
+    // get the POST url from interpreter property
+    String postURL = getProperty(CARBON_QUERY_API_URL);
+    logger.debug("post url:" + postURL);
+
+    // do POST and get response
+    HttpPost postRequest = new HttpPost(postURL);
+    postRequest.setEntity(entity);
+    HttpClient httpClient = HttpClientBuilder.create().build();
+    try {
+      return httpClient.execute(postRequest);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  };
+
+  /**
+   * Check if output has to be sent as a able to zeppelin
+   */
+  private Function<String, Boolean> isTableFormatOutput = sql ->
+          (StringUtils.startsWithAny(sql, SEARCH_QUERIES));
+
+  /**
+   * returns InterpreterResult from CarbonResponse
+   */
+  private BiFunction<String, CarbonResponse, InterpreterResult> getResult = (sql, response)
-> {
+    if (isTableFormatOutput.apply(sql.toLowerCase().trim())) {
+      //format only select queries and return as table
+      String formattedResult = Arrays
+              .stream(response.getRows())
+              .filter(Objects::nonNull)
+              .map(row -> StringUtils.join(row, TAB))
+              .collect(Collectors.joining(LF));
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, InterpreterResult.Type.TABLE,
+              formattedResult);
+    } else {
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, InterpreterResult.Type.TEXT,
+              response.getMessage());
+    }
+  };
+
+  /**
+   * Executes the given sql and return formatted result
+   */
+  private Function<String, InterpreterResult> executeQuery = sql -> {
+    try {
+      HttpResponse response = doPost.apply(sql);
+      // always close the content after reading fully to release connection
+      // IOUtils.toString will completely read the content
+      try (InputStream content = response.getEntity().getContent()) {
+        Optional<CarbonResponse> carbonResponse = CarbonResponse.parse(content);
+        int code = response.getStatusLine().getStatusCode();
+        if (code != 200) {
+          StringBuilder errorMessage = new StringBuilder("Failed : HTTP error code " + code
+ " .");
+          carbonResponse.ifPresent(rsp -> {
+            logger.error("Failed to execute query: " + rsp.getFullResponse());
+            errorMessage.append(rsp.getMessage());
+          });
+          return new InterpreterResult(InterpreterResult.Code.ERROR, errorMessage.toString());
+        } else {
+          return carbonResponse.map(rsp -> getResult.apply(sql, rsp))
+                  .orElseGet(() -> new InterpreterResult(InterpreterResult.Code.SUCCESS,
+                          InterpreterResult.Type.TEXT,
+                          "Query Success, but unable to parse response"));
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Error executing query ", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java
----------------------------------------------------------------------
diff --git a/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java
b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java
new file mode 100644
index 0000000..2428d75
--- /dev/null
+++ b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.zeppelin.response;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.io.IOUtils;
+
+
+
+/**
+ * acts as a response object from carbon horizon server
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CarbonResponse {
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private String responseId;
+
+  private String message;
+
+  private Object[][] rows;
+
+  long timestamp;
+
+  int status;
+
+  String error;
+
+  String exception;
+
+  String path;
+
+  String fullResponse;
+
+
+  public String getResponseId() {
+    return responseId;
+  }
+
+  public void setResponseId(String responseId) {
+    this.responseId = responseId;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public Object[][] getRows() {
+    return rows;
+  }
+
+  public void setRows(Object[][] rows) {
+    this.rows = rows;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public int getStatus() {
+    return status;
+  }
+
+  public void setStatus(int status) {
+    this.status = status;
+  }
+
+  public String getError() {
+    return error;
+  }
+
+  public void setError(String error) {
+    this.error = error;
+  }
+
+  public String getException() {
+    return exception;
+  }
+
+  public void setException(String exception) {
+    this.exception = exception;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public String getFullResponse() {
+    return fullResponse;
+  }
+
+  public void setFullResponse(String fullResponse) {
+    this.fullResponse = fullResponse;
+  }
+
+  /**
+   * Reads the input stream for JSON and return a CarbonResponse instance
+   * PS: Caller responsible for closing the stream
+   *
+   * @param inputStream
+   * @return
+   * @throws IOException
+   */
+  public static Optional<CarbonResponse> parse(InputStream inputStream) throws IOException
{
+    String plainTextResponse = IOUtils.toString(inputStream, "UTF-8");
+    CarbonResponse response = mapper.readValue(plainTextResponse, CarbonResponse.class);
+    response.setFullResponse(plainTextResponse);
+    return Optional.of(response);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java
----------------------------------------------------------------------
diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java
b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java
new file mode 100644
index 0000000..6cb47b4
--- /dev/null
+++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.zeppelin;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCarbonInterpreter {
+
+  static HttpServer server = null;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    server = HttpServer.create(new InetSocketAddress("localhost", 8123), 0);
+    server.createContext("/table/sql", new FakePostHandler());
+    server.start();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    Optional.of(server).ifPresent(serverSocket -> serverSocket.stop(0));
+  }
+
+  @Test(timeout = 5000)
+  public void testInterpreterSelectSuccessResponse() throws InterpreterException {
+    Properties properties = new Properties();
+    properties.put(CarbonInterpreter.CARBON_QUERY_API_URL, "http://localhost:8123/table/sql");
+    CarbonInterpreter interpreter = new CarbonInterpreter(properties);
+    InterpreterResult result = interpreter.interpret("show tables", null);
+    String expectedFormattedResult = "{\"code\":\"SUCCESS\",\"msg\":[{\"type\":\"TABLE\",\"data\""
+
+            ":\"database\\ttableName\\tisTemporary\\ndefault\\tsinka6\\tfalse\\ndefault\\tsinka7\\tfalse\"}]}";
+    assertEquals(expectedFormattedResult, result.toJson());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+  }
+}
+
+class FakePostHandler implements HttpHandler {
+
+  @Override
+  public void handle(HttpExchange he) throws IOException {
+    InputStreamReader isr = new InputStreamReader(he.getRequestBody(), "utf-8");
+    BufferedReader br = new BufferedReader(isr);
+    String query = br.readLine();
+    String response = "";
+    if (query.equals("{\"sqlStatement\":\"show tables\"}")) {
+      response = "{\n" +
+              "    \"responseId\": 19435528129427470,\n" +
+              "    \"message\": \"SUCCESS\",\n" +
+              "    \"rows\": [\n" +
+              "        [\n" +
+              "            \"database\",\n" +
+              "            \"tableName\",\n" +
+              "            \"isTemporary\"\n" +
+              "        ],\n" +
+              "        [\n" +
+              "            \"default\",\n" +
+              "            \"sinka6\",\n" +
+              "            false\n" +
+              "        ],\n" +
+              "        [\n" +
+              "            \"default\",\n" +
+              "            \"sinka7\",\n" +
+              "            false\n" +
+              "        ]\n" +
+              "    ]\n" +
+              "}";
+    }
+    he.sendResponseHeaders(200, response.length());
+    OutputStream os = he.getResponseBody();
+    os.write(response.toString().getBytes());
+    os.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
----------------------------------------------------------------------
diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
new file mode 100644
index 0000000..4b169f4
--- /dev/null
+++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.zeppelin;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.carbondata.zeppelin.response.CarbonResponse;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestCarbonResponse {
+
+  @Test(expected = JsonMappingException.class)
+  public void testBodyIsEmpty() throws IOException {
+    String input = "";
+    CarbonResponse.parse(new ByteArrayInputStream(input.getBytes()));
+  }
+
+  @Test
+  public void testSuccessResponse() throws IOException {
+    String input = "{\n" +
+            "    \"responseId\": 19435528129427470,\n" +
+            "    \"message\": \"SUCCESS\",\n" +
+            "    \"rows\": [\n" +
+            "        [\n" +
+            "            \"database\",\n" +
+            "            \"tableName\",\n" +
+            "            \"isTemporary\"\n" +
+            "        ],\n" +
+            "        [\n" +
+            "            \"default\",\n" +
+            "            \"sinka6\",\n" +
+            "            false\n" +
+            "        ],\n" +
+            "        [\n" +
+            "            \"default\",\n" +
+            "            \"sinka7\",\n" +
+            "            false\n" +
+            "        ]\n" +
+            "    ]\n" +
+            "}";
+    Object[][] expectedResponse = new Object[3][];
+    expectedResponse[0] = new Object[]{"database", "tableName", "isTemporary"};
+    expectedResponse[1] = new Object[]{"default", "sinka6", false};
+    expectedResponse[2] = new Object[]{"default", "sinka7", false};
+    CarbonResponse successResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get();
+    assertEquals("SUCCESS", successResponse.getMessage());
+    assertEquals("19435528129427470", successResponse.getResponseId());
+    assertTrue(Arrays.deepEquals(expectedResponse, successResponse.getRows()));
+    assertEquals(input, successResponse.getFullResponse());
+  }
+
+  @Test
+  public void testErrorResponse() throws IOException {
+    String input = "{\n" +
+            "    \"timestamp\": 1531884083849,\n" +
+            "    \"status\": 500,\n" +
+            "    \"error\": \"Internal Server Error\",\n" +
+            "    \"exception\": \"org.apache.carbondata.store.api.exception.StoreException\",\n"
+
+            "    \"message\": \"org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException:
" +
+            "Table or view 'sinka6' already exists in database 'default';\",\n" +
+            "    \"path\": \"/table/sql\"\n" +
+            "}";
+    CarbonResponse errorResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get();
+    assertEquals("org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " +
+            "Table or view 'sinka6' already exists in database 'default';", errorResponse.getMessage());
+    assertEquals("org.apache.carbondata.store.api.exception.StoreException", errorResponse.getException());
+    assertEquals(1531884083849L, errorResponse.getTimestamp());
+    assertEquals("Internal Server Error", errorResponse.getError());
+    assertEquals(500, errorResponse.getStatus());
+    assertEquals("/table/sql", errorResponse.getPath());
+    assertEquals(input, errorResponse.getFullResponse());
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f1f51f0..2135b9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -646,6 +646,12 @@
         <module>datamap/mv/core</module>
       </modules>
     </profile>
+    <profile>
+      <id>zeppelin</id>
+      <modules>
+        <module>integration/zeppelin</module>
+      </modules>
+    </profile>
   </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/store/sql/pom.xml
----------------------------------------------------------------------
diff --git a/store/sql/pom.xml b/store/sql/pom.xml
index 411590b..d90ebb3 100644
--- a/store/sql/pom.xml
+++ b/store/sql/pom.xml
@@ -48,8 +48,8 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
index 7583a14..da9df52 100644
--- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.horizon.rest.controller;
 
 import java.util.List;
+import java.util.stream.IntStream;
 
 import org.apache.carbondata.horizon.rest.model.validate.RequestValidator;
 import org.apache.carbondata.horizon.rest.model.view.SqlRequest;
@@ -26,6 +27,7 @@ import org.apache.carbondata.horizon.rest.sql.SparkSqlWrapper;
 import org.apache.carbondata.store.api.exception.StoreException;
 
 import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -42,25 +44,29 @@ public class SqlHorizonController {
   public ResponseEntity<SqlResponse> sql(@RequestBody SqlRequest request) throws StoreException
{
     RequestValidator.validateSql(request);
     List<Row> rows;
+    Dataset<Row> sqlDataFrame = null;
     try {
-      rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), request.getSqlStatement())
-          .collectAsList();
+      sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(),
+              request.getSqlStatement());
+      rows = sqlDataFrame.collectAsList();
     } catch (AnalysisException e) {
       throw new StoreException(e.getSimpleMessage());
     } catch (Exception e) {
       throw new StoreException(e.getMessage());
     }
-    Object[][] result = new Object[rows.size()][];
-    for (int i = 0; i < rows.size(); i++) {
-      Row row = rows.get(i);
-      result[i] = new Object[row.size()];
-      for (int j = 0; j < row.size(); j++) {
-        result[i][j] = row.get(j);
-      }
+    final String[] fieldNames = sqlDataFrame.schema().fieldNames();
+    Object[][] responseData = new Object[0][];
+    if (rows.size() > 0) {
+      final Object[][] result = new Object[rows.size() + 1][fieldNames.length];
+      System.arraycopy(fieldNames, 0, result[0], 0, fieldNames.length);
+      IntStream.range(0, rows.size()).forEach(index ->
+          IntStream.range(0, fieldNames.length).forEach(col ->
+                result[index + 1][col] = rows.get(index).get(col)));
+      responseData = result;
     }
 
     return new ResponseEntity<>(
-        new SqlResponse(request, "SUCCESS", result), HttpStatus.OK);
+        new SqlResponse(request, "SUCCESS", responseData), HttpStatus.OK);
   }
 
   @RequestMapping(value = "echosql")


Mime
View raw message