carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/4] carbondata git commit: [CARBONDATA-2690][CarbonStore] implement RESTful API: create table, load data and select
Date Fri, 06 Jul 2018 07:35:18 GMT
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore fa111380f -> 6fa86381f


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
new file mode 100644
index 0000000..a3e9f1c
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.model.view;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.horizon.rest.model.descriptor.LoadDescriptor;
+
+/**
+ * Request body for loading data into a table: the target database and table, the input
+ * path, the load options and the overwrite flag.
+ *
+ * <p>Instances can be assembled fluently through {@link #builder()} and are turned into a
+ * {@link LoadDescriptor} by {@link #convertToDto()}.
+ */
+public class LoadRequest {
+
+  private String databaseName;
+  private String tableName;
+  private String inputPath;
+  private Map<String, String> options;
+  private boolean isOverwrite;
+
+  /** Creates an empty request; populate it through the setters or the {@link Builder}. */
+  public LoadRequest() {
+  }
+
+  /**
+   * Creates a fully populated request.
+   *
+   * @param databaseName name of the database
+   * @param tableName name of the table
+   * @param inputPath path of the input data
+   * @param options load options; the map is stored by reference, not copied
+   * @param isOverwrite overwrite flag of the load
+   */
+  public LoadRequest(String databaseName, String tableName, String inputPath,
+      Map<String, String> options, boolean isOverwrite) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.inputPath = inputPath;
+    this.options = options;
+    this.isOverwrite = isOverwrite;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String getInputPath() {
+    return inputPath;
+  }
+
+  public void setInputPath(String inputPath) {
+    this.inputPath = inputPath;
+  }
+
+  /**
+   * @return the load options map held by this request (the live map, not a copy); may be
+   *     {@code null} if it was never set
+   */
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+
+  public boolean isOverwrite() {
+    return isOverwrite;
+  }
+
+  public void setOverwrite(boolean overwrite) {
+    isOverwrite = overwrite;
+  }
+
+  /**
+   * Converts this request into its descriptor form.
+   *
+   * @return a new {@link LoadDescriptor} built from the current field values
+   */
+  public LoadDescriptor convertToDto() {
+    return new LoadDescriptor(databaseName, tableName, inputPath, options, isOverwrite);
+  }
+
+  /**
+   * Fluent builder for {@link LoadRequest}; obtain one through {@link LoadRequest#builder()}.
+   */
+  public static class Builder {
+    private final LoadRequest load;
+    private final Map<String, String> options;
+
+    private Builder() {
+      load = new LoadRequest();
+      options = new HashMap<>();
+    }
+
+    public Builder databaseName(String databaseName) {
+      load.setDatabaseName(databaseName);
+      return this;
+    }
+
+    public Builder tableName(String tableName) {
+      load.setTableName(tableName);
+      return this;
+    }
+
+    public Builder overwrite(boolean isOverwrite) {
+      load.setOverwrite(isOverwrite);
+      return this;
+    }
+
+    public Builder inputPath(String inputPath) {
+      load.setInputPath(inputPath);
+      return this;
+    }
+
+    /**
+     * Adds one load option; a later call with the same key replaces the earlier value.
+     *
+     * @param key option name
+     * @param value option value
+     * @return this builder
+     */
+    public Builder options(String key, String value) {
+      options.put(key, value);
+      return this;
+    }
+
+    /**
+     * Finishes the build. Every call returns the same {@link LoadRequest} instance.
+     *
+     * @return the request, carrying a snapshot of the options added so far
+     */
+    public LoadRequest create() {
+      // Hand over a copy: otherwise a later options(...) call on this builder would
+      // silently change a request that has already been returned to the caller.
+      load.setOptions(new HashMap<>(options));
+      return load;
+    }
+  }
+
+  /**
+   * @return a new, empty {@link Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectRequest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectRequest.java
new file mode 100644
index 0000000..3d5b3df
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectRequest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.model.view;
+
+import org.apache.carbondata.horizon.rest.model.descriptor.SelectDescriptor;
+
+/**
+ * Request body for selecting rows from a table: the target database and table, the column
+ * names to return, an optional filter expression string and a row limit.
+ *
+ * <p>Instances can be assembled fluently through {@link #builder()} and are turned into a
+ * {@link SelectDescriptor} by {@link #convertToDto()}.
+ */
+public class SelectRequest {
+
+  private String databaseName;
+  private String tableName;
+  private String[] select;
+  private String filter;
+  private int limit;
+
+  /** Creates an empty request; populate it through the setters or the {@link Builder}. */
+  public SelectRequest() {
+
+  }
+
+  /**
+   * Creates a fully populated request.
+   *
+   * @param databaseName name of the database
+   * @param tableName name of the table
+   * @param select column names to return; the array is stored by reference
+   * @param filter filter expression string, or {@code null} for none
+   * @param limit row limit
+   */
+  public SelectRequest(String databaseName, String tableName, String[] select, String filter,
+      int limit) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.select = select;
+    this.filter = filter;
+    this.limit = limit;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String[] getSelect() {
+    return select;
+  }
+
+  public void setSelect(String[] select) {
+    this.select = select;
+  }
+
+  public String getFilter() {
+    return filter;
+  }
+
+  public void setFilter(String filter) {
+    this.filter = filter;
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  /**
+   * Converts this request into its descriptor form.
+   *
+   * @return a new {@link SelectDescriptor} built from the current field values
+   */
+  public SelectDescriptor convertToDto() {
+    return new SelectDescriptor(
+        databaseName, tableName, select, filter, limit);
+  }
+
+  /**
+   * Fluent builder for {@link SelectRequest}; obtain one through
+   * {@link SelectRequest#builder()}.
+   */
+  public static class Builder {
+
+    private final SelectRequest select;
+
+    private Builder() {
+      select = new SelectRequest();
+    }
+
+    public Builder databaseName(String databaseName) {
+      select.setDatabaseName(databaseName);
+      return this;
+    }
+
+    public Builder tableName(String tableName) {
+      select.setTableName(tableName);
+      return this;
+    }
+
+    /**
+     * Sets the columns to return.
+     *
+     * @param columnNames names of the columns, in the order given
+     * @return this builder
+     */
+    public Builder select(String... columnNames) {
+      select.setSelect(columnNames);
+      return this;
+    }
+
+    /**
+     * Sets the filter expression string.
+     *
+     * @param filter filter expression, for example {@code "intField = 11"}
+     * @return this builder
+     */
+    public Builder filter(String filter) {
+      select.setFilter(filter);
+      return this;
+    }
+
+    public Builder limit(int limit) {
+      select.setLimit(limit);
+      return this;
+    }
+
+    /**
+     * Finishes the build. Every call returns the same {@link SelectRequest} instance.
+     *
+     * @return the request configured through this builder
+     */
+    public SelectRequest create() {
+      return select;
+    }
+  }
+
+  /**
+   * @return a new, empty {@link Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
new file mode 100644
index 0000000..edf584a
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.model.view;
+
+/**
+ * Response body of a select call: an identifier for the select plus the returned rows,
+ * held as an array of rows where each row is an array of column values.
+ */
+public class SelectResponse {
+
+  private String selectId;
+  private Object[][] rows;
+
+  /** Creates an empty response; both fields start out as {@code null}. */
+  public SelectResponse() {
+
+  }
+
+  /**
+   * Creates a populated response.
+   *
+   * @param selectId identifier of the select
+   * @param rows returned rows; the array is stored by reference
+   */
+  public SelectResponse(String selectId, Object[][] rows) {
+    this.rows = rows;
+    this.selectId = selectId;
+  }
+
+  // ---- accessors -------------------------------------------------------------
+
+  public String getSelectId() {
+    return this.selectId;
+  }
+
+  public Object[][] getRows() {
+    return this.rows;
+  }
+
+  // ---- mutators --------------------------------------------------------------
+
+  public void setSelectId(String selectId) {
+    this.selectId = selectId;
+  }
+
+  public void setRows(Object[][] rows) {
+    this.rows = rows;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/service/HorizonService.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/service/HorizonService.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/service/HorizonService.java
new file mode 100644
index 0000000..7495ca3
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/service/HorizonService.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.service;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.horizon.antlr.ANTLRNoCaseStringStream;
+import org.apache.carbondata.horizon.antlr.FilterVisitor;
+import org.apache.carbondata.horizon.antlr.gen.ExpressionLexer;
+import org.apache.carbondata.horizon.antlr.gen.ExpressionParser;
+import org.apache.carbondata.horizon.rest.model.descriptor.LoadDescriptor;
+import org.apache.carbondata.horizon.rest.model.descriptor.SelectDescriptor;
+import org.apache.carbondata.horizon.rest.model.descriptor.TableDescriptor;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.store.exception.StoreException;
+import org.apache.carbondata.store.master.Master;
+
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+
+/**
+ * Service layer behind the Horizon REST API: turns table, load and select descriptors into
+ * calls on the store {@link Master}.
+ *
+ * <p>A single shared instance is obtained through {@link #getInstance()}.
+ */
+public class HorizonService {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(HorizonService.class.getName());
+
+  private static HorizonService instance;
+
+  private final Master master;
+
+  private HorizonService() {
+    // NOTE(review): null is passed as the store configuration, so this presumably relies on
+    // the Master having been initialised with a real configuration beforehand -- confirm.
+    master = Master.getInstance(null);
+  }
+
+  /**
+   * Builds the table schema described by {@code tableDescriptor} and asks the master to
+   * create the table.
+   *
+   * @param tableDescriptor name, database, properties and field schema of the new table
+   * @return the result reported by {@code Master.createTable}
+   * @throws StoreException if the sort columns cannot be prepared or the master fails with
+   *     an {@link IOException}; only the message of the original exception is carried over
+   */
+  public boolean createTable(TableDescriptor tableDescriptor) throws StoreException {
+    TableSchemaBuilder builder = TableSchema.builder();
+    builder.tableName(tableDescriptor.getName()).properties(tableDescriptor.getProperties());
+
+    Field[] fields = tableDescriptor.getSchema().getFields();
+    // sort_columns
+    List<String> sortColumnsList;
+    try {
+      sortColumnsList =
+          tableDescriptor.getSchema().prepareSortColumns(tableDescriptor.getProperties());
+    } catch (MalformedCarbonCommandException e) {
+      // Logged like the other failure paths so the stack trace is not lost when only the
+      // message is propagated.
+      LOGGER.error(e, "failed to prepare sort columns");
+      throw new StoreException(e.getMessage());
+    }
+    ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
+    // table schema; buildTableSchema is handed the array that is then used as sort columns
+    CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList);
+    builder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
+
+    TableSchema schema = builder.build();
+    SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+    schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+    schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
+    schema.setTableName(tableDescriptor.getName());
+
+    TableInfo tableInfo = CarbonTable
+        .builder()
+        .databaseName(tableDescriptor.getDatabase())
+        .tableName(tableDescriptor.getName())
+        .tablePath(
+            master.getTableFolder(tableDescriptor.getDatabase(), tableDescriptor.getName()))
+        .tableSchema(schema)
+        .isTransactionalTable(true)
+        .buildTableInfo();
+
+    try {
+      return master.createTable(tableInfo, tableDescriptor.isIfNotExists());
+    } catch (IOException e) {
+      LOGGER.error(e, "create table failed");
+      throw new StoreException(e.getMessage());
+    }
+  }
+
+  /**
+   * Builds a load model for the table named in {@code loadDescriptor} and asks the master to
+   * load the data at the descriptor's input path.
+   *
+   * @param loadDescriptor target table, input path, load options and overwrite flag
+   * @return the result reported by {@code Master.loadData}
+   * @throws StoreException if the load options are rejected or the load fails with an
+   *     {@link IOException}; only the message of the original exception is carried over
+   */
+  public boolean loadData(LoadDescriptor loadDescriptor) throws StoreException {
+    CarbonTable table = master.getTable(
+        loadDescriptor.getDatabaseName(), loadDescriptor.getTableName());
+    CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table);
+    modelBuilder.setInputPath(loadDescriptor.getInputPath());
+    try {
+      CarbonLoadModel model =
+          modelBuilder.build(loadDescriptor.getOptions(), System.currentTimeMillis(), "0");
+
+      return master.loadData(model, loadDescriptor.isOverwrite());
+    } catch (InvalidLoadOptionException e) {
+      LOGGER.error(e, "Invalid load options");
+      throw new StoreException(e.getMessage());
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to load data");
+      throw new StoreException(e.getMessage());
+    }
+  }
+
+  /**
+   * Runs a select against the table named in {@code selectDescriptor}.
+   *
+   * <p>The descriptor's filter string is parsed with {@link #parseFilter}, and the
+   * descriptor's limit is passed for both trailing limit arguments of {@code Master.search}.
+   *
+   * @param selectDescriptor target table, projection, filter string and limit
+   * @return the rows returned by {@code Master.search}
+   * @throws StoreException if the lookup or search fails with an {@link IOException}; only
+   *     the message of the original exception is carried over
+   */
+  public CarbonRow[] select(SelectDescriptor selectDescriptor) throws StoreException {
+    try {
+      CarbonTable carbonTable = master.getTable(
+          selectDescriptor.getDatabaseName(), selectDescriptor.getTableName());
+      return master.search(
+          carbonTable,
+          selectDescriptor.getProjection(),
+          parseFilter(selectDescriptor.getFilter(), carbonTable),
+          selectDescriptor.getLimit(),
+          selectDescriptor.getLimit());
+    } catch (IOException e) {
+      LOGGER.error(e, "[" + selectDescriptor.getId() + "] select failed");
+      throw new StoreException(e.getMessage());
+    }
+  }
+
+  /**
+   * Parses a filter string into an {@link Expression} for the given table.
+   *
+   * <p>The string is run through {@link ExpressionLexer} and {@link ExpressionParser}, and
+   * the resulting parse tree is visited by a {@link FilterVisitor} created for
+   * {@code carbonTable}.
+   *
+   * @param filter filter string, or {@code null} for no filter
+   * @param carbonTable table handed to the {@link FilterVisitor}
+   * @return the expression produced by the visitor, or {@code null} if {@code filter} is
+   *     {@code null}
+   */
+  public static Expression parseFilter(String filter, CarbonTable carbonTable) {
+    if (filter == null) {
+      return null;
+    }
+    CharStream input = new ANTLRNoCaseStringStream(filter);
+    ExpressionLexer lexer = new ExpressionLexer(input);
+    CommonTokenStream tokens = new CommonTokenStream(lexer);
+    ExpressionParser parser = new ExpressionParser(tokens);
+    ExpressionParser.ParseFilterContext tree = parser.parseFilter();
+    FilterVisitor visitor = new FilterVisitor(carbonTable);
+    return visitor.visitParseFilter(tree);
+  }
+
+  /**
+   * Returns the shared service, creating it on first use. The method is synchronized, so
+   * concurrent first calls create only one instance.
+   *
+   * @return the shared {@link HorizonService}
+   */
+  public static synchronized HorizonService getInstance() {
+    if (instance == null) {
+      instance = new HorizonService();
+    }
+    return instance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/horizon/src/test/java/org/apache/carbondata/horizon/FilterParseTest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/test/java/org/apache/carbondata/horizon/FilterParseTest.java b/store/horizon/src/test/java/org/apache/carbondata/horizon/FilterParseTest.java
new file mode 100644
index 0000000..72ea0a7
--- /dev/null
+++ b/store/horizon/src/test/java/org/apache/carbondata/horizon/FilterParseTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.horizon.rest.model.descriptor.TableDescriptor;
+import org.apache.carbondata.horizon.rest.service.HorizonService;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that {@link HorizonService#parseFilter} turns filter strings into expressions whose
+ * {@code getStatement()} text matches the expected form.
+ *
+ * <p>The table used for parsing is built in memory from a {@link CreateTableRequest}; no
+ * store service is started.
+ */
+public class FilterParseTest {
+
+  private static CarbonTable carbonTable;
+
+  /**
+   * Builds the in-memory {@link CarbonTable} that every filter is parsed against: one column
+   * per tested data type, with {@code intField} as the sort column.
+   */
+  @BeforeClass
+  public static void setup() throws MalformedCarbonCommandException {
+
+    CreateTableRequest createTableRequest = CreateTableRequest
+        .builder()
+        .ifNotExists()
+        .databaseName("default")
+        .tableName("table_1")
+        .comment("first tableDescriptor")
+        .column("shortField", "SHORT", "short field")
+        .column("intField", "INT", "int field")
+        .column("bigintField", "LONG", "long field")
+        .column("doubleField", "DOUBLE", "double field")
+        .column("stringField", "STRING", "string field")
+        .column("timestampField", "TIMESTAMP", "timestamp field")
+        .column("decimalField", "DECIMAL", 18, 2, "decimal field")
+        .column("dateField", "DATE", "date field")
+        .column("charField", "CHAR", "char field")
+        .column("floatField", "FLOAT", "float field")
+        .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
+        .create();
+
+    TableDescriptor tableDescriptor = createTableRequest.convertToDto();
+
+    // The schema is assembled with the same sequence of calls HorizonService.createTable
+    // uses, but the table is built directly instead of going through the master.
+    TableSchemaBuilder builder = TableSchema.builder();
+    builder.tableName(tableDescriptor.getName()).properties(tableDescriptor.getProperties());
+
+    Field[] fields = tableDescriptor.getSchema().getFields();
+    // sort_columns
+    List<String> sortColumnsList =
+        tableDescriptor.getSchema().prepareSortColumns(tableDescriptor.getProperties());
+    ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
+    // table schema
+    CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList);
+    builder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
+
+    TableSchema schema = builder.build();
+    SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+    schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+    schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
+    schema.setTableName(tableDescriptor.getName());
+
+    carbonTable = CarbonTable
+        .builder()
+        .databaseName(tableDescriptor.getDatabase())
+        .tableName(tableDescriptor.getName())
+        .tablePath("")
+        .tableSchema(schema)
+        .isTransactionalTable(true)
+        .build();
+  }
+
+  /**
+   * Parses {@code sql1} against the test table and asserts that the resulting expression's
+   * statement text equals {@code sql2}.
+   */
+  private void checkExpression(String sql1, String sql2) {
+    Expression expression = HorizonService.parseFilter(sql1, carbonTable);
+    Assert.assertEquals(sql2, expression.getStatement());
+  }
+
+  /** Asserts that {@code sql} parses to an expression whose statement text is {@code sql}. */
+  private void checkExpression(String sql) {
+    checkExpression(sql, sql);
+  }
+
+  @Test
+  public void testFilterParse() {
+
+    // >, >=, <, <=, =, <>, != ("!=" is expected to come back as "<>")
+    checkExpression("intField > 10");
+    checkExpression("intField >= 10");
+    checkExpression("intField < 10");
+    checkExpression("intField <= 10");
+    checkExpression("intField = 10");
+    checkExpression("stringField = 'carbon'");
+    checkExpression("intField <> 10");
+    checkExpression("stringField <> 'carbon'");
+    checkExpression("intField != 10", "intField <> 10");
+    checkExpression("stringField != 'carbon'", "stringField <> 'carbon'");
+
+    // is null, is not null
+    checkExpression("stringField is null");
+    checkExpression("stringField is not null");
+
+    // in, not in
+    checkExpression("intField in (10, 20, 30)");
+    checkExpression("stringField in ('spark', 'carbon')");
+    checkExpression("intField not in (10, 20, 30)");
+    checkExpression("stringField not in ('spark', 'carbon')");
+
+    // between and, not between and (both are expected to come back as a range comparison)
+    checkExpression(
+        "intField between 10 and 30",
+        "intField >= 10 and intField <= 30");
+    // NOTE(review): the expected text joins the two bounds with "and", which no value can
+    // satisfy; "or" looks like the intended operator. The expectation is kept as committed
+    // because it pins what the parser currently produces -- confirm against FilterVisitor.
+    checkExpression(
+        "intField not between 10 and 30",
+        "intField > 30 and intField < 10");
+
+    // and, or (the combined expression is expected to come back wrapped in parentheses)
+    checkExpression(
+        "intField > 10 and stringField = 'carbon'",
+        "(intField > 10 and stringField = 'carbon')");
+    checkExpression(
+        "(intField > 10) and stringField = 'carbon'",
+        "(intField > 10 and stringField = 'carbon')");
+    checkExpression(
+        "(intField > 10) or stringField = 'carbon'",
+        "(intField > 10 or stringField = 'carbon')");
+    checkExpression(
+        "intField > -10 or (stringField = 'carbon' and floatField > 5.0)",
+        "(intField > -10 or (stringField = 'carbon' and floatField > 5.0))");
+
+    // data type: short, int, bigint, double, decimal, bigDecimal string, timestamp, date
+    // (literals carrying a +S, +L, +D or +BD suffix are expected back without the suffix)
+    checkExpression("shortField = 1+S", "shortField = 1");
+    checkExpression("intField = 1");
+    checkExpression("bigintField = 1+L", "bigintField = 1");
+    checkExpression("doubleField = 1.01+D", "doubleField = 1.01");
+    checkExpression("decimalField = 1000.01001", "decimalField = 1000.01001");
+    checkExpression("decimalField = 1000.01001+BD", "decimalField = 1000.01001");
+    checkExpression("stringField = 'carbon'");
+    checkExpression("timestampField = '2018-01-01 10:01:01'");
+    checkExpression("dateField = '2018-01-01'");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
new file mode 100644
index 0000000..e57e813
--- /dev/null
+++ b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.store.conf.StoreConf;
+import org.apache.carbondata.horizon.rest.controller.Horizon;
+import org.apache.carbondata.store.master.Master;
+import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
+import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
+import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
+import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
+import org.apache.carbondata.store.util.StoreUtil;
+import org.apache.carbondata.store.worker.Worker;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * End-to-end test of the Horizon REST API: starts a master, the Horizon service and a
+ * worker in-process, then creates a table, loads {@code data1.csv} and selects rows through
+ * HTTP calls against {@code http://localhost:8080}.
+ */
+public class HorizonTest {
+
+  private static Master master;
+  private static Worker worker;
+  private static String serviceUri = "http://localhost:8080";
+  private static String projectFolder;
+
+  private static RestTemplate restTemplate;
+
+  /**
+   * Starts the master, the Horizon REST service and a worker, in that order.
+   */
+  @BeforeClass
+  public static void setup() throws IOException, InterruptedException {
+    // The project root is taken to be four directories above the test classpath root.
+    projectFolder = new File(HorizonTest.class.getResource("/").getPath() +
+        "../../../../").getCanonicalPath();
+    String log4jFile = projectFolder + "/store/conf/log4j.properties";
+    String confFile = projectFolder + "/store/conf/store.conf";
+
+    System.setProperty("log.path", projectFolder + "/store/core/target/master_worker.log");
+    StoreUtil.initLog4j(log4jFile);
+
+    // The current time is appended to the configured store location, so each run uses a
+    // different location.
+    StoreConf storeConf = new StoreConf(confFile);
+    storeConf.conf(
+        StoreConf.STORE_LOCATION,
+        storeConf.storeLocation() + System.currentTimeMillis());
+
+    // start master
+    master = Master.getInstance(storeConf);
+    master.startService();
+
+    // start the REST service on its own thread
+    new Thread() {
+      public void run() {
+        Horizon.main(new String[0]);
+      }
+    }.start();
+    // Fixed 10 second pause before continuing; presumably this gives the REST service time
+    // to come up, as there is no readiness check -- confirm.
+    Thread.sleep(10000);
+
+    // start worker
+    worker = new Worker(storeConf);
+    worker.start();
+
+    restTemplate = new RestTemplate();
+  }
+
+  /**
+   * Creates {@code default.table_1}, loads {@code data1.csv} into it, then selects with and
+   * without a filter and checks the number of rows returned.
+   */
+  @Test
+  public void testHorizon() {
+    // create table if not exists
+    CreateTableRequest table = CreateTableRequest
+        .builder()
+        .ifNotExists()
+        .databaseName("default")
+        .tableName("table_1")
+        .comment("first table")
+        .column("shortField", "SHORT", "short field")
+        .column("intField", "INT", "int field")
+        .column("bigintField", "LONG", "long field")
+        .column("doubleField", "DOUBLE", "double field")
+        .column("stringField", "STRING", "string field")
+        .column("timestampField", "TIMESTAMP", "timestamp field")
+        .column("decimalField", "DECIMAL", 18, 2, "decimal field")
+        .column("dateField", "DATE", "date field")
+        .column("charField", "CHAR", "char field")
+        .column("floatField", "FLOAT", "float field")
+        .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
+        .create();
+    String createTable =
+        restTemplate.postForObject(serviceUri + "/table/create", table, String.class);
+    Assert.assertTrue(Boolean.parseBoolean(createTable));
+
+    // load one segment
+    LoadRequest load = LoadRequest
+        .builder()
+        .databaseName("default")
+        .tableName("table_1")
+        .overwrite(false)
+        .inputPath(projectFolder + "/store/horizon/src/test/resources/data1.csv")
+        .options("header", "true")
+        .create();
+    String loadData =
+        restTemplate.postForObject(serviceUri + "/table/load", load, String.class);
+    Assert.assertTrue(Boolean.parseBoolean(loadData));
+
+    // select row: data1.csv holds 10 data rows, so a limit of 5 must return exactly 5
+    SelectRequest select = SelectRequest
+        .builder()
+        .databaseName("default")
+        .tableName("table_1")
+        .select("intField", "stringField")
+        .limit(5)
+        .create();
+    SelectResponse result =
+        restTemplate.postForObject(serviceUri + "/table/select", select, SelectResponse.class);
+    Assert.assertEquals(5, result.getRows().length);
+
+    // select row with filter: exactly one row of data1.csv has intField = 11
+    SelectRequest filter = SelectRequest
+        .builder()
+        .databaseName("default")
+        .tableName("table_1")
+        .select("intField", "stringField")
+        .filter("intField = 11")
+        .limit(5)
+        .create();
+    SelectResponse filterResult =
+        restTemplate.postForObject(serviceUri + "/table/select", filter, SelectResponse.class);
+    Assert.assertEquals(1, filterResult.getRows().length);
+  }
+
+  /**
+   * Stops the worker, the Horizon service and the master, in that order.
+   */
+  @AfterClass
+  public static void release() throws InterruptedException {
+    // worker and master stay null when setup() fails early; skip them in that case so the
+    // teardown does not add a NullPointerException on top of the setup failure.
+    if (worker != null) {
+      worker.stop();
+    }
+    Horizon.close();
+    if (master != null) {
+      master.stopService();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/horizon/src/test/resources/data1.csv
----------------------------------------------------------------------
diff --git a/store/horizon/src/test/resources/data1.csv b/store/horizon/src/test/resources/data1.csv
new file mode 100644
index 0000000..cf732eb
--- /dev/null
+++ b/store/horizon/src/test/resources/data1.csv
@@ -0,0 +1,11 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField
+1,10,1100,48.4,spark,2015-4-23 12:01:01,1.23,2015-4-23,aaa,2.5
+5,17,1140,43.4,spark,2015-7-27 12:01:02,3.45,2015-7-27,bbb,2.5
+1,11,1100,44.4,flink,2015-5-23 12:01:03,23.23,2015-5-23,ccc,2.5
+1,10,1150,43.4,spark,2015-7-24 12:01:04,254.12,2015-7-24,ddd,2.5
+1,10,1100,47.4,spark,2015-7-23 12:01:05,876.14,2015-7-23,eeee,3.5
+3,14,1160,43.4,hive,2015-7-26 12:01:06,3454.32,2015-7-26,ff,2.5
+2,10,1100,43.4,impala,2015-7-23 12:01:07,456.98,2015-7-23,ggg,2.5
+1,10,1100,43.4,spark,2015-5-23 12:01:08,32.53,2015-5-23,hhh,2.5
+4,16,1130,42.4,impala,2015-7-23 12:01:09,67.23,2015-7-23,iii,2.5
+1,10,1100,43.4,spark,2015-7-23 12:01:10,832.23,2015-7-23,jjj,2.5

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index 627e060..c3967f4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -40,13 +40,13 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  * Implementation to write rows in CSV format to carbondata file.
  */
 @InterfaceAudience.Internal
-class CSVCarbonWriter extends CarbonWriter {
+public class CSVCarbonWriter extends CarbonWriter {
 
   private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
   private TaskAttemptContext context;
   private ObjectArrayWritable writable;
 
-  CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
+  public CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
     Configuration hadoopConf = new Configuration();
     CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
     CarbonTableOutputFormat format = new CarbonTableOutputFormat();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 0f670fe..a4ca510 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -421,7 +421,7 @@ public class CarbonWriterBuilder {
     return table;
   }
 
-  private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder,
+  public static void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder,
       List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) {
     Set<String> uniqueFields = new HashSet<>();
     // a counter which will be used in case of complex array type. This valIndex will be assigned

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6fa86381/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
index 6131d45..c9622e1 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
@@ -18,18 +18,26 @@
 package org.apache.carbondata.sdk.file;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 import com.google.gson.GsonBuilder;
 import com.google.gson.TypeAdapter;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
+import org.apache.commons.lang.StringUtils;
 
 /**
  * A schema used to write and read data files
@@ -108,4 +116,63 @@ public class Schema {
     });
     return this;
   }
+
+  public List<String> prepareSortColumns(Map<String, String> properties)
+      throws MalformedCarbonCommandException {
+
+    List<String> sortColumnsList = new ArrayList<>();
+    Set<Map.Entry<String, String>> entries = properties.entrySet();
+    String sortKeyString = null;
+    for (Map.Entry<String, String> entry : entries) {
+      if (CarbonCommonConstants.SORT_COLUMNS.equalsIgnoreCase(entry.getKey())) {
+        sortKeyString = CarbonUtil.unquoteChar(entry.getValue()).trim();
+      }
+    }
+
+    if (sortKeyString != null) {
+      String[] sortKeys = sortKeyString.split(",", -1);
+      for (int i = 0; i < sortKeys.length; i++) {
+        sortKeys[i] = sortKeys[i].trim().toLowerCase();
+        if (StringUtils.isEmpty(sortKeys[i])) {
+          throw new MalformedCarbonCommandException("SORT_COLUMNS contains illegal argument.");
+        }
+      }
+
+      for (int i = sortKeys.length - 2; i >= 0; i--) {
+        for (int j = i + 1; j < sortKeys.length; j++) {
+          if (sortKeys[i].equals(sortKeys[j])) {
+            throw new MalformedCarbonCommandException(
+                "SORT_COLUMNS Either having duplicate columns : " + sortKeys[i]);
+          }
+        }
+      }
+
+      for (int i = sortKeys.length - 1; i >= 0; i--) {
+        boolean isExists = false;
+        for (int j = fields.length - 1; j >= 0; j--) {
+          if (sortKeys[i].equalsIgnoreCase(fields[j].getFieldName())) {
+            sortKeys[i] = fields[j].getFieldName();
+            isExists = true;
+            break;
+          }
+        }
+        if (!isExists) {
+          String message = "sort_columns: " + sortKeys[i]
+              + " does not exist in table. Please check create table statement.";
+          throw new MalformedCarbonCommandException(message);
+        }
+      }
+      sortColumnsList = Arrays.asList(sortKeys);
+    } else {
+      for (Field field : fields) {
+        if (null != field) {
+          if (field.getDataType() == DataTypes.STRING || field.getDataType() == DataTypes.DATE
+              || field.getDataType() == DataTypes.TIMESTAMP) {
+            sortColumnsList.add(field.getFieldName());
+          }
+        }
+      }
+    }
+    return sortColumnsList;
+  }
 }


Mime
View raw message