carbondata-commits mailing list archives

From jack...@apache.org
Subject [47/54] [abbrv] carbondata git commit: [CARBONDATA-1400] Fix array column out-of-bound bug when writing carbondata file
Date Thu, 14 Sep 2017 09:20:40 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeWithBigArray.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeWithBigArray.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeWithBigArray.scala
new file mode 100644
index 0000000..f4fd168
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeWithBigArray.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.complexType
+
+import java.io.{File, FileOutputStream, PrintStream}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestComplexTypeWithBigArray extends QueryTest with BeforeAndAfterAll {
+
+  val filePath = "./list.csv"
+  val file = new File(filePath)
+
+  override def beforeAll: Unit = {
+    // write a CSV containing 33000 rows; each row has an array with 2 elements
+    val out = new PrintStream(new FileOutputStream(file))
+    (1 to 33000).foreach(i => out.println(s"$i,$i$$1"))
+    out.close()
+  }
+
+  test("test with big string array") {
+    sql("DROP TABLE IF EXISTS big_array")
+    sql(
+      """
+        | CREATE TABLE big_array(
+        |  value BIGINT,
+        |  list ARRAY<STRING>
+        |  )
+        | STORED BY 'carbondata'
+      """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${file.getAbsolutePath}'
+         | INTO TABLE big_array
+         | OPTIONS ('header'='false')
+      """.stripMargin)
+    checkAnswer(
+      sql("select count(*) from big_array"),
+      Row(33000)
+    )
+    checkAnswer(
+      sql("select * from big_array limit 1"),
+      Row(1, mutable.WrappedArray.make[String](Array("1", "1")))
+    )
+    checkAnswer(
+      sql("select list[1] from big_array limit 1"),
+      Row("1")
+    )
+    checkAnswer(
+      sql("select count(*) from big_array where list[0] = '1'"),
+      Row(1)
+    )
+    checkAnswer(
+      sql("select count(*) from big_array where array_contains(list, '1') "),
+      Row(33000)
+    )
+    if (sqlContext.sparkContext.version.startsWith("2.")) {
+      // the explode UDF is supported starting from Spark 2.0
+      checkAnswer(
+        sql("select count(x) from (select explode(list) as x from big_array)"),
+        Row(66000)
+      )
+    }
+    checkAnswer(
+      sql("select * from big_array where value = 15000"),
+      Row(15000, mutable.WrappedArray.make[String](Array("15000", "1")))
+    )
+    checkAnswer(
+      sql("select * from big_array where value = 32500"),
+      Row(32500, mutable.WrappedArray.make[String](Array("32500", "1")))
+    )
+    checkAnswer(
+      sql("select count(list) from big_array"),
+      Row(33000)
+    )
+    sql("DROP TABLE big_array")
+  }
+
+  test("test with big int array") {
+    sql("DROP TABLE IF EXISTS big_array")
+    sql(
+      """
+        | CREATE TABLE big_array(
+        |  value BIGINT,
+        |  list ARRAY<INT>
+        |  )
+        | STORED BY 'carbondata'
+      """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${file.getAbsolutePath}'
+         | INTO TABLE big_array
+         | OPTIONS ('header'='false')
+      """.stripMargin)
+    checkAnswer(
+      sql("select count(*) from big_array"),
+      Row(33000)
+    )
+    checkAnswer(
+      sql("select * from big_array limit 1"),
+      Row(1, mutable.WrappedArray.make[Int](Array(1, 1)))
+    )
+    checkAnswer(
+      sql("select list[1] from big_array limit 1"),
+      Row(1)
+    )
+    checkAnswer(
+      sql("select count(*) from big_array where list[0] = 1"),
+      Row(1)
+    )
+    checkAnswer(
+      sql("select count(*) from big_array where array_contains(list, 1) "),
+      Row(33000)
+    )
+    if (sqlContext.sparkContext.version.startsWith("2.")) {
+      // the explode UDF is supported starting from Spark 2.0
+      checkAnswer(
+        sql("select count(x) from (select explode(list) as x from big_array)"),
+        Row(66000)
+      )
+    }
+    checkAnswer(
+      sql("select * from big_array where value = 15000"),
+      Row(15000, mutable.WrappedArray.make[Int](Array(15000, 1)))
+    )
+    checkAnswer(
+      sql("select * from big_array where value = 32500"),
+      Row(32500, mutable.WrappedArray.make[Int](Array(32500, 1)))
+    )
+    checkAnswer(
+      sql("select count(list) from big_array"),
+      Row(33000)
+    )
+    sql("DROP TABLE big_array")
+  }
+
+  override def afterAll: Unit = {
+    file.delete()
+  }
+
+}
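
As background for the test data above: each generated CSV line pairs the BIGINT column with the array column, where '$' is assumed here to act as the level-1 complex-type delimiter. A minimal, hypothetical Java sketch (not part of this patch) of how one line is expected to split:

import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not part of this patch): how one generated CSV line
// is expected to split into (value, list), assuming '$' is the complex delimiter.
public class CsvLineSketch {
  public static void main(String[] args) {
    String line = "15000,15000$1";
    String[] columns = line.split(",", 2);
    long value = Long.parseLong(columns[0]);                    // BIGINT column
    List<String> list = Arrays.asList(columns[1].split("\\$")); // ARRAY column
    System.out.println(value + " -> " + list);                  // 15000 -> [15000, 1]
  }
}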

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index bc09067..4d919dc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -42,8 +42,8 @@ import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
- * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
- * can improve the aggregation performance and reduce memory usage
+ * Carbon-specific optimization for late decode (convert dictionary key to value as late as
+ * possible), which can improve the aggregation performance and reduce memory usage
  */
 private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   val PUSHED_FILTERS = "PushedFilters"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 02ceb06..7661577 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -64,6 +64,13 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
    */
   private int dataCounter;
 
+  private ArrayDataType(int outputArrayIndex, int dataCounter, GenericDataType children) {
+    this.outputArrayIndex = outputArrayIndex;
+    this.dataCounter = dataCounter;
+    this.children = children;
+  }
+
+
   /**
    * constructor
    * @param name
@@ -270,4 +277,8 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
     children.fillCardinalityAfterDataLoad(dimCardWithComplex, maxSurrogateKeyArray);
   }
 
+  @Override
+  public GenericDataType<ArrayObject> deepCopy() {
+    return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy());
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 6b54d2d..77c00d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -142,4 +142,8 @@ public interface GenericDataType<T> {
    */
   void fillCardinality(List<Integer> dimCardWithComplex);
 
+  /**
+   * Clone self for multi-threaded access (for complex type processing in TablePage)
+   */
+  GenericDataType<T> deepCopy();
 }
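
The contract this method introduces: composite types copy their children recursively, while primitive leaves restart their mutable parse state. A self-contained Java sketch of that pattern under hypothetical names (Node, Leaf, Composite; not the CarbonData classes):

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the deepCopy contract; not the CarbonData classes.
public class DeepCopySketch {
  interface Node { Node deepCopy(); }

  // leaf with mutable parse state (analogous to dataCounter in PrimitiveDataType);
  // the copy starts with fresh state, as PrimitiveDataType.deepCopy resets it to 0
  static class Leaf implements Node {
    int dataCounter;
    public Node deepCopy() { return new Leaf(); }
  }

  // composite copies each child recursively, as StructDataType.deepCopy does
  static class Composite implements Node {
    final List<Node> children = new ArrayList<>();
    public Node deepCopy() {
      Composite copy = new Composite();
      for (Node child : children) {
        copy.children.add(child.deepCopy());
      }
      return copy;
    }
  }

  public static void main(String[] args) {
    Composite root = new Composite();
    root.children.add(new Leaf());
    Node copy = root.deepCopy(); // independent tree, no shared mutable state
    System.out.println(copy != root);
  }
}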

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index c6fc1c1..a9c2bfe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -95,6 +95,11 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
   private CarbonDimension carbonDimension;
 
+  private PrimitiveDataType(int outputArrayIndex, int dataCounter) {
+    this.outputArrayIndex = outputArrayIndex;
+    this.dataCounter = dataCounter;
+  }
+
   /**
    * constructor
    *
@@ -237,7 +242,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
       KeyGenerator[] generator) throws IOException, KeyGenException {
     int data = byteArrayInput.getInt();
-    dataOutputStream.write(generator[index].generateKey(new int[] { data }));
+    byte[] v = generator[index].generateKey(new int[] { data });
+    dataOutputStream.write(v);
   }
 
   /*
@@ -317,4 +323,12 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       int[] maxSurrogateKeyArray) {
     dimCardWithComplex.add(maxSurrogateKeyArray[index]);
   }
+
+  @Override
+  public GenericDataType<Object> deepCopy() {
+    PrimitiveDataType dataType = new PrimitiveDataType(this.outputArrayIndex, 0);
+    dataType.setKeySize(this.keySize);
+    dataType.setSurrogateIndex(this.index);
+    return dataType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index a61144e..68b6911 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -58,6 +58,12 @@ public class StructDataType implements GenericDataType<StructObject> {
    */
   private int dataCounter;
 
+  private StructDataType(List<GenericDataType> children, int outputArrayIndex, int dataCounter) {
+    this.children = children;
+    this.outputArrayIndex = outputArrayIndex;
+    this.dataCounter = dataCounter;
+  }
+
   /**
    * constructor
    * @param name
@@ -296,4 +302,13 @@ public class StructDataType implements GenericDataType<StructObject> {
       children.get(i).fillCardinalityAfterDataLoad(dimCardWithComplex, maxSurrogateKeyArray);
     }
   }
+
+  @Override
+  public GenericDataType<StructObject> deepCopy() {
+    List<GenericDataType> childrenClone = new ArrayList<>();
+    for (GenericDataType child : children) {
+      childrenClone.add(child.deepCopy());
+    }
+    return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index ab0a122..287de0a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -23,18 +23,20 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.carbondata.core.datastore.DimensionType;
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
-import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategyFactory;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.datastore.page.key.TablePageKey;
 import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.LVStringStatsCollector;
@@ -73,24 +75,30 @@ public class TablePage {
 
   private EncodedTablePage encodedTablePage;
 
-  private EncodingStrategy encodingStrategy = EncodingStrategyFactory.getStrategy();
+  private EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
 
   // true if it is last page of all input rows
   private boolean isLastPage;
 
+  // used for complex columns to deserialize the byte array in the input CarbonRow
+  private Map<Integer, GenericDataType> complexIndexMap = null;
+
   TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
     this.model = model;
     this.pageSize = pageSize;
     int numDictDimension = model.getMDKeyGenerator().getDimCount();
+    TableSpec tableSpec = model.getTableSpec();
     dictDimensionPages = new ColumnPage[numDictDimension];
     for (int i = 0; i < dictDimensionPages.length; i++) {
-      ColumnPage page = ColumnPage.newPage(DataType.BYTE_ARRAY, pageSize);
+      TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
+      ColumnPage page = ColumnPage.newPage(spec, DataType.BYTE_ARRAY, pageSize);
       page.setStatsCollector(KeyPageStatsCollector.newInstance(DataType.BYTE_ARRAY));
       dictDimensionPages[i] = page;
     }
     noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
     for (int i = 0; i < noDictDimensionPages.length; i++) {
-      ColumnPage page = ColumnPage.newPage(DataType.STRING, pageSize);
+      TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i + numDictDimension);
+      ColumnPage page = ColumnPage.newPage(spec, DataType.STRING, pageSize);
       page.setStatsCollector(LVStringStatsCollector.newInstance());
       noDictDimensionPages[i] = page;
     }
@@ -105,11 +113,10 @@ public class TablePage {
     for (int i = 0; i < measurePages.length; i++) {
       TableSpec.MeasureSpec spec = model.getTableSpec().getMeasureSpec(i);
       ColumnPage page;
-      if (spec.getDataType() == DataType.DECIMAL) {
-        page = ColumnPage.newDecimalPage(dataTypes[i], pageSize,
-            spec.getScale(), spec.getPrecision());
+      if (spec.getSchemaDataType() == DataType.DECIMAL) {
+        page = ColumnPage.newDecimalPage(spec, dataTypes[i], pageSize);
       } else {
-        page = ColumnPage.newPage(dataTypes[i], pageSize);
+        page = ColumnPage.newPage(spec, dataTypes[i], pageSize);
       }
       page.setStatsCollector(
           PrimitivePageStatsCollector.newInstance(
@@ -119,6 +126,13 @@ public class TablePage {
     boolean hasNoDictionary = noDictDimensionPages.length > 0;
     this.key = new TablePageKey(pageSize, model.getMDKeyGenerator(), model.getSegmentProperties(),
         hasNoDictionary);
+
+    // for complex types, `complexIndexMap` is accessed by multiple threads (multiple Producers),
+    // so we need to clone the index map to make it thread safe
+    this.complexIndexMap = new HashMap<>();
+    for (Map.Entry<Integer, GenericDataType> entry : model.getComplexIndexMap().entrySet()) {
+      this.complexIndexMap.put(entry.getKey(), entry.getValue().deepCopy());
+    }
   }
 
   /**
@@ -187,7 +201,7 @@ public class TablePage {
  // TODO: this function should be refactored; ColumnPage should support complex type encoding
   // directly instead of doing it here
   private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
-    GenericDataType complexDataType = model.getComplexIndexMap().get(
+    GenericDataType complexDataType = complexIndexMap.get(
         index + model.getPrimitiveDimLens().length);
 
     // initialize the page if first row
@@ -265,7 +279,7 @@ public class TablePage {
       throws MemoryException, IOException {
     EncodedColumnPage[] encodedMeasures = new EncodedColumnPage[measurePages.length];
     for (int i = 0; i < measurePages.length; i++) {
-      ColumnPageEncoder encoder = encodingStrategy.createEncoder(
+      ColumnPageEncoder encoder = encodingFactory.createEncoder(
           model.getTableSpec().getMeasureSpec(i), measurePages[i]);
       encodedMeasures[i] = encoder.encode(measurePages[i]);
     }
@@ -286,17 +300,17 @@ public class TablePage {
       ColumnPageEncoder columnPageEncoder;
       EncodedColumnPage encodedPage;
       TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
-      switch (spec.getDimensionType()) {
+      switch (spec.getColumnType()) {
         case GLOBAL_DICTIONARY:
         case DIRECT_DICTIONARY:
-          columnPageEncoder = encodingStrategy.createEncoder(
+          columnPageEncoder = encodingFactory.createEncoder(
               spec,
               dictDimensionPages[dictIndex]);
           encodedPage = columnPageEncoder.encode(dictDimensionPages[dictIndex++]);
           encodedDimensions.add(encodedPage);
           break;
         case PLAIN_VALUE:
-          columnPageEncoder = encodingStrategy.createEncoder(
+          columnPageEncoder = encodingFactory.createEncoder(
               spec,
               noDictDimensionPages[noDictIndex]);
           encodedPage = columnPageEncoder.encode(noDictDimensionPages[noDictIndex++]);
@@ -309,7 +323,7 @@ public class TablePage {
           break;
         default:
           throw new IllegalArgumentException("unsupported dimension type:" + spec
-              .getDimensionType());
+              .getColumnType());
       }
     }
 
@@ -327,10 +341,10 @@ public class TablePage {
     TableSpec spec = model.getTableSpec();
     int numDimensions = spec.getNumDimensions();
     for (int i = 0; i < numDimensions; i++) {
-      DimensionType type = spec.getDimensionSpec(i).getDimensionType();
-      if ((type == DimensionType.GLOBAL_DICTIONARY) || (type == DimensionType.DIRECT_DICTIONARY))
{
+      ColumnType type = spec.getDimensionSpec(i).getColumnType();
+      if ((type == ColumnType.GLOBAL_DICTIONARY) || (type == ColumnType.DIRECT_DICTIONARY)) {
         page = dictDimensionPages[++dictDimensionIndex];
-      } else if (type == DimensionType.PLAIN_VALUE) {
+      } else if (type == ColumnType.PLAIN_VALUE) {
         page = noDictDimensionPages[++noDictDimensionIndex];
       } else {
         // do not support datamap on complex column
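
The thread-safety idea behind cloning complexIndexMap in the TablePage constructor above, as a minimal sketch assuming a producer-per-page model; StatefulParser and the pool setup are hypothetical, not CarbonData APIs:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: every producer thread receives its own deep copy of the
// shared index map, so stateful parsers are never mutated concurrently -- the
// same idea as TablePage cloning complexIndexMap above.
public class PerProducerCopySketch {

  static class StatefulParser {
    private int dataCounter; // mutable state, analogous to GenericDataType

    StatefulParser deepCopy() {
      return new StatefulParser(); // fresh state for the new thread
    }

    void parse(String row) {
      dataCounter++; // safe: each thread increments its own copy
    }
  }

  static Map<Integer, StatefulParser> cloneIndexMap(Map<Integer, StatefulParser> shared) {
    Map<Integer, StatefulParser> own = new HashMap<>();
    for (Map.Entry<Integer, StatefulParser> e : shared.entrySet()) {
      own.put(e.getKey(), e.getValue().deepCopy());
    }
    return own;
  }

  public static void main(String[] args) {
    Map<Integer, StatefulParser> shared = new HashMap<>();
    shared.put(0, new StatefulParser());
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 2; i++) {
      Map<Integer, StatefulParser> own = cloneIndexMap(shared); // one copy per producer
      pool.submit(() -> own.get(0).parse("row"));
    }
    pool.shutdown();
  }
}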

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index e91cf44..fabb5a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
-import org.apache.carbondata.core.datastore.DimensionType;
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -585,8 +585,8 @@ public final class CarbonDataProcessorUtil {
    * @param dimensionType
    * @return
    */
-  public static boolean isRleApplicableForColumn(DimensionType dimensionType) {
-    if (dimensionType == DimensionType.GLOBAL_DICTIONARY) {
+  public static boolean isRleApplicableForColumn(ColumnType dimensionType) {
+    if (dimensionType == ColumnType.GLOBAL_DICTIONARY) {
       return true;
     }
     return false;

