carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupt...@apache.org
Subject [1/4] carbondata git commit: [CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types
Date Tue, 18 Sep 2018 13:38:15 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 61fcdf286 -> c8f706304


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index d3d538a..c4416d5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.sort.sortdata;
 
 import java.io.File;
 import java.io.Serializable;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -88,6 +89,17 @@ public class SortParameters implements Serializable {
 
   private DataType[] measureDataType;
 
+  // no dictionary data types of the table
+  private DataType[] noDictDataType;
+
+  // no dictionary columns data types participating in sort
+  // used while writing the row to sort temp file where sort no dict columns are handled seperately
+  private DataType[] noDictSortDataType;
+
+  // no dictionary columns data types not participating in sort
+  // used while writing the row to sort temp file where nosort nodict columns are handled seperately
+  private DataType[] noDictNoSortDataType;
+
   /**
    * To know how many columns are of high cardinality.
    */
@@ -111,6 +123,8 @@ public class SortParameters implements Serializable {
   private boolean[] noDictionaryDimnesionColumn;
 
   private boolean[] noDictionarySortColumn;
+
+  private boolean[] sortColumn;
   /**
    * whether dimension is varchar data type.
    * since all dimensions are string, we use an array of boolean instead of datatypes
@@ -142,11 +156,15 @@ public class SortParameters implements Serializable {
     parameters.databaseName = databaseName;
     parameters.tableName = tableName;
     parameters.measureDataType = measureDataType;
+    parameters.noDictDataType = noDictDataType;
+    parameters.noDictSortDataType = noDictSortDataType;
+    parameters.noDictNoSortDataType = noDictNoSortDataType;
     parameters.noDictionaryCount = noDictionaryCount;
     parameters.partitionID = partitionID;
     parameters.segmentId = segmentId;
     parameters.taskNo = taskNo;
     parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+    parameters.sortColumn = sortColumn;
     parameters.isVarcharDimensionColumn = isVarcharDimensionColumn;
     parameters.noDictionarySortColumn = noDictionarySortColumn;
     parameters.numberOfSortColumns = numberOfSortColumns;
@@ -382,7 +400,10 @@ public class SortParameters implements Serializable {
 
     parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
     parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
-    setNoDictionarySortColumnMapping(parameters);
+    parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
+        .getNoDictSortColMapping(configuration.getTableIdentifier().getDatabaseName(),
+            configuration.getTableIdentifier().getTableName()));
+    parameters.setSortColumn(configuration.getSortColumnMapping());
     parameters.setObserver(new SortObserver());
     // get sort buffer size
     parameters.setSortBufferSize(Integer.parseInt(carbonProperties
@@ -431,6 +452,14 @@ public class SortParameters implements Serializable {
 
     DataType[] measureDataType = configuration.getMeasureDataType();
     parameters.setMeasureDataType(measureDataType);
+    parameters.setNoDictDataType(CarbonDataProcessorUtil
+        .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+            configuration.getTableIdentifier().getTableName()));
+    Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
+        .getNoDictSortAndNoSortDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+            configuration.getTableIdentifier().getTableName());
+    parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
+    parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     return parameters;
   }
 
@@ -442,28 +471,10 @@ public class SortParameters implements Serializable {
     this.rangeId = rangeId;
   }
 
-  /**
-   * this method will set the boolean mapping for no dictionary sort columns
-   *
-   * @param parameters
-   */
-  private static void setNoDictionarySortColumnMapping(SortParameters parameters) {
-    if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length) {
-      parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
-    } else {
-      boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()];
-      System
-          .arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0, noDictionarySortColumnTemp, 0,
-              parameters.getNumberOfSortColumns());
-      parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
-    }
-  }
-
   public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
       String tableName, int dimColCount, int complexDimColCount, int measureColCount,
