carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [36/50] [abbrv] carbondata git commit: fix unsafe column page bug
Date Wed, 05 Jul 2017 00:44:45 GMT
fix unsafe column page bug


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fdb672ad
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fdb672ad
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fdb672ad

Branch: refs/heads/streaming_ingest
Commit: fdb672ad946c0fe5b9982aee9b09717db36a54f7
Parents: ad80006
Author: jackylk <jacky.likun@huawei.com>
Authored: Fri Jun 30 18:27:08 2017 +0800
Committer: QiangCai <qiangcai@qq.com>
Committed: Sat Jul 1 13:09:24 2017 +0800

----------------------------------------------------------------------
 .../page/UnsafeVarLengthColumnPage.java         | 35 ++++++++++++++++----
 .../datastore/page/VarLengthColumnPageBase.java |  3 +-
 .../resources/big_decimal_without_header.csv    |  5 +++
 .../TestLoadDataWithHiveSyntaxUnsafe.scala      | 25 +++++++++++++-
 4 files changed, 59 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 75b5312..dd6abc5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -47,6 +47,11 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   private static final double FACTOR = 1.25;
 
+  /**
+   * create a page
+   * @param dataType data type
+   * @param pageSize number of rows
+   */
   UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
     super(dataType, pageSize);
     capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
@@ -55,6 +60,20 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
     baseOffset = memoryBlock.getBaseOffset();
   }
 
+  /**
+   * create a page with initial capacity
+   * @param dataType data type
+   * @param pageSize number of rows
+   * @param capacity initial capacity of the page, in bytes
+   */
+  UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity) throws MemoryException {
+    super(dataType, pageSize);
+    this.capacity = capacity;
+    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+    baseAddress = memoryBlock.getBaseObject();
+    baseOffset = memoryBlock.getBaseOffset();
+  }
+
   @Override
   public void freeMemory() {
     if (memoryBlock != null) {
@@ -65,6 +84,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
     }
   }
 
+  /**
+   * reallocate memory if capacity is less than current size + request size
+   */
   private void ensureMemory(int requestSize) throws MemoryException {
     if (totalLength + requestSize > capacity) {
       int newSize = 2 * capacity;
@@ -81,17 +103,16 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public void putBytesAtRow(int rowId, byte[] bytes) {
-    try {
-      ensureMemory(bytes.length);
-    } catch (MemoryException e) {
-      throw new RuntimeException(e);
-    }
-    CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-        baseAddress, baseOffset + rowOffset[rowId], bytes.length);
+    putBytes(rowId, bytes, 0, bytes.length);
   }
 
   @Override
   public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    try {
+      ensureMemory(length);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
     CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset,
         baseAddress, baseOffset + rowOffset[rowId], length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index a897d54..801cfb3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -105,8 +105,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     int numRows = rowId;
 
     VarLengthColumnPageBase page;
+    int inputDataLength = offset;
     if (unsafe) {
-      page = new UnsafeVarLengthColumnPage(DECIMAL, numRows);
+      page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, inputDataLength);
     } else {
       page = new SafeVarLengthColumnPage(DECIMAL, numRows);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv b/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv
new file mode 100644
index 0000000..4e99384
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv
@@ -0,0 +1,5 @@
+1,32473289848372638424.8218378712
+2,99487323423232324232.2434323233
+3,12773443434389239382.4309238238
+4,38488747823423323726.3589238237
+5,93838663748166353423.4273832762
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
index 2a9d1d9..c713865 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
@@ -65,6 +65,8 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
     sql("drop table if exists comment_test")
     sql("drop table if exists smallinttable")
     sql("drop table if exists smallinthivetable")
+    sql("drop table if exists decimal_varlength")
+    sql("drop table if exists decimal_varlength_hive")
     sql(
      "CREATE table carbontable (empno int, empname String, designation String, doj String, " +
           "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
@@ -77,7 +79,18 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
           "projectcode int, projectjoindate String,projectenddate String, attendance String," +
           "utilization String,salary String)row format delimited fields terminated by ','"
     )
-
+    sql(
+      """
+        | CREATE TABLE decimal_varlength(id string, value decimal(30,10))
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin
+    )
+    sql(
+      """
+        | CREATE TABLE decimal_varlength_hive(id string, value decimal(30,10))
+        | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+      """.stripMargin
+    )
   }
 
   test("create table with smallint type and query smallint table") {
@@ -674,6 +687,14 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
       Row("~carbon,")))
   }
 
+  test("test decimal var lenght comlumn page") {
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/big_decimal_without_header.csv' INTO TABLE decimal_varlength" +
+        s" OPTIONS('FILEHEADER'='id,value')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/big_decimal_without_header.csv' INTO TABLE decimal_varlength_hive")
+    checkAnswer(sql("select value from decimal_varlength"), sql("select value from decimal_varlength_hive"))
+    checkAnswer(sql("select sum(value) from decimal_varlength"), sql("select sum(value) from decimal_varlength_hive"))
+  }
+
   override def afterAll {
     sql("drop table if exists escapechar1")
     sql("drop table if exists escapechar2")
@@ -701,6 +722,8 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
     sql("drop table if exists carbontable1")
     sql("drop table if exists hivetable1")
     sql("drop table if exists comment_test")
+    sql("drop table if exists decimal_varlength")
+    sql("drop table if exists decimal_varlength_hive")
     CarbonProperties.getInstance().addProperty(
       CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
       CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT


Mime
View raw message