carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/3] incubator-carbondata git commit: Added Sort processor step for dataloading.
Date Tue, 25 Oct 2016 06:36:28 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master eb8528770 -> 6425299c7


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
new file mode 100644
index 0000000..e7bc252
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.sortandgroupby.sortdata;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.schema.metadata.SortObserver;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Parameter holder for the data-load sort step. Carries the table identity,
+ * column counts and every sort-related tuning value; instances are built via
+ * the two {@code createSortParameters} factory methods, which read the tuning
+ * values from {@link CarbonProperties}.
+ */
+public class SortParameters {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SortParameters.class.getName());
+  /**
+   * directory where intermediate sort temp files are written
+   */
+  private String tempFileLocation;
+  /**
+   * number of rows held in memory before sorting and spilling to disk
+   */
+  private int sortBufferSize;
+  /**
+   * measure column count
+   */
+  private int measureColCount;
+  /**
+   * plain (non-complex) dimension column count
+   */
+  private int dimColCount;
+  /**
+   * complex dimension column count
+   */
+  private int complexDimColCount;
+  /**
+   * buffer size used while reading intermediate sort temp files
+   */
+  private int fileBufferSize;
+  /**
+   * number of intermediate files after which a merge is triggered
+   */
+  private int numberOfIntermediateFileToBeMerged;
+  /**
+   * buffer size used while writing sort temp files
+   */
+  private int fileWriteBufferSize;
+  /**
+   * observer notified about the progress/failure of sorting
+   */
+  private SortObserver observer;
+  /**
+   * number of records compressed together inside a sort temp file
+   */
+  private int sortTempFileNoOFRecordsInCompression;
+  /**
+   * whether sort temp files are written compressed
+   */
+  private boolean isSortFileCompressionEnabled;
+  /**
+   * whether merging prefetches rows from the sort temp files
+   */
+  private boolean prefetch;
+  /**
+   * prefetch buffer size
+   */
+  private int bufferSize;
+
+  private String databaseName;
+
+  private String tableName;
+
+  /**
+   * aggregation type per measure column, derived from the measure data type
+   */
+  private char[] aggType;
+
+  /**
+   * To know how many columns are of high cardinality.
+   */
+  private int noDictionaryCount;
+  /**
+   * partitionID
+   */
+  private String partitionID;
+  /**
+   * Id of the load folder
+   */
+  private String segmentId;
+  /**
+   * task id, each spark task has a unique id
+   */
+  private String taskNo;
+
+  /**
+   * This will tell whether dimension is dictionary or not.
+   */
+  private boolean[] noDictionaryDimnesionColumn;
+
+  /**
+   * number of cores used for sorting (half of the configured loading cores)
+   */
+  private int numberOfCores;
+
+  public String getTempFileLocation() {
+    return tempFileLocation;
+  }
+
+  public void setTempFileLocation(String tempFileLocation) {
+    this.tempFileLocation = tempFileLocation;
+  }
+
+  public int getSortBufferSize() {
+    return sortBufferSize;
+  }
+
+  public void setSortBufferSize(int sortBufferSize) {
+    this.sortBufferSize = sortBufferSize;
+  }
+
+  public int getMeasureColCount() {
+    return measureColCount;
+  }
+
+  public void setMeasureColCount(int measureColCount) {
+    this.measureColCount = measureColCount;
+  }
+
+  public int getDimColCount() {
+    return dimColCount;
+  }
+
+  public void setDimColCount(int dimColCount) {
+    this.dimColCount = dimColCount;
+  }
+
+  public int getComplexDimColCount() {
+    return complexDimColCount;
+  }
+
+  public void setComplexDimColCount(int complexDimColCount) {
+    this.complexDimColCount = complexDimColCount;
+  }
+
+  public int getFileBufferSize() {
+    return fileBufferSize;
+  }
+
+  public void setFileBufferSize(int fileBufferSize) {
+    this.fileBufferSize = fileBufferSize;
+  }
+
+  public int getNumberOfIntermediateFileToBeMerged() {
+    return numberOfIntermediateFileToBeMerged;
+  }
+
+  public void setNumberOfIntermediateFileToBeMerged(int numberOfIntermediateFileToBeMerged) {
+    this.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
+  }
+
+  public int getFileWriteBufferSize() {
+    return fileWriteBufferSize;
+  }
+
+  public void setFileWriteBufferSize(int fileWriteBufferSize) {
+    this.fileWriteBufferSize = fileWriteBufferSize;
+  }
+
+  public SortObserver getObserver() {
+    return observer;
+  }
+
+  public void setObserver(SortObserver observer) {
+    this.observer = observer;
+  }
+
+  public int getSortTempFileNoOFRecordsInCompression() {
+    return sortTempFileNoOFRecordsInCompression;
+  }
+
+  public void setSortTempFileNoOFRecordsInCompression(int sortTempFileNoOFRecordsInCompression) {
+    this.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
+  }
+
+  public boolean isSortFileCompressionEnabled() {
+    return isSortFileCompressionEnabled;
+  }
+
+  public void setSortFileCompressionEnabled(boolean sortFileCompressionEnabled) {
+    isSortFileCompressionEnabled = sortFileCompressionEnabled;
+  }
+
+  public boolean isPrefetch() {
+    return prefetch;
+  }
+
+  public void setPrefetch(boolean prefetch) {
+    this.prefetch = prefetch;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  public void setBufferSize(int bufferSize) {
+    this.bufferSize = bufferSize;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public char[] getAggType() {
+    return aggType;
+  }
+
+  public void setAggType(char[] aggType) {
+    this.aggType = aggType;
+  }
+
+  public int getNoDictionaryCount() {
+    return noDictionaryCount;
+  }
+
+  public void setNoDictionaryCount(int noDictionaryCount) {
+    this.noDictionaryCount = noDictionaryCount;
+  }
+
+  public String getPartitionID() {
+    return partitionID;
+  }
+
+  public void setPartitionID(String partitionID) {
+    this.partitionID = partitionID;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  public String getTaskNo() {
+    return taskNo;
+  }
+
+  public void setTaskNo(String taskNo) {
+    this.taskNo = taskNo;
+  }
+
+  public boolean[] getNoDictionaryDimnesionColumn() {
+    return noDictionaryDimnesionColumn;
+  }
+
+  public void setNoDictionaryDimnesionColumn(boolean[] noDictionaryDimnesionColumn) {
+    this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+  }
+
+  public int getNumberOfCores() {
+    return numberOfCores;
+  }
+
+  public void setNumberOfCores(int numberOfCores) {
+    this.numberOfCores = numberOfCores;
+  }
+
+  /**
+   * Builds sort parameters for the new data-load flow: table identity and
+   * column counts come from the given configuration, tuning values from
+   * {@link CarbonProperties}.
+   */
+  public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
+    SortParameters parameters = new SortParameters();
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    parameters.setDatabaseName(tableIdentifier.getDatabaseName());
+    parameters.setTableName(tableIdentifier.getTableName());
+    parameters.setPartitionID(configuration.getPartitionId());
+    parameters.setSegmentId(configuration.getSegmentId());
+    parameters.setTaskNo(configuration.getTaskNo());
+    parameters.setMeasureColCount(configuration.getMeasureCount());
+    // plain dimension count excludes complex dimensions, tracked separately
+    parameters.setDimColCount(
+        configuration.getDimensionCount() - configuration.getComplexDimensionCount());
+    parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
+    parameters.setComplexDimColCount(configuration.getComplexDimensionCount());
+    parameters.setNoDictionaryDimnesionColumn(
+        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+    parameters.setObserver(new SortObserver());
+    setSortConfiguration(parameters);
+    setTempFileLocation(parameters);
+    parameters.setAggType(computeAggTypes(parameters));
+    return parameters;
+  }
+
+  /**
+   * Builds sort parameters from explicitly supplied values (Kettle flow).
+   * Tuning values are read from {@link CarbonProperties}.
+   *
+   * @param observer observer notified about sorting; it is installed on the
+   *                 returned parameters (it was previously discarded in favor
+   *                 of a fresh SortObserver, losing the caller's instance)
+   */
+  public static SortParameters createSortParameters(String databaseName, String tableName,
+      int dimColCount, int complexDimColCount, int measureColCount, SortObserver observer,
+      int noDictionaryCount, String partitionID, String segmentId, String taskNo,
+      boolean[] noDictionaryColMaping) {
+    SortParameters parameters = new SortParameters();
+    parameters.setDatabaseName(databaseName);
+    parameters.setTableName(tableName);
+    parameters.setPartitionID(partitionID);
+    parameters.setSegmentId(segmentId);
+    parameters.setTaskNo(taskNo);
+    parameters.setMeasureColCount(measureColCount);
+    // plain dimension count excludes complex dimensions, tracked separately
+    parameters.setDimColCount(dimColCount - complexDimColCount);
+    parameters.setNoDictionaryCount(noDictionaryCount);
+    parameters.setComplexDimColCount(complexDimColCount);
+    parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
+    // use the caller-supplied observer instead of silently replacing it
+    parameters.setObserver(observer);
+    setSortConfiguration(parameters);
+    setTempFileLocation(parameters);
+    parameters.setAggType(computeAggTypes(parameters));
+    return parameters;
+  }
+
+  /**
+   * Reads the tuning values shared by both factory methods (buffer sizes,
+   * intermediate merge limit, compression and prefetch settings, core count)
+   * from {@link CarbonProperties} into the given parameters.
+   */
+  private static void setSortConfiguration(SortParameters parameters) {
+    CarbonProperties carbonProperties = CarbonProperties.getInstance();
+    // get sort buffer size
+    parameters.setSortBufferSize(Integer.parseInt(carbonProperties
+        .getProperty(CarbonCommonConstants.SORT_SIZE,
+            CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)));
+    LOGGER.info("Sort size for table: " + parameters.getSortBufferSize());
+    // set number of intermediate files to merge
+    parameters.setNumberOfIntermediateFileToBeMerged(Integer.parseInt(carbonProperties
+        .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+            CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)));
+    LOGGER.info("Number of intermediate file to be merged: " + parameters
+        .getNumberOfIntermediateFileToBeMerged());
+    // get file buffer size
+    parameters.setFileBufferSize(CarbonDataProcessorUtil
+        .getFileBufferSize(parameters.getNumberOfIntermediateFileToBeMerged(), carbonProperties,
+            CarbonCommonConstants.CONSTANT_SIZE_TEN));
+    LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
+    // sorting is given half of the configured loading cores
+    int numberOfCores;
+    try {
+      numberOfCores = Integer.parseInt(carbonProperties
+          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
+              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+      numberOfCores = numberOfCores / 2;
+    } catch (NumberFormatException exc) {
+      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+    parameters.setNumberOfCores(numberOfCores);
+    parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
+            CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)));
+    parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties
+        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
+            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)));
+    parameters.setSortTempFileNoOFRecordsInCompression(
+        getSortTempFileRecordCount(carbonProperties));
+    if (parameters.isSortFileCompressionEnabled()) {
+      LOGGER.info("Compression will be used for writing the sort temp File");
+    }
+    parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
+    parameters.setBufferSize(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
+  }
+
+  /**
+   * Reads the number of records compressed together in a sort temp file,
+   * falling back to the default when the configured value is missing,
+   * non-numeric or not a positive integer.
+   */
+  private static int getSortTempFileRecordCount(CarbonProperties carbonProperties) {
+    int recordCount;
+    try {
+      recordCount = Integer.parseInt(carbonProperties
+          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
+              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
+      if (recordCount < 1) {
+        LOGGER.error("Invalid value for: "
+            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+            + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
+            + "be used");
+        recordCount = Integer.parseInt(
+            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error(
+          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
+              + ", only Positive Integer value is allowed. Default value will be used");
+      recordCount = Integer.parseInt(
+          CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
+    }
+    return recordCount;
+  }
+
+  /**
+   * Derives the local sort temp file directory from the table identity already
+   * set on the parameters and stores it on them.
+   */
+  private static void setTempFileLocation(SortParameters parameters) {
+    String carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
+            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), false);
+    parameters.setTempFileLocation(
+        carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    LOGGER.info("temp file location: " + parameters.getTempFileLocation());
+  }
+
+  /**
+   * Computes the aggregation type character for each measure column from the
+   * table metadata registered in {@link CarbonMetadata}; entries default to
+   * 'n' before being filled from each measure's data type.
+   */
+  private static char[] computeAggTypes(SortParameters parameters) {
+    char[] aggType = new char[parameters.getMeasureColCount()];
+    Arrays.fill(aggType, 'n');
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+        parameters.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + parameters
+            .getTableName());
+    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(parameters.getTableName());
+    for (int i = 0; i < aggType.length; i++) {
+      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
+    }
+    return aggType;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
index cf7f64e..f74b8f7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
@@ -28,6 +28,8 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.schema.metadata.SortObserver;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 import org.apache.carbondata.processing.sortdatastep.SortKeyStepData;
 import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
 
@@ -64,6 +66,11 @@ public class SortKeyStep extends BaseStep {
   private SortDataRows sortDataRows;
 
   /**
+   * intermediateFileMerger
+   */
+  private SortIntermediateFileMerger intermediateFileMerger;
+
+  /**
    * rowCounter
    */
   private long readCounter;
@@ -171,15 +178,16 @@ public class SortKeyStep extends BaseStep {
 
       this.noDictionaryColMaping =
           RemoveDictionaryUtil.convertStringToBooleanArr(meta.getNoDictionaryDimsMapping());
-
-      this.sortDataRows = new SortDataRows(meta.getTabelName(),
-          meta.getDimensionCount() - meta.getComplexDimensionCount(),
-          meta.getComplexDimensionCount(), meta.getMeasureCount(), this.observer,
-          meta.getNoDictionaryCount(), meta.getPartitionID(), meta.getSegmentId() + "",
-          meta.getTaskNo(), this.noDictionaryColMaping);
+      SortParameters parameters =
+          SortParameters.createSortParameters(meta.getDatabaseName(), meta.getTabelName(),
+              meta.getDimensionCount(), meta.getComplexDimensionCount(), meta.getMeasureCount(),
+              this.observer, meta.getNoDictionaryCount(), meta.getPartitionID(),
+              meta.getSegmentId() + "", meta.getTaskNo(), this.noDictionaryColMaping);
+      intermediateFileMerger = new SortIntermediateFileMerger(parameters);
+      this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
       try {
         // initialize sort
-        this.sortDataRows.initialize(meta.getDatabaseName(), meta.getTableName());
+        this.sortDataRows.initialize();
       } catch (CarbonSortKeyAndGroupByException e) {
         throw new KettleException(e);
       }
@@ -228,6 +236,7 @@ public class SortKeyStep extends BaseStep {
     try {
       // start sorting
       this.sortDataRows.startSorting();
+      this.intermediateFileMerger.finish();
 
       // check any more rows are present
       LOGGER.info("Record Processed For table: " + meta.getTabelName());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 61cc711..f9c77fd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -37,7 +38,7 @@ import org.apache.carbondata.processing.sortandgroupby.sortdata.SortTempFileChun
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
-public class SingleThreadFinalSortFilesMerger {
+public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
   /**
    * LOGGER
    */
@@ -221,7 +222,7 @@ public class SingleThreadFinalSortFilesMerger {
    * @return sorted row
    * @throws CarbonSortKeyAndGroupByException
    */
-  public Object[] next() throws CarbonDataWriterException {
+  public Object[] next() {
     return getSortedRecordFromFile();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
index f2f119e..f21b1ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
@@ -21,7 +21,7 @@ package org.apache.carbondata.processing.store.writer.exception;
 
 import java.util.Locale;
 
-public class CarbonDataWriterException extends Exception {
+public class CarbonDataWriterException extends RuntimeException {
 
   /**
    * default serial version ID.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 680d730..9b7389a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.processing.util;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -38,8 +39,10 @@ import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.pentaho.di.core.CheckResult;
 import org.pentaho.di.core.CheckResultInterface;
 import org.pentaho.di.core.Const;
@@ -280,4 +283,25 @@ public final class CarbonDataProcessorUtil {
     String localDataLoadFolderLocation = carbonDataDirectoryPath + File.separator + taskId;
     return localDataLoadFolderLocation;
   }
+
+  /**
+   * Prepares the boolean[] that records, for each plain dimension in order,
+   * whether it is a no-dictionary column (true) or dictionary-encoded (false).
+   * Measure fields contribute no entry, and scanning stops at the first
+   * complex column, so the mapping covers only the leading plain dimensions.
+   * NOTE(review): this assumes complex columns appear after all plain
+   * dimensions in the field array - confirm the ordering with callers.
+   */
+  public static boolean[] getNoDictionaryMapping(DataField[] fields) {
+    List<Boolean> noDictionaryMapping = new ArrayList<Boolean>();
+    for (DataField field : fields) {
+      // for  complex type need to break the loop
+      if (field.getColumn().isComplex()) {
+        break;
+      }
+
+      // only dimensions are recorded: no-dictionary dimensions map to true,
+      // dictionary-encoded dimensions to false; measures are skipped entirely
+      if (!field.hasDictionaryEncoding() && field.getColumn().isDimesion()) {
+        noDictionaryMapping.add(true);
+      } else if (field.getColumn().isDimesion()) {
+        noDictionaryMapping.add(false);
+      }
+    }
+    // unbox List<Boolean> into the primitive boolean[] expected by callers
+    return ArrayUtils
+        .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
index 76df1ff..ad31e4a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
@@ -123,6 +123,61 @@ public class RemoveDictionaryUtil {
   }
 
   /**
+   * This method will form one single byte [] for all the high card dims.
+   * For example if you need to pack 2 columns c1 and c2 , it stores in following way
+   *  <total_len(short)><offsetLen(short)><offsetLen+c1_len(short)><c1(byte[])><c2(byte[])>
+   * i.e. a length prefix, then one start offset short per column, then the
+   * raw column bytes back to back.
+   * @param byteBufferArr one byte[] per no-dictionary column, in column order
+   * @return the packed array, or null when there are no no-dictionary columns
+   */
+  public static byte[] packByteBufferIntoSingleByteArray(byte[][] byteBufferArr) {
+    // for empty array means there is no data to remove dictionary.
+    if (null == byteBufferArr || byteBufferArr.length == 0) {
+      return null;
+    }
+    int noOfCol = byteBufferArr.length;
+    // two bytes reserved at the front for the total-length short itself
+    short toDetermineLengthOfByteArr = 2;
+    // header size: one 2-byte offset per column plus the length short
+    short offsetLen = (short) (noOfCol * 2 + toDetermineLengthOfByteArr);
+    int totalBytes = calculateTotalBytes(byteBufferArr) + offsetLen;
+
+    ByteBuffer buffer = ByteBuffer.allocate(totalBytes);
+
+    // write the length of the byte [] as first short
+    buffer.putShort((short) (totalBytes - toDetermineLengthOfByteArr));
+    // writing the offset of the first element.
+    buffer.putShort(offsetLen);
+
+    // prepare index for byte []
+    // each short written here is the start offset of the NEXT column; the
+    // last column needs no entry because the total length bounds it
+    for (int index = 0; index < byteBufferArr.length - 1; index++) {
+      int noOfBytes = byteBufferArr[index].length;
+
+      buffer.putShort((short) (offsetLen + noOfBytes));
+      offsetLen += noOfBytes;
+    }
+
+    // put actual data.
+    for (int index = 0; index < byteBufferArr.length; index++) {
+      buffer.put(byteBufferArr[index]);
+    }
+    buffer.rewind();
+    return buffer.array();
+
+  }
+
+  /**
+   * Sums the lengths of all per-column byte arrays, i.e. the payload size of
+   * the packed array before the offset header is added.
+   *
+   * @param byteBufferArr per-column byte arrays; expected non-null with
+   *                      non-null elements (callers check for null/empty)
+   * @return total number of payload bytes
+   */
+  private static int calculateTotalBytes(byte[][] byteBufferArr) {
+    int total = 0;
+    for (int index = 0; index < byteBufferArr.length; index++) {
+      total += byteBufferArr[index].length;
+    }
+    return total;
+  }
+
+  /**
    * Method to check whether entire row is empty or not.
    *
    * @param row


Mime
View raw message