carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] carbondata git commit: Fixes include: 1. If a numeric data type column is specified as sort column and if it contains non numeric value then data load fails. 2. Intermediate merger thread is not shutdown before starting final merge sort during alte
Date Thu, 18 May 2017 07:41:52 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 608fb3a26 -> 08badd025


Fixes include:
1. If a numeric data type column is specified as sort column and if it contains non numeric
value then data load fails.
2. Intermediate merger thread is not shut down before starting final merge sort during alter
table compaction. This can lead to a file not found exception.
3. During compaction after a restructure operation, the number of sort columns and the number
of no dictionary sort columns are not populated during sorting, which leads to unsorted
data being written to file and sometimes to failure of compaction after the restructure operation.

Fix:
1. When the bad records action is force and the given value is invalid for the data type,
update the row with the default null value array.
2. Call finish on the intermediate merger thread before starting the final merge sort.
3. Populate the number of sort columns and no dictionary sort columns from the carbon table
during compaction after a restructure operation.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/89ddf5a1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/89ddf5a1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/89ddf5a1

Branch: refs/heads/master
Commit: 89ddf5a122eb1069f918c264a119f39d7adb7077
Parents: 608fb3a
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Wed May 17 20:49:32 2017 +0530
Committer: Venkata Ramana Gollamudi <g.ramana.v1@gmail.com>
Committed: Thu May 18 12:22:18 2017 +0530

----------------------------------------------------------------------
 .../resources/numeric_column_invalid_values.csv |  6 ++++
 .../src/test/resources/restructure/data7.csv    |  2 ++
 .../testsuite/sortcolumns/TestSortColumns.scala | 15 +++++++-
 .../vectorreader/AddColumnTestCases.scala       | 26 ++++++++++++++
 .../merger/CompactionResultSortProcessor.java   | 15 +++++---
 .../impl/NonDictionaryFieldConverterImpl.java   |  3 +-
 .../sortandgroupby/sortdata/SortParameters.java | 38 +++++++++++++-------
 7 files changed, 86 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/89ddf5a1/integration/spark-common-test/src/test/resources/numeric_column_invalid_values.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/numeric_column_invalid_values.csv
b/integration/spark-common-test/src/test/resources/numeric_column_invalid_values.csv
new file mode 100644
index 0000000..ec8825d
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/numeric_column_invalid_values.csv
@@ -0,0 +1,6 @@
+1,Pallavi,25
+2,Rahul,24
+3,Prabhat,twenty six
+7,Neha,25
+2,Geetika,22
+3,Sangeeta,26
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89ddf5a1/integration/spark-common-test/src/test/resources/restructure/data7.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/restructure/data7.csv b/integration/spark-common-test/src/test/resources/restructure/data7.csv
new file mode 100644
index 0000000..e0b35d0
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/restructure/data7.csv
@@ -0,0 +1,2 @@
+spark1,abc
+spark2,pqr
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89ddf5a1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index afdca21..b417a0c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -233,6 +233,17 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
       defaultLoadingProperties
     }
   }