-      int noDictionaryCount, String segmentId, String taskNo,
-      boolean[] noDictionaryColMaping, boolean[] isVarcharDimensionColumn,
-      boolean isCompactionFlow) {
+      int noDictionaryCount, String segmentId, String taskNo, boolean[] noDictionaryColMaping,
+      boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setDatabaseName(databaseName);
@@ -478,6 +489,7 @@ public class SortParameters implements Serializable {
     parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
     parameters.setComplexDimColCount(complexDimColCount);
     parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
+    parameters.setSortColumn(sortColumnMapping);
     parameters.setIsVarcharDimensionColumn(isVarcharDimensionColumn);
     parameters.setObserver(new SortObserver());
     // get sort buffer size
@@ -523,8 +535,46 @@ public class SortParameters implements Serializable {
         .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
             parameters.getTableName());
     parameters.setMeasureDataType(type);
-    setNoDictionarySortColumnMapping(parameters);
+    parameters.setNoDictDataType(CarbonDataProcessorUtil
+        .getNoDictDataTypes(parameters.getDatabaseName(), parameters.getTableName()));
+    Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
+        .getNoDictSortAndNoSortDataTypes(parameters.getDatabaseName(), parameters.getTableName());
+    parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
+    parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
+    parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
+        .getNoDictSortColMapping(parameters.getDatabaseName(), parameters.getTableName()));
     return parameters;
   }
 
