carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [2/3] incubator-carbondata git commit: Added batch sort to improve the loading performance
Date Mon, 20 Mar 2017 01:26:23 GMT
Added batch sort to improve the loading performance

Fixed comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b13ead9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b13ead9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b13ead9c

Branch: refs/heads/master
Commit: b13ead9c8ef419db7f2807b3828653ba9b0d853a
Parents: 0b44d0e
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Wed Mar 1 21:57:32 2017 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Mon Mar 20 09:25:44 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  19 +
 .../core/datastore/block/TableBlockInfo.java    |   9 +-
 .../carbondata/core/memory/CarbonUnsafe.java    |   4 +
 .../core/mutate/CarbonUpdateUtil.java           |   3 +-
 .../apache/carbondata/core/util/ByteUtil.java   | 136 +++---
 .../core/util/path/CarbonTablePath.java         |  17 +-
 .../datastore/block/TableBlockInfoTest.java     |   2 +-
 .../CarbonFormatDirectoryStructureTest.java     |   4 +-
 .../spark/merger/TupleConversionAdapter.java    |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../graphgenerator/GraphGenerator.java          |   8 +-
 .../processing/mdkeygen/MDKeyGenStep.java       |  18 +-
 .../newflow/CarbonDataLoadConfiguration.java    |  11 +
 .../newflow/DataLoadProcessBuilder.java         |  24 +
 .../constants/DataLoadProcessorConstants.java   |   8 -
 .../converter/DictionaryCardinalityFinder.java  |  26 +
 .../newflow/converter/RowConverter.java         |   2 +-
 .../converter/impl/RowConverterImpl.java        |  32 +-
 .../processing/newflow/row/CarbonRowBatch.java  |   4 +-
 .../processing/newflow/row/CarbonSortBatch.java |  44 ++
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 270 +++++++++++
 .../sort/unsafe/UnsafeCarbonRowPage.java        |   3 +-
 .../sort/unsafe/UnsafeMemoryManager.java        |   4 +
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  30 +-
 .../unsafe/comparator/UnsafeRowComparator.java  |  34 +-
 .../merger/UnsafeIntermediateFileMerger.java    |   3 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  10 +-
 .../steps/DataConverterProcessorStepImpl.java   |   1 +
 ...ConverterProcessorWithBucketingStepImpl.java |   1 +
 .../steps/DataWriterBatchProcessorStepImpl.java | 191 ++++++++
 .../steps/DataWriterProcessorStepImpl.java      |  28 +-
 .../newflow/steps/SortProcessorStepImpl.java    |   9 +-
 .../sortdata/IntermediateFileMerger.java        |  26 +-
 .../sortandgroupby/sortdata/RowComparator.java  |  10 +-
 .../sortdata/RowComparatorForNormalDims.java    |   6 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  14 +-
 .../sortdata/SortTempFileChunkHolder.java       |  14 +-
 .../UnCompressedTempSortFileWriter.java         |   8 +-
 .../sortdatastep/SortKeyStep.java               |   8 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  13 +-
 .../store/CarbonFactDataHandlerModel.java       |  13 +-
 .../store/writer/AbstractFactDataWriter.java    |   5 +-
 .../store/writer/CarbonDataWriterVo.java        |  10 +
 .../csvbased/CarbonCSVBasedSeqGenMeta.java      |   6 +-
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |   8 +-
 .../processing/util/NonDictionaryUtil.java      | 479 +++++++++++++++++++
 .../processing/util/RemoveDictionaryUtil.java   | 479 -------------------
 48 files changed, 1359 insertions(+), 703 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6e2be5d..14fb493 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1095,6 +1095,25 @@ public final class CarbonCommonConstants {
 
   public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT = "1024";
 
+  /**
+   * Sorts the data in batches and writes the batch data to store with index file.
+   */
+  public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
+
+  /**
+   * If set to true, the sorting scope is smaller and more index trees will be created,
+   * thus loading is faster but queries may be slower.
+   * If set to false, the sorting scope is bigger and one index tree per data node will be created,
+   * thus loading is slower but queries are faster.
+   */
+  public static final String LOAD_USE_BATCH_SORT_DEFAULT = "false";
+
+  /**
+   * Size of batch data to keep in memory; as a rule of thumb it is supposed
+   * to be less than 45% of sort.inmemory.size.inmb, otherwise it may spill intermediate data to disk
+   */
+  public static final String LOAD_BATCH_SORT_SIZE_INMB = "carbon.load.batch.sort.size.inmb";
+
   public static final String ENABLE_VECTOR_READER = "carbon.enable.vector.reader";
 
   public static final String ENABLE_VECTOR_READER_DEFAULT = "false";

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 659a028..8fbaa4a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
 
@@ -212,10 +213,10 @@ public class TableBlockInfo implements Distributable, Serializable {
     // offset of
     // the file
     if (CarbonTablePath.isCarbonDataFile(filePath)) {
-      int firstTaskId = Integer.parseInt(DataFileUtil.getTaskNo(filePath));
-      int otherTaskId = Integer.parseInt(DataFileUtil.getTaskNo(((TableBlockInfo) other).filePath));
-      if (firstTaskId != otherTaskId) {
-        return firstTaskId - otherTaskId;
+      int compare = ByteUtil.compare(DataFileUtil.getTaskNo(filePath).getBytes(),
+          DataFileUtil.getTaskNo(((TableBlockInfo) other).filePath).getBytes());
+      if (compare != 0) {
+        return compare;
       }
       // compare the part no of both block info
       int firstPartNo = Integer.parseInt(DataFileUtil.getPartNo(filePath));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java b/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java
index 5713955..73a6d84 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.memory;
 
 import java.lang.reflect.Field;
+import java.nio.ByteOrder;
 
 import sun.misc.Unsafe;
 
@@ -34,6 +35,9 @@ public final class CarbonUnsafe {
 
   public static final int DOUBLE_ARRAY_OFFSET;
 
+  public static final boolean ISLITTLEENDIAN =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
   public static Unsafe unsafe;
 
   static {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 00dfbc7..fef5905 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -407,7 +407,8 @@ public class CarbonUpdateUtil {
     int max = 0;
     if (null != dataFiles) {
       for (CarbonFile file : dataFiles) {
-        int taskNumber = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
+        int taskNumber =
+            Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()).split("_")[0]);
         if (taskNumber > max) {
           max = taskNumber;
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 0cd2282..9b2c2ed 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -17,13 +17,10 @@
 
 package org.apache.carbondata.core.util;
 
-import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
 
 /**
 * Util class for byte comparison
@@ -106,43 +103,6 @@ public final class ByteUtil {
     INSTANCE;
 
     /**
-     * unsafe .
-     */
-    static final sun.misc.Unsafe THEUNSAFE;
-
-    /**
-     * The offset to the first element in a byte array.
-     */
-    static final int BYTE_ARRAY_BASE_OFFSET;
-    static final boolean LITTLEENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
-    static {
-      THEUNSAFE = (sun.misc.Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() {
-        @Override public Object run() {
-          try {
-            Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
-            f.setAccessible(true);
-            return f.get(null);
-          } catch (NoSuchFieldException e) {
-            // It doesn't matter what we throw;
-            // it's swallowed in getBestComparer().
-            throw new Error();
-          } catch (IllegalAccessException e) {
-            throw new Error();
-          }
-        }
-      });
-
-      BYTE_ARRAY_BASE_OFFSET = THEUNSAFE.arrayBaseOffset(byte[].class);
-
-      // sanity check - this should never fail
-      if (THEUNSAFE.arrayIndexScale(byte[].class) != 1) {
-        throw new AssertionError();
-      }
-
-    }
-
-    /**
      * Returns true if x1 is less than x2, when both values are treated as
      * unsigned.
      */
@@ -169,8 +129,8 @@ public final class ByteUtil {
       }
       int minLength = Math.min(length1, length2);
       int minWords = minLength / SIZEOF_LONG;
-      int offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
-      int offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
+      int offset1Adj = offset1 + CarbonUnsafe.BYTE_ARRAY_OFFSET;
+      int offset2Adj = offset2 + CarbonUnsafe.BYTE_ARRAY_OFFSET;
 
       /*
        * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes
@@ -178,12 +138,12 @@ public final class ByteUtil {
        * 32-bit. On the other hand, it is substantially faster on 64-bit.
        */
       for (int i = 0; i < minWords * SIZEOF_LONG; i += SIZEOF_LONG) {
-        long lw = THEUNSAFE.getLong(buffer1, offset1Adj + (long) i);
-        long rw = THEUNSAFE.getLong(buffer2, offset2Adj + (long) i);
+        long lw = CarbonUnsafe.unsafe.getLong(buffer1, offset1Adj + (long) i);
+        long rw = CarbonUnsafe.unsafe.getLong(buffer2, offset2Adj + (long) i);
         long diff = lw ^ rw;
 
         if (diff != 0) {
-          if (!LITTLEENDIAN) {
+          if (!CarbonUnsafe.ISLITTLEENDIAN) {
             return lessThanUnsigned(lw, rw) ? -1 : 1;
           }
 
@@ -231,8 +191,11 @@ public final class ByteUtil {
       int len1 = buffer1.length;
       int len2 = buffer2.length;
       int minLength = (len1 <= len2) ? len1 : len2;
-      int minWords = 0;
+      return compareTo(buffer1, buffer2, len1, len2, minLength);
+    }
 
+    public int compareTo(byte[] buffer1, byte[] buffer2, int len1, int len2, int minLength) {
+      int minWords = 0;
       /*
        * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes
        * at a time is no slower than comparing 4 bytes at a time even on
@@ -241,12 +204,12 @@ public final class ByteUtil {
       if (minLength > 7) {
         minWords = minLength / SIZEOF_LONG;
         for (int i = 0; i < minWords * SIZEOF_LONG; i += SIZEOF_LONG) {
-          long lw = THEUNSAFE.getLong(buffer1, BYTE_ARRAY_BASE_OFFSET + (long) i);
-          long rw = THEUNSAFE.getLong(buffer2, BYTE_ARRAY_BASE_OFFSET + (long) i);
+          long lw = CarbonUnsafe.unsafe.getLong(buffer1, CarbonUnsafe.BYTE_ARRAY_OFFSET + (long) i);
+          long rw = CarbonUnsafe.unsafe.getLong(buffer2, CarbonUnsafe.BYTE_ARRAY_OFFSET + (long) i);
           long diff = lw ^ rw;
 
           if (diff != 0) {
-            if (!LITTLEENDIAN) {
+            if (!CarbonUnsafe.ISLITTLEENDIAN) {
               return lessThanUnsigned(lw, rw) ? -1 : 1;
             }
 
@@ -285,15 +248,78 @@ public final class ByteUtil {
       return len1 - len2;
     }
 
+    public int compareUnsafeTo(Object baseObject1, Object baseObject2, long address1, long address2,
+        int len1, int len2, int minLength) {
+
+      int minWords = 0;
+
+      /*
+       * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes
+       * at a time is no slower than comparing 4 bytes at a time even on
+       * 32-bit. On the other hand, it is substantially faster on 64-bit.
+       */
+      if (minLength > 7) {
+        minWords = minLength / SIZEOF_LONG;
+        for (int i = 0; i < minWords * SIZEOF_LONG; i += SIZEOF_LONG) {
+          long lw = CarbonUnsafe.unsafe
+              .getLong(baseObject1, CarbonUnsafe.BYTE_ARRAY_OFFSET + (long) i + address1);
+          long rw = CarbonUnsafe.unsafe
+              .getLong(baseObject2, CarbonUnsafe.BYTE_ARRAY_OFFSET + (long) i + address2);
+          long diff = lw ^ rw;
+
+          if (diff != 0) {
+            if (!CarbonUnsafe.ISLITTLEENDIAN) {
+              return lessThanUnsigned(lw, rw) ? -1 : 1;
+            }
+
+            // Use binary search
+            int k = 0;
+            int y;
+            int x = (int) diff;
+            if (x == 0) {
+              x = (int) (diff >>> 32);
+              k = 32;
+            }
+            y = x << 16;
+            if (y == 0) {
+              k += 16;
+            } else {
+              x = y;
+            }
+
+            y = x << 8;
+            if (y == 0) {
+              k += 8;
+            }
+            return (int) (((lw >>> k) & 0xFFL) - ((rw >>> k) & 0xFFL));
+          }
+        }
+      }
+
+      // The epilogue to cover the last (minLength % 8) elements.
+      for (int i = minWords * SIZEOF_LONG; i < minLength; i++) {
+        int a =
+            (CarbonUnsafe.unsafe.getByte(baseObject1, CarbonUnsafe.BYTE_ARRAY_OFFSET + i + address1)
+                & 0xff);
+        int b =
+            (CarbonUnsafe.unsafe.getByte(baseObject2, CarbonUnsafe.BYTE_ARRAY_OFFSET + i + address2)
+                & 0xff);
+        if (a != b) {
+          return a - b;
+        }
+      }
+      return len1 - len2;
+    }
+
     public boolean equals(byte[] buffer1, byte[] buffer2) {
       if (buffer1.length != buffer2.length) {
         return false;
       }
       int len = buffer1.length / 8;
-      long currentOffset = BYTE_ARRAY_BASE_OFFSET;
+      long currentOffset = CarbonUnsafe.BYTE_ARRAY_OFFSET;
       for (int i = 0; i < len; i++) {
-        long lw = THEUNSAFE.getLong(buffer1, currentOffset);
-        long rw = THEUNSAFE.getLong(buffer2, currentOffset);
+        long lw = CarbonUnsafe.unsafe.getLong(buffer1, currentOffset);
+        long rw = CarbonUnsafe.unsafe.getLong(buffer2, currentOffset);
         if (lw != rw) {
           return false;
         }
@@ -302,8 +328,8 @@ public final class ByteUtil {
       len = buffer1.length % 8;
       if (len > 0) {
         for (int i = 0; i < len; i += 1) {
-          long lw = THEUNSAFE.getByte(buffer1, currentOffset);
-          long rw = THEUNSAFE.getByte(buffer2, currentOffset);
+          long lw = CarbonUnsafe.unsafe.getByte(buffer1, currentOffset);
+          long rw = CarbonUnsafe.unsafe.getByte(buffer2, currentOffset);
           if (lw != rw) {
             return false;
           }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 854db71..7e37689 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -46,6 +46,7 @@ public class CarbonTablePath extends Path {
   protected static final String CARBON_DELTE_DELTA_EXT = ".deletedelta";
   protected static final String CARBON_UPDATE_DELTA_EXT = ".updatedelta";
   protected static final String DATA_PART_PREFIX = "part-";
+  protected static final String BATCH_PREFIX = "_batchno";
   protected static final String INDEX_FILE_EXT = ".carbonindex";
   protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
 
@@ -236,9 +237,9 @@ public class CarbonTablePath extends Path {
    * @return absolute path of data file stored in carbon data format
    */
   public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
-      Integer taskNo, int bucketNumber, String factUpdateTimeStamp) {
+      Integer taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
     return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName(
-        filePartNo, taskNo, bucketNumber, factUpdateTimeStamp);
+        filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp);
   }
 
   /**
@@ -352,9 +353,9 @@ public class CarbonTablePath extends Path {
    * @return gets data file name only with out path
    */
   public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber,
-      String factUpdateTimeStamp) {
-    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "-" + bucketNumber + "-"
-        + factUpdateTimeStamp + CARBON_DATA_EXT;
+      int batchNo, String factUpdateTimeStamp) {
+    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + BATCH_PREFIX + batchNo + "-"
+        + bucketNumber + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
   }
 
   /**
@@ -364,8 +365,10 @@ public class CarbonTablePath extends Path {
    * @param factUpdatedTimeStamp time stamp
    * @return filename
    */
-  public String getCarbonIndexFileName(int taskNo, int bucketNumber, String factUpdatedTimeStamp) {
-    return taskNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT;
+  public String getCarbonIndexFileName(int taskNo, int bucketNumber, int batchNo,
+      String factUpdatedTimeStamp) {
+    return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp
+        + INDEX_FILE_EXT;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
index 2402307..840287e 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
@@ -131,7 +131,7 @@ public class TableBlockInfoTest {
 
     TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
-    int expectedResult = -5;
+    int expectedResult = 7;
     assertEquals(res, expectedResult);
 
     TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index 2c5b495..395a68c 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -50,8 +50,8 @@ public class CarbonFormatDirectoryStructureTest {
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
     assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4, 0, "999").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4-0-999.carbondata"));
+    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4,  0, 0, "999").replace("\\", "/")
+        .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4_batchno0-0-999.carbondata"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
index cedb8ac..c65dab6 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
+import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 /**
  * This class will be used to convert the Result into the format used in data writer.
@@ -67,7 +67,7 @@ class TupleConversionAdapter {
       for (int i = 0; i < noDicCount; i++) {
         noDicByteArr.add(((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeyByIndex(i));
       }
-      byte[] singleByteArr = RemoveDictionaryUtil.convertListByteArrToSingleArr(noDicByteArr);
+      byte[] singleByteArr = NonDictionaryUtil.convertListByteArrToSingleArr(noDicByteArr);
 
       row[index++] = singleByteArr;
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 306d277..dbd2a21 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -704,7 +704,7 @@ object CarbonDataRDDFactory {
               val index = taskNo + 1
               uniqueLoadStatusId = carbonLoadModel.getTableName +
                                    CarbonCommonConstants.UNDERSCORE +
-                                   index
+                                   (index + "_0")
 
               // convert timestamp
               val timeStampInLong = updateModel.get.updatedTimeStamp + ""

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index eb9e88b..2fc918d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -712,7 +712,7 @@ object CarbonDataRDDFactory {
               val index = taskNo + 1
               uniqueLoadStatusId = carbonLoadModel.getTableName +
                                    CarbonCommonConstants.UNDERSCORE +
-                                   index
+                                   (index + "_0")
 
               // convert timestamp
               val timeStampInLong = updateModel.get.updatedTimeStamp + ""

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
index 9a25413..2876eb8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -54,7 +54,7 @@ import org.apache.carbondata.processing.sortandgroupby.sortdatastep.SortKeyStepM
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedSeqGenMeta;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.processing.util.CarbonSchemaParser;
-import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
+import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 import org.pentaho.di.core.KettleEnvironment;
 import org.pentaho.di.core.database.DatabaseMeta;
@@ -605,7 +605,7 @@ public class GraphGenerator {
   private StepMeta getMDKeyStep(GraphConfigurationInfo graphConfiguration) {
     MDKeyGenStepMeta carbonMdKey = new MDKeyGenStepMeta();
     carbonMdKey.setIsUseInvertedIndex(
-        RemoveDictionaryUtil.convertBooleanArrToString(graphConfiguration.getIsUseInvertedIndex()));
+        NonDictionaryUtil.convertBooleanArrToString(graphConfiguration.getIsUseInvertedIndex()));
     carbonMdKey.setPartitionID(partitionID);
     carbonMdKey.setSegmentId(segmentId);
     carbonMdKey.setNumberOfCores(graphConfiguration.getNumberOfCores());
@@ -616,7 +616,7 @@ public class GraphGenerator {
     carbonMdKey.setAggregateLevels(CarbonDataProcessorUtil
         .getLevelCardinalitiesString(graphConfiguration.getDimCardinalities(),
             graphConfiguration.getDimensions()));
-    carbonMdKey.setNoDictionaryDimsMapping(RemoveDictionaryUtil
+    carbonMdKey.setNoDictionaryDimsMapping(NonDictionaryUtil
         .convertBooleanArrToString(graphConfiguration.getIsNoDictionaryDimMapping()));
     carbonMdKey.setMeasureCount(graphConfiguration.getMeasureCount() + "");
     carbonMdKey.setColumnGroupsString(graphConfiguration.getColumnGroupsString());
@@ -766,7 +766,7 @@ public class GraphGenerator {
     sortRowsMeta.setMeasureCount(graphConfiguration.getMeasureCount() + "");
     sortRowsMeta.setNoDictionaryDims(graphConfiguration.getNoDictionaryDims());
     sortRowsMeta.setMeasureDataType(graphConfiguration.getMeasureDataTypeInfo());
-    sortRowsMeta.setNoDictionaryDimsMapping(RemoveDictionaryUtil
+    sortRowsMeta.setNoDictionaryDimsMapping(NonDictionaryUtil
         .convertBooleanArrToString(graphConfiguration.getIsNoDictionaryDimMapping()));
 
     StepMeta sortRowsStep = new StepMeta(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
index f916c04..2f92699 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@ -54,7 +54,7 @@ import org.apache.carbondata.processing.store.CarbonFactHandler;
 import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
+import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 import org.pentaho.di.core.exception.KettleException;
 import org.pentaho.di.core.row.RowMeta;
@@ -267,9 +267,9 @@ public class MDKeyGenStep extends BaseStep {
             String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId() + "",
             false);
     isNoDictionaryDimension =
-        RemoveDictionaryUtil.convertStringToBooleanArr(meta.getNoDictionaryDimsMapping());
+        NonDictionaryUtil.convertStringToBooleanArr(meta.getNoDictionaryDimsMapping());
     isUseInvertedIndex =
-        RemoveDictionaryUtil.convertStringToBooleanArr(meta.getIsUseInvertedIndex());
+        NonDictionaryUtil.convertStringToBooleanArr(meta.getIsUseInvertedIndex());
     fileManager = new FileManager();
     fileManager.setName(CarbonCommonConstants.LOAD_FOLDER + meta.getSegmentId()
         + CarbonCommonConstants.FILE_INPROGRESS_STATUS);
@@ -280,7 +280,7 @@ public class MDKeyGenStep extends BaseStep {
     }
 
     this.meta.setNoDictionaryCount(
-        RemoveDictionaryUtil.extractNoDictionaryCount(this.meta.getNoDictionaryDims()));
+        NonDictionaryUtil.extractNoDictionaryCount(this.meta.getNoDictionaryDims()));
 
     String levelCardinalityFilePath = storeLocation + File.separator +
         CarbonCommonConstants.LEVEL_METADATA_FILE + meta.getTableName()
@@ -460,18 +460,18 @@ public class MDKeyGenStep extends BaseStep {
     int index = 0;
     for (int i = 0; i < measureCount; i++) {
       if (aggType[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        outputRow[l++] = RemoveDictionaryUtil.getMeasure(index++, row);
+        outputRow[l++] = NonDictionaryUtil.getMeasure(index++, row);
       } else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        outputRow[l++] = (Long) RemoveDictionaryUtil.getMeasure(index++, row);
+        outputRow[l++] = (Long) NonDictionaryUtil.getMeasure(index++, row);
       } else {
-        outputRow[l++] = (Double) RemoveDictionaryUtil.getMeasure(index++, row);
+        outputRow[l++] = (Double) NonDictionaryUtil.getMeasure(index++, row);
       }
     }
-    outputRow[l] = RemoveDictionaryUtil.getByteArrayForNoDictionaryCols(row);
+    outputRow[l] = NonDictionaryUtil.getByteArrayForNoDictionaryCols(row);
 
     int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
     for (int i = 0; i < highCardExcludedRows.length; i++) {
-      Object key = RemoveDictionaryUtil.getDimension(i, row);
+      Object key = NonDictionaryUtil.getDimension(i, row);
       highCardExcludedRows[i] = (Integer) key;
     }
     try {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 926f4f6..0bd3e45 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.processing.newflow.converter.DictionaryCardinalityFinder;
 
 public class CarbonDataLoadConfiguration {
 
@@ -73,6 +74,8 @@ public class CarbonDataLoadConfiguration {
    */
   private long schemaUpdatedTimeStamp;
 
+  private DictionaryCardinalityFinder cardinalityFinder;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -245,4 +248,12 @@ public class CarbonDataLoadConfiguration {
   public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
     this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
   }
+
+  public DictionaryCardinalityFinder getCardinalityFinder() {
+    return cardinalityFinder;
+  }
+
+  public void setCardinalityFinder(DictionaryCardinalityFinder cardinalityFinder) {
+    this.cardinalityFinder = cardinalityFinder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index f17779c..4ebb2fb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
+import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
@@ -52,10 +53,15 @@ public final class DataLoadProcessBuilder {
 
   public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
       CarbonIterator[] inputIterators) throws Exception {
+    boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
+            CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
     CarbonDataLoadConfiguration configuration =
         createConfiguration(loadModel, storeLocation);
     if (configuration.getBucketingInfo() != null) {
       return buildInternalForBucketing(inputIterators, configuration);
+    } else if (batchSort) {
+      return buildInternalForBatchSort(inputIterators, configuration);
     } else {
       return buildInternal(inputIterators, configuration);
     }
@@ -79,6 +85,24 @@ public final class DataLoadProcessBuilder {
     return writerProcessorStep;
   }
 
+  private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data like dictionary or non dictionary or complex objects depends on
+    // data types and configurations.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data which are part of key (all dimensions except complex types)
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    AbstractDataLoadProcessorStep writerProcessorStep =
+        new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
+    return writerProcessorStep;
+  }
+
   private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[] inputIterators,
       CarbonDataLoadConfiguration configuration) throws Exception {
     // 1. Reads the data input iterators and parses the data.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
index ea06cd0..11570b4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
@@ -22,18 +22,10 @@ package org.apache.carbondata.processing.newflow.constants;
  */
 public final class DataLoadProcessorConstants {
 
-  public static final String TEMP_STORE_LOCATION = "TEMP_STORE_LOCATION";
-
-  public static final String BLOCKLET_SIZE = "BLOCKLET_SIZE";
-
-  public static final String SORT_SIZE = "SORT_SIZE";
-
   public static final String FACT_TIME_STAMP = "FACT_TIME_STAMP";
 
   public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
 
-  public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
-
   public static final String SERIALIZATION_NULL_FORMAT = "SERIALIZATION_NULL_FORMAT";
 
   public static final String BAD_RECORDS_LOGGER_ENABLE = "BAD_RECORDS_LOGGER_ENABLE";

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
new file mode 100644
index 0000000..751f909
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.converter;
+
+/**
+ * Finds the current cardinality of dimensions.
+ */
+public interface DictionaryCardinalityFinder {
+
+  int[] getCardinality();
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
index ce1d35a..bde3cc7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.processing.newflow.row.CarbonRow;
 /**
  * convert the row
  */
-public interface RowConverter {
+public interface RowConverter extends DictionaryCardinalityFinder {
 
   void initialize() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index aa05798..3ba7bdf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -171,22 +171,6 @@ public class RowConverterImpl implements RowConverter {
 
   @Override
   public void finish() {
-    List<Integer> dimCardinality = new ArrayList<>();
-    if (fieldConverters != null) {
-      for (int i = 0; i < fieldConverters.length; i++) {
-        if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
-          ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
-              .fillColumnCardinality(dimCardinality);
-        }
-      }
-    }
-    int[] cardinality = new int[dimCardinality.size()];
-    for (int i = 0; i < dimCardinality.size(); i++) {
-      cardinality[i] = dimCardinality.get(i);
-    }
-    // Set the cardinality to configuration, it will be used by further step for mdk key.
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.DIMENSION_LENGTHS, cardinality);
-
     // close dictionary client when finish write
     if (configuration.getUseOnePass()) {
       for (DictionaryClient client : dictClients) {
@@ -229,4 +213,20 @@ public class RowConverterImpl implements RowConverter {
     return converter;
   }
 
+  @Override public int[] getCardinality() {
+    List<Integer> dimCardinality = new ArrayList<>();
+    if (fieldConverters != null) {
+      for (int i = 0; i < fieldConverters.length; i++) {
+        if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
+          ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
+              .fillColumnCardinality(dimCardinality);
+        }
+      }
+    }
+    int[] cardinality = new int[dimCardinality.size()];
+    for (int i = 0; i < dimCardinality.size(); i++) {
+      cardinality[i] = dimCardinality.get(i);
+    }
+    return cardinality;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
index 3c6a7d3..c353a53 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
@@ -17,12 +17,12 @@
 
 package org.apache.carbondata.processing.newflow.row;
 
-import java.util.Iterator;
+import org.apache.carbondata.common.CarbonIterator;
 
 /**
  * Batch of rows.
  */
-public class CarbonRowBatch implements Iterator<CarbonRow> {
+public class CarbonRowBatch extends CarbonIterator<CarbonRow> {
 
   private CarbonRow[] rowBatch;
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
new file mode 100644
index 0000000..e0688f5
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.row;
+
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+
+/**
+ * Batch of sorted rows which are ready to be processed by the next (writer) step.
+ */
+public class CarbonSortBatch extends CarbonRowBatch {
+
+  private UnsafeSingleThreadFinalSortFilesMerger iterator;
+
+  public CarbonSortBatch(UnsafeSingleThreadFinalSortFilesMerger iterator) {
+    super(0);
+    this.iterator = iterator;
+  }
+
+  @Override public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override public CarbonRow next() {
+    return new CarbonRow(iterator.next());
+  }
+
+  @Override public void close() {
+    iterator.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..f3a60fc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.row.CarbonSortBatch;
+import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+/**
+ * It reads data in parallel from an array of iterators and does a merge sort.
+ * It sorts data in batches and sends each sorted batch to the next step.
+ */
+public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private ExecutorService executorService;
+
+  private AtomicLong rowCounter;
+
+  public UnsafeBatchParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    this.executorService = Executors.newFixedThreadPool(iterators.length);
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length);
+
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService
+            .submit(new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter));
+      }
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+
+    // Creates the iterator to read from merge sorter.
+    Iterator<CarbonSortBatch> batchIterator = new CarbonIterator<CarbonSortBatch>() {
+
+      @Override public boolean hasNext() {
+        return sortBatchHolder.hasNext();
+      }
+
+      @Override public CarbonSortBatch next() {
+        return new CarbonSortBatch(sortBatchHolder.next());
+      }
+    };
+    return new Iterator[] { batchIterator };
+  }
+
+  @Override public void close() {
+    executorService.shutdown();
+    try {
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+    }
+  }
+
+  /**
+   * Reads rows from the given iterator and feeds them into the sort batch holder.
+   */
+  private static class SortIteratorThread implements Callable<Void> {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortBatchHolder sortDataRows;
+
+    private Object[][] buffer;
+
+    private AtomicLong rowCounter;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortBatchHolder sortDataRows,
+        int batchSize, AtomicLong rowCounter) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.buffer = new Object[batchSize][];
+      this.rowCounter = rowCounter;
+    }
+
+    @Override public Void call() throws CarbonDataLoadingException {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          int i = 0;
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              buffer[i++] = row.getData();
+            }
+          }
+          if (i > 0) {
+            sortDataRows.getSortDataRow().addRowBatch(buffer, i);
+            rowCounter.getAndAdd(i);
+            synchronized (sortDataRows) {
+              if (!sortDataRows.getSortDataRow().canAdd()) {
+                sortDataRows.finish();
+                sortDataRows.createSortDataRows();
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw new CarbonDataLoadingException(e);
+      } finally {
+        sortDataRows.finishThread();
+      }
+      return null;
+    }
+
+  }
+
+  private static class SortBatchHolder
+      extends CarbonIterator<UnsafeSingleThreadFinalSortFilesMerger> {
+
+    private SortParameters sortParameters;
+
+    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+    private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
+
+    private UnsafeSortDataRows sortDataRow;
+
+    private final BlockingQueue<UnsafeSingleThreadFinalSortFilesMerger> mergerQueue;
+
+    private AtomicInteger iteratorCount;
+
+    public SortBatchHolder(SortParameters sortParameters, int numberOfThreads) {
+      this.sortParameters = sortParameters;
+      this.iteratorCount = new AtomicInteger(numberOfThreads);
+      this.mergerQueue = new LinkedBlockingQueue<>();
+      createSortDataRows();
+    }
+
+    private void createSortDataRows() {
+      this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters);
+      unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
+      sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger);
+
+      try {
+        sortDataRow.initialize();
+      } catch (CarbonSortKeyAndGroupByException e) {
+        throw new CarbonDataLoadingException(e);
+      }
+    }
+
+    @Override public UnsafeSingleThreadFinalSortFilesMerger next() {
+      try {
+        return mergerQueue.take();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public UnsafeSortDataRows getSortDataRow() {
+      return sortDataRow;
+    }
+
+    public void finish() {
+      try {
+        processRowToNextStep(sortDataRow, sortParameters);
+        unsafeIntermediateFileMerger.finish();
+        List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
+        finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+            unsafeIntermediateFileMerger.getMergedPages());
+        unsafeIntermediateFileMerger.close();
+        mergerQueue.offer(finalMerger);
+        sortDataRow = null;
+        unsafeIntermediateFileMerger = null;
+        finalMerger = null;
+      } catch (CarbonDataWriterException e) {
+        throw new CarbonDataLoadingException(e);
+      } catch (CarbonSortKeyAndGroupByException e) {
+        throw new CarbonDataLoadingException(e);
+      }
+    }
+
+    public synchronized void finishThread() {
+      if (iteratorCount.decrementAndGet() <= 0) {
+        finish();
+      }
+    }
+
+    public synchronized boolean hasNext() {
+      return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
+    }
+
+    /**
+     * Sorts the accumulated rows so they can be consumed by the next step.
+     */
+    private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
+        throws CarbonDataLoadingException {
+      if (null == sortDataRows) {
+        LOGGER.info("Record Processed For table: " + parameters.getTableName());
+        LOGGER.info("Number of Records was Zero");
+        String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+        LOGGER.info(logMessage);
+        return false;
+      }
+
+      try {
+        // start sorting
+        sortDataRows.startSorting();
+
+        // check any more rows are present
+        LOGGER.info("Record Processed For table: " + parameters.getTableName());
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+            .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+            .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
+                System.currentTimeMillis());
+        return false;
+      } catch (InterruptedException e) {
+        throw new CarbonDataLoadingException(e);
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index 60af5d8..e468028 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -66,10 +66,11 @@ public class UnsafeCarbonRowPage {
     sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
   }
 
-  public void addRow(Object[] row) {
+  public int addRow(Object[] row) {
     int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
     buffer.set(lastSize);
     lastSize = lastSize + size;
+    return size;
   }
 
   private int addRow(Object[] row, long address) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
index 7e7bb7e..6a3e165 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
@@ -97,4 +97,8 @@ public class UnsafeMemoryManager {
   public boolean isMemoryAvailable() {
     return getAvailableMemory() > minimumMemory;
   }
+
+  public long getUsableMemory() {
+    return totalMemory - minimumMemory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 4baabc3..9907509 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -73,6 +73,10 @@ public class UnsafeSortDataRows {
 
   private boolean enableInMemoryIntermediateMerge;
 
+  private int bytesAdded;
+
+  private long maxSizeAllowed;
+
   public UnsafeSortDataRows(SortParameters parameters,
       UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger) {
     this.parameters = parameters;
@@ -88,6 +92,14 @@ public class UnsafeSortDataRows {
     enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
             CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
+
+    this.maxSizeAllowed = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0"));
+    if (maxSizeAllowed <= 0) {
+      this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
+    } else {
+      this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024;
+    }
   }
 
   /**
@@ -96,8 +108,9 @@ public class UnsafeSortDataRows {
   public void initialize() throws CarbonSortKeyAndGroupByException {
     MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024);
     this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
-        parameters.getDimColCount(), parameters.getMeasureColCount(), parameters.getAggType(),
-        baseBlock, !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
+        parameters.getDimColCount() + parameters.getComplexDimColCount(),
+        parameters.getMeasureColCount(), parameters.getAggType(), baseBlock,
+        !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 
@@ -131,6 +144,10 @@ public class UnsafeSortDataRows {
     return baseBlock;
   }
 
+  public boolean canAdd() {
+    return bytesAdded < maxSizeAllowed;
+  }
+
   /**
    * This method will be used to add new row
    *
@@ -143,7 +160,7 @@ public class UnsafeSortDataRows {
     synchronized (addRowsLock) {
       for (int i = 0; i < size; i++) {
         if (rowPage.canAdd()) {
-          rowPage.addRow(rowBatch[i]);
+          bytesAdded += rowPage.addRow(rowBatch[i]);
         } else {
           try {
             if (enableInMemoryIntermediateMerge) {
@@ -154,10 +171,9 @@ public class UnsafeSortDataRows {
             MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024);
             boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
             rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
-                parameters.getDimColCount(), parameters.getMeasureColCount(),
-                parameters.getAggType(), memoryBlock,
-                saveToDisk);
-            rowPage.addRow(rowBatch[i]);
+                parameters.getDimColCount() + parameters.getComplexDimColCount(),
+                parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk);
+            bytesAdded += rowPage.addRow(rowBatch[i]);
           } catch (Exception e) {
             LOGGER.error(
                 "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
index 8f048bd..e61a284 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -50,23 +50,20 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     for (boolean isNoDictionary : noDictionaryColMaping) {
       if (isNoDictionary) {
         short aShort1 = CarbonUnsafe.unsafe.getShort(baseObject, rowA + sizeA);
-        byte[] byteArr1 = new byte[aShort1];
         sizeA += 2;
-        CarbonUnsafe.unsafe.copyMemory(baseObject, rowA + sizeA, byteArr1,
-            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort1);
-        sizeA += aShort1;
 
         short aShort2 = CarbonUnsafe.unsafe.getShort(baseObject, rowB + sizeB);
-        byte[] byteArr2 = new byte[aShort2];
         sizeB += 2;
-        CarbonUnsafe.unsafe.copyMemory(baseObject, rowB + sizeB, byteArr2,
-            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort2);
-        sizeB += aShort2;
+        int minLength = (aShort1 <= aShort2) ? aShort1 : aShort2;
+        int difference = UnsafeComparer.INSTANCE
+            .compareUnsafeTo(baseObject, baseObject, rowA + sizeA, rowB + sizeB, aShort1, aShort2,
+                minLength);
 
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
         if (difference != 0) {
           return difference;
         }
+        sizeA += aShort1;
+        sizeB += aShort2;
       } else {
         int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObject, rowA + sizeA);
         sizeA += 4;
@@ -95,25 +92,22 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     for (boolean isNoDictionary : noDictionaryColMaping) {
       if (isNoDictionary) {
         short aShort1 = CarbonUnsafe.unsafe.getShort(baseObjectL, rowA + sizeA);
-        byte[] byteArr1 = new byte[aShort1];
         sizeA += 2;
-        CarbonUnsafe.unsafe
-            .copyMemory(baseObjectL, rowA + sizeA, byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-                aShort1);
-        sizeA += aShort1;
 
         short aShort2 = CarbonUnsafe.unsafe.getShort(baseObjectR, rowB + sizeB);
-        byte[] byteArr2 = new byte[aShort2];
         sizeB += 2;
-        CarbonUnsafe.unsafe
-            .copyMemory(baseObjectR, rowB + sizeB, byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-                aShort2);
-        sizeB += aShort2;
 
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+        int minLength = (aShort1 <= aShort2) ? aShort1 : aShort2;
+
+        int difference = UnsafeComparer.INSTANCE
+            .compareUnsafeTo(baseObjectL, baseObjectR, rowA + sizeA, rowB + sizeB, aShort1, aShort2,
+                minLength);
+
         if (difference != 0) {
           return difference;
         }
+        sizeA += aShort1;
+        sizeB += aShort2;
       } else {
         int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObjectL, rowA + sizeA);
         sizeA += 4;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index ed8023b..7862a95 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -293,7 +293,8 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     }
 
     // write complex dimensions here.
-    int dimensionSize = mergerParameters.getDimColCount();
+    int dimensionSize =
+        mergerParameters.getDimColCount() + mergerParameters.getComplexDimColCount();
     int measureSize = mergerParameters.getMeasureColCount();
     for (; dimCount < dimensionSize; dimCount++) {
       byte[] col = (byte[]) row[dimCount];

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index da4ffd0..b98a072 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemor
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
+import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
   /**
@@ -125,7 +125,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       for (final UnsafeCarbonRowPage rowPage : rowPages) {
 
         SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
-            parameters.getDimColCount() + parameters.getMeasureColCount());
+            parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
+                .getMeasureColCount());
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -137,7 +138,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
 
         SortTempChunkHolder sortTempFileChunkHolder =
             new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionaryDimnesionColumn(),
-                parameters.getDimColCount() + parameters.getMeasureColCount());
+                parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
+                    .getMeasureColCount());
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -284,7 +286,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
         allCount++;
       }
 
-      RemoveDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
 
       // increment number if record read
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index eae9672..cb6baf4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -64,6 +64,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     badRecordLogger = createBadRecordLogger();
     RowConverter converter =
         new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    configuration.setCardinalityFinder(converter);
     converters.add(converter);
     converter.initialize();
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index 8fd2fba..e7b3876 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -71,6 +71,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
     badRecordLogger = createBadRecordLogger();
     RowConverter converter =
         new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    configuration.setCardinalityFinder(converter);
     converters.add(converter);
     converter.initialize();
     List<Integer> indexes = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
new file mode 100644
index 0000000..ae2b625
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.IgnoreDictionary;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from batch of sorted files(it could be in-memory/disk based files)
+ * which are generated in previous sort step. And it writes data to carbondata file.
+ * It also generates mdk key while writing to carbondata file
+ */
+public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
+
+  private int noDictionaryCount;
+
+  private int complexDimensionCount;
+
+  private int measureCount;
+
+  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
+
+  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
+
+  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
+
+  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override public void initialize() throws IOException {
+    child.initialize();
+  }
+
+  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+    String storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
+            configuration.getSegmentId() + "", false);
+    new File(storeLocation).mkdirs();
+    return storeLocation;
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] iterators = child.execute();
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String tableName = tableIdentifier.getTableName();
+    try {
+      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
+          .createCarbonFactDataHandlerModel(configuration,
+              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
+      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
+      complexDimensionCount = configuration.getComplexDimensionCount();
+      measureCount = dataHandlerModel.getMeasureCount();
+
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+              System.currentTimeMillis());
+      int i = 0;
+      for (Iterator<CarbonRowBatch> iterator : iterators) {
+        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+        int k = 0;
+        while (iterator.hasNext()) {
+          CarbonRowBatch next = iterator.next();
+          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+          CarbonFactHandler dataHandler = CarbonFactHandlerFactory
+              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+          dataHandler.initialise();
+          processBatch(next, dataHandler, model.getSegmentProperties());
+          finish(tableName, dataHandler);
+        }
+        i++;
+      }
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterBatchProcessorStepImpl");
+      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
+    }
+    return null;
+  }
+
+  @Override protected String getStepName() {
+    return "Data Batch Writer";
+  }
+
+  private void finish(String tableName, CarbonFactHandler dataHandler) {
+    try {
+      dataHandler.finish();
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
+    }
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
+    processingComplete(dataHandler);
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+            System.currentTimeMillis());
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+  }
+
+  private void processingComplete(CarbonFactHandler dataHandler) {
+    if (null != dataHandler) {
+      try {
+        dataHandler.closeHandler();
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw new CarbonDataLoadingException(
+            "There is an unexpected error while closing data handler", e);
+      }
+    }
+  }
+
+  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
+      SegmentProperties segmentProperties) throws Exception {
+    int batchSize = 0;
+    KeyGenerator keyGenerator = segmentProperties.getDimensionKeyGenerator();
+    while (batch.hasNext()) {
+      CarbonRow row = batch.next();
+      batchSize++;
+      /*
+      * The order of the data is as follows,
+      * Measuredata, nodictionary/complex byte array data, dictionary(MDK generated key)
+      */
+      int len;
+      // adding one for the high cardinality dims byte array.
+      if (noDictionaryCount > 0 || complexDimensionCount > 0) {
+        len = measureCount + 1 + 1;
+      } else {
+        len = measureCount + 1;
+      }
+      Object[] outputRow = new Object[len];;
+
+      int l = 0;
+      Object[] measures = row.getObjectArray(measureIndex);
+      for (int i = 0; i < measureCount; i++) {
+        outputRow[l++] = measures[i];
+      }
+      outputRow[l] = row.getObject(noDimByteArrayIndex);
+      outputRow[len - 1] = keyGenerator.generateKey(row.getIntArray(dimsArrayIndex));
+      dataHandler.addDataToStore(outputRow);
+    }
+    rowCounter.getAndAdd(batchSize);
+  }
+
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b13ead9c/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index dfc03b9..7eacd08 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -96,7 +96,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     try {
       CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
           .createCarbonFactDataHandlerModel(configuration,
-              getStoreLocation(tableIdentifier, String.valueOf(0)), 0);
+              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
       noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
       complexDimensionCount = configuration.getComplexDimensionCount();
       measureCount = dataHandlerModel.getMeasureCount();
@@ -110,7 +110,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
       for (Iterator<CarbonRowBatch> iterator : iterators) {
         String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
         CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-            .createCarbonFactDataHandlerModel(configuration, storeLocation, i);
+            .createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0);
         CarbonFactHandler dataHandler = null;
         boolean rowsNotExist = true;
         while (iterator.hasNext()) {
@@ -183,29 +183,27 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
       while (batch.hasNext()) {
         CarbonRow row = batch.next();
         readCounter++;
-        Object[] outputRow;
+        /*
+        * The order of the data is as follows,
+        * Measuredata, nodictionary/complex byte array data, dictionary(MDK generated key)
+        */
+        int len;
         // adding one for the high cardinality dims byte array.
         if (noDictionaryCount > 0 || complexDimensionCount > 0) {
-          outputRow = new Object[measureCount + 1 + 1];
+          len = measureCount + 1 + 1;
         } else {
-          outputRow = new Object[measureCount + 1];
+          len = measureCount + 1;
         }
+        Object[] outputRow = new Object[len];
+
 
         int l = 0;
-        int index = 0;
         Object[] measures = row.getObjectArray(measureIndex);
         for (int i = 0; i < measureCount; i++) {
-          outputRow[l++] = measures[index++];
+          outputRow[l++] = measures[i];
         }
         outputRow[l] = row.getObject(noDimByteArrayIndex);
-
-        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
-        int[] dimsArray = row.getIntArray(dimsArrayIndex);
-        for (int i = 0; i < highCardExcludedRows.length; i++) {
-          highCardExcludedRows[i] = dimsArray[i];
-        }
-
-        outputRow[outputRow.length - 1] = keyGenerator.generateKey(highCardExcludedRows);
+        outputRow[len - 1] = keyGenerator.generateKey(row.getIntArray(dimsArrayIndex));
         dataHandler.addDataToStore(outputRow);
       }
     } catch (Exception e) {


Mime
View raw message