+
+  test("create table with invalid values for numeric data type columns specified as sort_columns")
{
+    // load hive data
+    sql("CREATE TABLE test_sort_col_hive (id INT, name STRING, age INT) row format delimited
fields terminated by ','")
+    sql(s"LOAD DATA local inpath '$resourcesPath/numeric_column_invalid_values.csv' INTO
TABLE test_sort_col_hive")
+    // load carbon data
+    sql("CREATE TABLE test_sort_col (id INT, name STRING, age INT) STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('SORT_COLUMNS'='id,age')")
+    sql(s"LOAD DATA local inpath '$resourcesPath/numeric_column_invalid_values.csv' INTO
TABLE test_sort_col OPTIONS('FILEHEADER'='id,name,age')")
+    // compare hive and carbon data
+    checkAnswer(sql("select * from test_sort_col_hive"), sql("select * from test_sort_col"))
+  }
   
   override def afterAll = {
     dropTable
@@ -258,6 +269,8 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists unsortedtable_heap_safe")
     sql("drop table if exists unsortedtable_heap_unsafe")
     sql("drop table if exists unsortedtable_heap_inmemory")
+    sql("drop table if exists test_sort_col")
+    sql("drop table if exists test_sort_col_hive")
   }
 
   def setLoadingProperties(offheap: String, unsafe: String, useBatch: String): Unit = {
@@ -267,7 +280,7 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
   }
 
   def defaultLoadingProperties = {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
CarbonCommonConstants.ENABLE_OFFHEAP_SORT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89ddf5a1/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index 2cec9a5..60c0412 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -331,6 +331,32 @@ class AddColumnTestCases extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon_table")
   }
 
+  test("test compaction with all dictionary columns") {
+    sql("DROP TABLE IF EXISTS alter_dict")
+    sql("CREATE TABLE alter_dict(stringField string,charField string) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data7.csv' INTO TABLE alter_dict
options('FILEHEADER'='stringField,charField')")
+    sql("Alter table alter_dict drop columns(charField)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data7.csv' INTO TABLE alter_dict
options('FILEHEADER'='stringField')")
+    sql("Alter table alter_dict compact 'major'")
+    checkExistence(sql("show segments for table alter_dict"), true, "0Compacted")
+    checkExistence(sql("show segments for table alter_dict"), true, "1Compacted")
+    checkExistence(sql("show segments for table alter_dict"), true, "0.1Success")
+    sql("DROP TABLE IF EXISTS alter_dict")
+  }
+
+  test("test compaction with all no dictionary columns") {
+    sql("DROP TABLE IF EXISTS alter_no_dict")
+    sql("CREATE TABLE alter_no_dict(stringField string,charField string) STORED BY 'carbondata'
TBLPROPERTIES('DICTIONARY_EXCLUDE'='stringField,charField')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data7.csv' INTO TABLE alter_no_dict
options('FILEHEADER'='stringField,charField')")
+    sql("Alter table alter_no_dict drop columns(charField)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data7.csv' INTO TABLE alter_no_dict
options('FILEHEADER'='stringField')")
+    sql("Alter table alter_no_dict compact 'major'")
+    checkExistence(sql("show segments for table alter_no_dict"), true, "0Compacted")
+    checkExistence(sql("show segments for table alter_no_dict"), true, "1Compacted")
+    checkExistence(sql("show segments for table alter_no_dict"), true, "0.1Success")
+    sql("DROP TABLE IF EXISTS alter_no_dict")
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS addcolumntest")
     sql("drop table if exists hivetable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89ddf5a1/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 690f6ef..3c71274 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -121,6 +121,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor
{
    * whether the allocated tasks has any record
    */
   private boolean isRecordFound;
+  /**
+   * intermediate sort merger
+   */
+  private SortIntermediateFileMerger intermediateFileMerger;
 
   /**
    * @param carbonLoadModel
@@ -266,6 +270,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor
{
    */
   private void readAndLoadDataFromSortTempFiles() throws Exception {
     try {
+      intermediateFileMerger.finish();
       finalMerger.startFinalMerge();
       while (finalMerger.hasNext()) {
         Object[] rowRead = finalMerger.next();
@@ -328,7 +333,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor
{
     }
     dimensionColumnCount = dimensions.size();
     SortParameters parameters = createSortParameters();
-    SortIntermediateFileMerger intermediateFileMerger = new SortIntermediateFileMerger(parameters);
+    intermediateFileMerger = new SortIntermediateFileMerger(parameters);
     // TODO: Now it is only supported onheap merge, but we can have unsafe merge
     // as well by using UnsafeSortDataRows.
     this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
@@ -348,10 +353,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor
{
    */
   private SortParameters createSortParameters() {
     return SortParameters
-        .createSortParameters(carbonLoadModel.getDatabaseName(), tableName, dimensionColumnCount,
-            segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount,
-            carbonLoadModel.getPartitionId(), segmentId, carbonLoadModel.getTaskNo(),
-            noDictionaryColMapping, true);
+        .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
+            dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
+            noDictionaryCount, carbonLoadModel.getPartitionId(), segmentId,
+            carbonLoadModel.getTaskNo(), noDictionaryColMapping, true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89ddf5a1/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
index 3accb0b..b39ef11 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -55,11 +55,12 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter
{
         row.update(DataTypeUtil
             .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType), index);
       } catch (Throwable ex) {
-        if (dimensionValue.length() != 0 || isEmptyBadRecord) {
+        if (dimensionValue.length() > 0 || isEmptyBadRecord) {
           logHolder.setReason(
               "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
                   .getColName() + " and column data type " + dataType + " is not a valid
"
                   + dataType + " type.");
+          row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
         } else {
           row.update(new byte[0], index);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89ddf5a1/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 8ac1491..29ca69c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.schema.metadata.SortObserver;
@@ -370,14 +371,7 @@ public class SortParameters {
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
     parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
     parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
-    if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length)
{
-      parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
-    } else {
-      boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()];
-      System.arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0,
-          noDictionarySortColumnTemp, 0, parameters.getNumberOfSortColumns());
-      parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
-    }
+    setNoDictionarySortColumnMapping(parameters);
     parameters.setObserver(new SortObserver());
     // get sort buffer size
     parameters.setSortBufferSize(Integer.parseInt(carbonProperties
@@ -465,10 +459,27 @@ public class SortParameters {
     return parameters;
   }
 
-  public static SortParameters createSortParameters(String databaseName, String tableName,
-      int dimColCount, int complexDimColCount, int measureColCount, int noDictionaryCount,
-      String partitionID, String segmentId, String taskNo, boolean[] noDictionaryColMaping,
-      boolean isCompactionFlow) {
+  /**
+   * this method will set the boolean mapping for no dictionary sort columns
+   *
+   * @param parameters
+   */
+  private static void setNoDictionarySortColumnMapping(SortParameters parameters) {
+    if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length)
{
+      parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
+    } else {
+      boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()];
+      System
+          .arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0, noDictionarySortColumnTemp,
0,
+              parameters.getNumberOfSortColumns());
+      parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
+    }
+  }
+
+  public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
+      String tableName, int dimColCount, int complexDimColCount, int measureColCount,
+      int noDictionaryCount, String partitionID, String segmentId, String taskNo,
+      boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setDatabaseName(databaseName);
@@ -478,7 +489,9 @@ public class SortParameters {
     parameters.setTaskNo(taskNo);
     parameters.setMeasureColCount(measureColCount);
     parameters.setDimColCount(dimColCount - complexDimColCount);
+    parameters.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     parameters.setNoDictionaryCount(noDictionaryCount);
+    parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
     parameters.setComplexDimColCount(complexDimColCount);
     parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
     parameters.setObserver(new SortObserver());
@@ -565,6 +578,7 @@ public class SortParameters {
         .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
             parameters.getTableName());
     parameters.setMeasureDataType(type);
+    setNoDictionarySortColumnMapping(parameters);
     return parameters;
   }
 


Mime
View raw message