+  public DataType[] getNoDictSortDataType() {
+    return noDictSortDataType;
+  }
+
+  public void setNoDictSortDataType(DataType[] noDictSortDataType) {
+    this.noDictSortDataType = noDictSortDataType;
+  }
+
+  public DataType[] getNoDictNoSortDataType() {
+    return noDictNoSortDataType;
+  }
+
+  public DataType[] getNoDictDataType() {
+    return noDictDataType;
+  }
+
+  public void setNoDictNoSortDataType(DataType[] noDictNoSortDataType) {
+    this.noDictNoSortDataType = noDictNoSortDataType;
+  }
+
+  public void setNoDictDataType(DataType[] noDictDataType) {
+    this.noDictDataType = noDictDataType;
+  }
+
+  public boolean[] getSortColumn() {
+    return sortColumn;
+  }
+
+  public void setSortColumn(boolean[] sortColumn) {
+    this.sortColumn = sortColumn;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index e39fe1d..a1ef04e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -115,7 +115,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     this.tableFieldStat = new TableFieldStat(sortParameters);
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.comparator = new IntermediateSortTempRowComparator(
-        tableFieldStat.getIsSortColNoDictFlags());
+        tableFieldStat.getIsSortColNoDictFlags(), tableFieldStat.getNoDictDataType());
     this.executorService = Executors
         .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
     this.convertToActualField = convertToActualField;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 353ddb4..e9ed6f3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -44,6 +44,9 @@ public class TableFieldStat implements Serializable {
   private boolean[] isVarcharDimFlags;
   private int measureCnt;
   private DataType[] measureDataType;
+  private DataType[] noDictDataType;
+  private DataType[] noDictSortDataType;
+  private DataType[] noDictNoSortDataType;
 
   // indices for dict & sort dimension columns
   private int[] dictSortDimIdx;
@@ -66,17 +69,20 @@ public class TableFieldStat implements Serializable {
     this.complexDimCnt = sortParameters.getComplexDimColCount();
     this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
     this.isVarcharDimFlags = sortParameters.getIsVarcharDimensionColumn();
-    int sortColCnt = isSortColNoDictFlags.length;
-    for (boolean flag : isSortColNoDictFlags) {
-      if (flag) {
+    boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
+    boolean[] sortColumn = sortParameters.getSortColumn();
+    for (int i = 0; i < isDimNoDictFlags.length; i++) {
+      if (isDimNoDictFlags[i] && sortColumn[i]) {
         noDictSortDimCnt++;
-      } else {
+      } else if (!isDimNoDictFlags[i] && sortColumn[i]) {
         dictSortDimCnt++;
       }
     }
     this.measureCnt = sortParameters.getMeasureColCount();
     this.measureDataType = sortParameters.getMeasureDataType();
-
+    this.noDictDataType = sortParameters.getNoDictDataType();
+    this.noDictSortDataType = sortParameters.getNoDictSortDataType();
+    this.noDictNoSortDataType = sortParameters.getNoDictNoSortDataType();
     for (boolean flag : isVarcharDimFlags) {
       if (flag) {
         varcharDimCnt++;
@@ -97,19 +103,18 @@ public class TableFieldStat implements Serializable {
     int tmpDictSortCnt = 0;
     int tmpDictNoSortCnt = 0;
     int tmpVarcharCnt = 0;
-    boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
 
     for (int i = 0; i < isDimNoDictFlags.length; i++) {
       if (isDimNoDictFlags[i]) {
         if (isVarcharDimFlags[i]) {
           varcharDimIdx[tmpVarcharCnt++] = i;
-        } else if (i < sortColCnt && isSortColNoDictFlags[i]) {
+        } else if (sortColumn[i]) {
           noDictSortDimIdx[tmpNoDictSortCnt++] = i;
         } else {
           noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
         }
       } else {
-        if (i < sortColCnt && !isSortColNoDictFlags[i]) {
+        if (sortColumn[i]) {
           dictSortDimIdx[tmpDictSortCnt++] = i;
         } else {
           dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
@@ -217,4 +222,17 @@ public class TableFieldStat implements Serializable {
     return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
         noDictNoSortDimCnt, complexDimCnt, varcharDimCnt, measureCnt);
   }
+
+  public DataType[] getNoDictSortDataType() {
+    return noDictSortDataType;
+  }
+
+  public DataType[] getNoDictNoSortDataType() {
+    return noDictNoSortDataType;
+  }
+
+
+  public DataType[] getNoDictDataType() {
+    return noDictDataType;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 7151b47..c23b071 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 
@@ -240,9 +241,17 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private boolean isVarcharColumnFull(CarbonRow row) {
     if (model.getVarcharDimIdxInNoDict().size() > 0) {
-      byte[][] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+      Object[] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < model.getVarcharDimIdxInNoDict().size(); i++) {
-        varcharColumnSizeInByte[i] += nonDictArray[model.getVarcharDimIdxInNoDict().get(i)].length;
+        if (DataTypeUtil
+            .isPrimitiveColumn(model.getNoDictAndComplexColumns()[i].getDataType())) {
+          // get the size from the data type
+          varcharColumnSizeInByte[i] +=
+              model.getNoDictAndComplexColumns()[i].getDataType().getSizeInBytes();
+        } else {
+          varcharColumnSizeInByte[i] +=
+              ((byte[]) nonDictArray[model.getVarcharDimIdxInNoDict().get(i)]).length;
+        }
         if (SnappyCompressor.MAX_BYTE_TO_COMPRESS -
                 (varcharColumnSizeInByte[i] + dataRows.size() * 4) < (2 << 20)) {
           LOGGER.info("Limited by varchar column, page size is " + dataRows.size());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 1a38de6..4b42bfc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -125,6 +125,12 @@ public class CarbonFactDataHandlerModel {
    * data type of all measures in the table
    */
   private DataType[] measureDataType;
+
+  /**
+   * no dictionary and complex columns in the table
+   */
+  private CarbonColumn[] noDictAndComplexColumns;
+
   /**
    * carbon data file attributes like task id, file stamp
    */
@@ -276,6 +282,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
     carbonFactDataHandlerModel.setColCardinality(colCardinality);
     carbonFactDataHandlerModel.setMeasureDataType(configuration.getMeasureDataType());
+    carbonFactDataHandlerModel
+        .setNoDictAndComplexColumns(configuration.getNoDictAndComplexDimensions());
     carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
     carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
@@ -325,12 +333,20 @@ public class CarbonFactDataHandlerModel {
     List<CarbonDimension> allDimensions = carbonTable.getDimensions();
     int dictDimCount = allDimensions.size() - segmentProperties.getNumberOfNoDictionaryDimension()
             - segmentProperties.getComplexDimensions().size();
+    CarbonColumn[] noDicAndComplexColumns =
+        new CarbonColumn[segmentProperties.getNumberOfNoDictionaryDimension() + segmentProperties
+            .getComplexDimensions().size()];
+    int noDicAndComp = 0;
     for (CarbonDimension dim : allDimensions) {
       if (!dim.isComplex() && !dim.hasEncoding(Encoding.DICTIONARY) &&
           dim.getDataType() == DataTypes.VARCHAR) {
         // ordinal is set in CarbonTable.fillDimensionsAndMeasuresForTables()
         varcharDimIdxInNoDict.add(dim.getOrdinal() - dictDimCount);
       }
+      if (!dim.hasEncoding(Encoding.DICTIONARY)) {
+        noDicAndComplexColumns[noDicAndComp++] =
+            new CarbonColumn(dim.getColumnSchema(), dim.getOrdinal(), dim.getSchemaOrdinal());
+      }
     }
 
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
@@ -365,6 +381,7 @@ public class CarbonFactDataHandlerModel {
       measureDataTypes[i++] = msr.getDataType();
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
+    carbonFactDataHandlerModel.setNoDictAndComplexColumns(noDicAndComplexColumns);
     CarbonUtil.checkAndCreateFolderWithPermission(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
@@ -713,5 +730,13 @@ public class CarbonFactDataHandlerModel {
   public void setColumnCompressor(String columnCompressor) {
     this.columnCompressor = columnCompressor;
   }
+
+  public CarbonColumn[] getNoDictAndComplexColumns() {
+    return noDictAndComplexColumns;
+  }
+
+  public void setNoDictAndComplexColumns(CarbonColumn[] noDictAndComplexColumns) {
+    this.noDictAndComplexColumns = noDictAndComplexColumns;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index a311483..2f49ef2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
@@ -60,6 +62,9 @@ import org.apache.carbondata.processing.datatypes.GenericDataType;
  */
 public class TablePage {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TablePage.class.getName());
+
   // For all dimension and measure columns, we store the column data directly in the page,
   // the length of the page is the number of rows.
 
@@ -125,10 +130,24 @@ public class TablePage {
           page = ColumnPage.newLocalDictPage(
               columnPageEncoderMeta, pageSize, localDictionaryGenerator, false);
         } else {
-          page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
+          if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) {
+            columnPageEncoderMeta =
+                new ColumnPageEncoderMeta(spec, spec.getSchemaDataType(), columnCompressor);
+            // create the column page according to the data type for no dictionary numeric columns
+            if (DataTypes.isDecimal(spec.getSchemaDataType())) {
+              page = ColumnPage.newDecimalPage(columnPageEncoderMeta, pageSize);
+            } else {
+              page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
+            }
+          } else {
+            page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
+          }
         }
+        // set the stats collector according to the data type of the columns
         if (DataTypes.VARCHAR == dataType) {
           page.setStatsCollector(LVLongStringStatsCollector.newInstance());
+        } else if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) {
+          page.setStatsCollector(PrimitivePageStatsCollector.newInstance(spec.getSchemaDataType()));
         } else {
           page.setStatsCollector(LVShortStringStatsCollector.newInstance());
         }
@@ -194,22 +213,35 @@ public class TablePage {
     int complexColumnCount = complexDimensionPages.length;
     if (noDictionaryCount > 0 || complexColumnCount > 0) {
       TableSpec tableSpec = model.getTableSpec();
-      byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       List<TableSpec.DimensionSpec> noDictionaryDimensionSpec =
           tableSpec.getNoDictionaryDimensionSpec();
+      Object[] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < noDictAndComplex.length; i++) {
         if (noDictionaryDimensionSpec.get(i).getSchemaDataType()
             == DataTypes.VARCHAR) {
-          byte[] valueWithLength = addIntLengthToByteArray(noDictAndComplex[i]);
+          byte[] valueWithLength = addIntLengthToByteArray((byte[]) noDictAndComplex[i]);
           noDictDimensionPages[i].putData(rowId, valueWithLength);
         } else if (i < noDictionaryCount) {
-          // noDictionary columns, since it is variable length, we need to prepare each
-          // element as LV result byte array (first two bytes are the length of the array)
-          byte[] valueWithLength = addShortLengthToByteArray(noDictAndComplex[i]);
-          noDictDimensionPages[i].putData(rowId, valueWithLength);
+          if (DataTypeUtil
+              .isPrimitiveColumn(noDictDimensionPages[i].getColumnSpec().getSchemaDataType())) {
+            // put the actual data to the row
+            Object value = noDictAndComplex[i];
+            // in compaction flow the measure with decimal type will come as Spark decimal.
+            // need to convert it to byte array.
+            if (DataTypes.isDecimal(noDictDimensionPages[i].getDataType()) && model
+                .isCompactionFlow() && value != null) {
+              value = DataTypeUtil.getDataTypeConverter().convertFromDecimalToBigDecimal(value);
+            }
+            noDictDimensionPages[i].putData(rowId, value);
+          } else {
+            // noDictionary columns, since it is variable length, we need to prepare each
+            // element as LV result byte array (first two bytes are the length of the array)
+            byte[] valueWithLength = addShortLengthToByteArray((byte[]) noDictAndComplex[i]);
+            noDictDimensionPages[i].putData(rowId, valueWithLength);
+          }
         } else {
           // complex columns
-          addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
+          addComplexColumn(i - noDictionaryCount, rowId, (byte[]) noDictAndComplex[i]);
         }
       }
     }
@@ -373,7 +405,19 @@ public class TablePage {
           columnPageEncoder = encodingFactory.createEncoder(
               spec,
               noDictDimensionPages[noDictIndex]);
-          encodedPage = columnPageEncoder.encode(noDictDimensionPages[noDictIndex++]);
+          encodedPage = columnPageEncoder.encode(noDictDimensionPages[noDictIndex]);
+          if (LOGGER.isDebugEnabled()) {
+            DataType targetDataType =
+                columnPageEncoder.getTargetDataType(noDictDimensionPages[noDictIndex]);
+            if (null != targetDataType) {
+              LOGGER.debug(
+                  "Encoder result ---> Source data type: " + noDictDimensionPages[noDictIndex]
+                      .getDataType().getName() + " Destination data type: " + targetDataType
+                      .getName() + " for the column: " + noDictDimensionPages[noDictIndex]
+                      .getColumnSpec().getFieldName());
+            }
+          }
+          noDictIndex++;
           encodedDimensions.add(encodedPage);
           break;
         case COMPLEX:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index c2b21a6..3ba1e1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -426,12 +427,81 @@ public final class CarbonDataProcessorUtil {
     return type;
   }
 
-  public static DataType[] getMeasureDataType(int measureCount, DataField[] measureFields) {
-    DataType[] type = new DataType[measureCount];
-    for (int i = 0; i < type.length; i++) {
-      type[i] = measureFields[i].getColumn().getDataType();
+  /**
+   * Get the no dictionary data types on the table
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  public static DataType[] getNoDictDataTypes(String databaseName, String tableName) {
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    List<DataType> type = new ArrayList<>();
+    for (int i = 0; i < dimensions.size(); i++) {
+      if (!dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) {
+        type.add(dimensions.get(i).getDataType());
+      }
     }
-    return type;
+    return type.toArray(new DataType[type.size()]);
+  }
+
+  /**
+   * Get the no dictionary sort column mapping of the table
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  public static boolean[] getNoDictSortColMapping(String databaseName, String tableName) {
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    List<Boolean> noDicSortColMap = new ArrayList<>();
+    for (int i = 0; i < dimensions.size(); i++) {
+      if (!dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) {
+        if (dimensions.get(i).isSortColumn()) {
+          noDicSortColMap.add(true);
+        } else {
+          noDicSortColMap.add(false);
+        }
+      }
+    }
+    Boolean[] mapping = noDicSortColMap.toArray(new Boolean[noDicSortColMap.size()]);
+    boolean[] noDicSortColMapping = new boolean[mapping.length];
+    for (int i = 0; i < mapping.length; i++) {
+      noDicSortColMapping[i] = mapping[i].booleanValue();
+    }
+    return noDicSortColMapping;
+  }
+
+  /**
+   * Get the data types of the no dictionary sort columns
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(String databaseName,
+      String tableName) {
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    List<DataType> noDictSortType = new ArrayList<>();
+    List<DataType> noDictNoSortType = new ArrayList<>();
+    for (int i = 0; i < dimensions.size(); i++) {
+      if (!dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) {
+        if (dimensions.get(i).isSortColumn()) {
+          noDictSortType.add(dimensions.get(i).getDataType());
+        } else {
+          noDictNoSortType.add(dimensions.get(i).getDataType());
+        }
+      }
+    }
+    DataType[] noDictSortTypes = noDictSortType.toArray(new DataType[noDictSortType.size()]);
+    DataType[] noDictNoSortTypes = noDictNoSortType.toArray(new DataType[noDictNoSortType.size()]);
+    Map<String, DataType[]> noDictSortAndNoSortTypes = new HashMap<>(2);
+    noDictSortAndNoSortTypes.put("noDictSortDataTypes", noDictSortTypes);
+    noDictSortAndNoSortTypes.put("noDictNoSortDataTypes", noDictNoSortTypes);
+    return noDictSortAndNoSortTypes;
   }
 
   /**


Mime
View raw message