carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [15/50] [abbrv] carbondata git commit: [CARBONDATA-1839] [DataLoad] Fix bugs and optimize in compressing sort temp files
Date Tue, 09 Jan 2018 04:01:43 GMT
[CARBONDATA-1839] [DataLoad] Fix bugs and optimize in compressing sort temp files

1.fix bugs in compressing sort temp file, use file-level compression instead of batch-record-level compression

2.add tests

3.update docs, add property to configure the compressor

This closes #1707


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c1002511
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c1002511
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c1002511

Branch: refs/heads/branch-1.3
Commit: c1002511af34a3c2ce0ef05f3fb8977d33d43a68
Parents: 1799642
Author: xuchuanyin <xuchuanyin@hust.edu.cn>
Authored: Wed Dec 27 15:20:15 2017 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Jan 3 15:43:57 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  31 ++--
 .../filesystem/AbstractDFSCarbonFile.java       |  92 ++++++++---
 .../core/datastore/filesystem/CarbonFile.java   |  24 +++
 .../datastore/filesystem/LocalCarbonFile.java   | 101 +++++++-----
 .../core/datastore/impl/FileFactory.java        |  26 +++
 .../carbondata/core/util/CarbonProperties.java  |  17 ++
 .../apache/carbondata/core/util/CarbonUtil.java |  15 ++
 core/src/test/resources/sampleCSV.csv           |   1 +
 docs/useful-tips-on-carbondata.md               |   1 +
 .../TestLoadWithSortTempCompressed.scala        | 154 +++++++++++++++++
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   6 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   5 +-
 .../loading/sort/unsafe/UnsafeSortDataRows.java |  17 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   | 151 ++++-------------
 .../merger/UnsafeIntermediateFileMerger.java    |  74 ++++-----
 .../merger/CompactionResultSortProcessor.java   |  13 +-
 .../sortdata/AbstractTempSortFileWriter.java    | 100 -----------
 .../sortdata/CompressedTempSortFileWriter.java  |  78 ---------
 .../sort/sortdata/IntermediateFileMerger.java   | 133 +++++----------
 .../SingleThreadFinalSortFilesMerger.java       |  47 +-----
 .../processing/sort/sortdata/SortDataRows.java  |  60 +------
 .../sort/sortdata/SortParameters.java           |  96 ++---------
 .../sort/sortdata/SortTempFileChunkHolder.java  | 164 ++++---------------
 .../sort/sortdata/SortTempFileChunkWriter.java  |  75 ---------
 .../sort/sortdata/TempSortFileReader.java       |  37 -----
 .../sort/sortdata/TempSortFileWriter.java       |  46 ------
 .../sortdata/TempSortFileWriterFactory.java     |  41 -----
 .../UnCompressedTempSortFileWriter.java         | 112 -------------
 28 files changed, 557 insertions(+), 1160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index fce8373..2d1e4f9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -430,26 +430,6 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE = "3";
   /**
-   * IS_SORT_TEMP_FILE_COMPRESSION_ENABLED
-   */
-  @CarbonProperty
-  public static final String IS_SORT_TEMP_FILE_COMPRESSION_ENABLED =
-      "carbon.is.sort.temp.file.compression.enabled";
-  /**
-   * IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE
-   */
-  public static final String IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE = "false";
-  /**
-   * SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-   */
-  @CarbonProperty
-  public static final String SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION =
-      "carbon.sort.temp.file.no.of.records.for.compression";
-  /**
-   * SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE
-   */
-  public static final String SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE = "50";
-  /**
    * DEFAULT_COLLECTION_SIZE
    */
   public static final int DEFAULT_COLLECTION_SIZE = 16;
@@ -1374,6 +1354,17 @@ public final class CarbonCommonConstants {
   public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false";
 
   /**
+   * name of compressor to compress sort temp files
+   */
+  @CarbonProperty
+  public static final String CARBON_SORT_TEMP_COMPRESSOR = "carbon.sort.temp.compressor";
+
+  /**
+   * The optional values are 'SNAPPY','GZIP','BZIP2','LZ4'.
+   * By default, empty means that Carbondata will not compress the sort temp files.
+   */
+  public static final String CARBON_SORT_TEMP_COMPRESSOR_DEFAULT = "";
+  /**
    * Which storage level to persist rdd when sort_scope=global_sort
    */
   @CarbonProperty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index fcd230a..a8513cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -18,10 +18,12 @@
 package org.apache.carbondata.core.datastore.filesystem;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,6 +42,8 @@ import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
 
 public abstract  class AbstractDFSCarbonFile implements CarbonFile {
   /**
@@ -270,29 +274,8 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
 
   @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
       int bufferSize, Configuration hadoopConf) throws IOException {
-    path = path.replace("\\", "/");
-    boolean gzip = path.endsWith(".gz");
-    boolean bzip2 = path.endsWith(".bz2");
-    InputStream stream;
-    Path pt = new Path(path);
-    FileSystem fs = pt.getFileSystem(hadoopConf);
-    if (bufferSize == -1) {
-      stream = fs.open(pt);
-    } else {
-      stream = fs.open(pt, bufferSize);
-    }
-    String codecName = null;
-    if (gzip) {
-      codecName = GzipCodec.class.getName();
-    } else if (bzip2) {
-      codecName = BZip2Codec.class.getName();
-    }
-    if (null != codecName) {
-      CompressionCodecFactory ccf = new CompressionCodecFactory(hadoopConf);
-      CompressionCodec codec = ccf.getCodecByClassName(codecName);
-      stream = codec.createInputStream(stream);
-    }
-    return new DataInputStream(new BufferedInputStream(stream));
+    return getDataInputStream(path, fileType, bufferSize,
+        CarbonUtil.inferCompressorFromFileName(path));
   }
 
   /**
@@ -315,6 +298,49 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
     return new DataInputStream(new BufferedInputStream(stream));
   }
 
+  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
+    InputStream inputStream;
+    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    if (bufferSize <= 0) {
+      inputStream = fs.open(pt);
+    } else {
+      inputStream = fs.open(pt, bufferSize);
+    }
+
+    String codecName = getCodecNameFromCompressor(compressor);
+    if (!codecName.isEmpty()) {
+      CompressionCodec codec = new CompressionCodecFactory(hadoopConf).getCodecByName(codecName);
+      inputStream = codec.createInputStream(inputStream);
+    }
+
+    return new DataInputStream(new BufferedInputStream(inputStream));
+  }
+
+  /**
+   * get codec name from user specified compressor name
+   * @param compressorName user specified compressor name
+   * @return name of codec
+   * @throws IOException
+   */
+  private String getCodecNameFromCompressor(String compressorName) throws IOException {
+    if (compressorName.isEmpty()) {
+      return "";
+    } else if ("GZIP".equalsIgnoreCase(compressorName)) {
+      return GzipCodec.class.getName();
+    } else if ("BZIP2".equalsIgnoreCase(compressorName)) {
+      return BZip2Codec.class.getName();
+    } else if ("SNAPPY".equalsIgnoreCase(compressorName)) {
+      return SnappyCodec.class.getName();
+    } else if ("LZ4".equalsIgnoreCase(compressorName)) {
+      return Lz4Codec.class.getName();
+    } else {
+      throw new IOException("Unsupported compressor: " + compressorName);
+    }
+  }
+
   @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
       throws IOException {
     path = path.replace("\\", "/");
@@ -331,6 +357,26 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
     return fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
   }
 
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
+    OutputStream outputStream;
+    if (bufferSize <= 0) {
+      outputStream = fs.create(pt);
+    } else {
+      outputStream = fs.create(pt, true, bufferSize);
+    }
+
+    String codecName = getCodecNameFromCompressor(compressor);
+    if (!codecName.isEmpty()) {
+      CompressionCodec codec = new CompressionCodecFactory(hadoopConf).getCodecByName(codecName);
+      outputStream = codec.createOutputStream(outputStream);
+    }
+
+    return new DataOutputStream(new BufferedOutputStream(outputStream));
+  }
+
   @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
       boolean performFileCheck) throws IOException {
     filePath = filePath.replace("\\", "/");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 94f088b..16cd0e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -77,6 +77,18 @@ public interface CarbonFile {
   DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, int bufferSize,
       Configuration configuration) throws IOException;
 
+  /**
+   * get data input stream
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param compressor name of compressor to read this file
+   * @return dataInputStream
+   * @throws IOException
+   */
+  DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, int bufferSize,
+      String compressor) throws IOException;
+
   DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, int bufferSize,
       long offset) throws IOException;
 
@@ -86,6 +98,18 @@ public interface CarbonFile {
   DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
       long blockSize) throws IOException;
 
+  /**
+   * get data output stream
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param compressor name of compressor to write this file
+   * @return DataOutputStream
+   * @throws IOException
+   */
+  DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
+      String compressor) throws IOException;
+
   boolean isFileExist(String filePath, FileFactory.FileType fileType, boolean performFileCheck)
       throws IOException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 30eb7fe..39ca521 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -28,8 +28,10 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.channels.FileChannel;
 import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -37,10 +39,15 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4BlockOutputStream;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
 
 public class LocalCarbonFile implements CarbonFile {
   private static final LogService LOGGER =
@@ -123,7 +130,7 @@ public class LocalCarbonFile implements CarbonFile {
   }
 
   public boolean renameTo(String changetoName) {
-    changetoName = getUpdatedFilePath(changetoName);
+    changetoName = FileFactory.getUpdatedFilePath(changetoName, FileFactory.FileType.LOCAL);
     return file.renameTo(new File(changetoName));
   }
 
@@ -231,60 +238,46 @@ public class LocalCarbonFile implements CarbonFile {
         return file.renameTo(new File(changetoName));
       }
     }
-
     return file.renameTo(new File(changetoName));
-
-  }
-
-  /**
-   * below method will be used to update the file path
-   * for local type
-   * it removes the file:/ from the path
-   *
-   * @param filePath
-   * @return updated file path without url for local
-   */
-  private static String getUpdatedFilePath(String filePath) {
-    if (filePath != null && !filePath.isEmpty()) {
-      // If the store path is relative then convert to absolute path.
-      if (filePath.startsWith("./")) {
-        try {
-          return new File(filePath).getCanonicalPath();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      } else {
-        Path pathWithoutSchemeAndAuthority =
-            Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
-        return pathWithoutSchemeAndAuthority.toString();
-      }
-    } else {
-      return filePath;
-    }
   }
 
   @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, boolean append) throws FileNotFoundException {
-    path = getUpdatedFilePath(path);
+    path = FileFactory.getUpdatedFilePath(path, FileFactory.FileType.LOCAL);
     return new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
   }
 
   @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
       int bufferSize, Configuration configuration) throws IOException {
+    return getDataInputStream(path, fileType, bufferSize,
+        CarbonUtil.inferCompressorFromFileName(path));
+  }
+
+  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
     path = path.replace("\\", "/");
-    boolean gzip = path.endsWith(".gz");
-    boolean bzip2 = path.endsWith(".bz2");
-    InputStream stream;
     path = FileFactory.getUpdatedFilePath(path, fileType);
-    if (gzip) {
-      stream = new GZIPInputStream(new FileInputStream(path));
-    } else if (bzip2) {
-      stream = new BZip2CompressorInputStream(new FileInputStream(path));
+    InputStream inputStream;
+    if (compressor.isEmpty()) {
+      inputStream = new FileInputStream(path);
+    } else if ("GZIP".equalsIgnoreCase(compressor)) {
+      inputStream = new GZIPInputStream(new FileInputStream(path));
+    } else if ("BZIP2".equalsIgnoreCase(compressor)) {
+      inputStream = new BZip2CompressorInputStream(new FileInputStream(path));
+    } else if ("SNAPPY".equalsIgnoreCase(compressor)) {
+      inputStream = new SnappyInputStream(new FileInputStream(path));
+    } else if ("LZ4".equalsIgnoreCase(compressor)) {
+      inputStream = new LZ4BlockInputStream(new FileInputStream(path));
     } else {
-      stream = new FileInputStream(path);
+      throw new IOException("Unsupported compressor: " + compressor);
+    }
+
+    if (bufferSize <= 0) {
+      return new DataInputStream(new BufferedInputStream(inputStream));
+    } else {
+      return new DataInputStream(new BufferedInputStream(inputStream, bufferSize));
     }
-    return new DataInputStream(new BufferedInputStream(stream));
   }
 
   /**
@@ -319,7 +312,7 @@ public class LocalCarbonFile implements CarbonFile {
   @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
       throws IOException {
     path = path.replace("\\", "/");
-    path = getUpdatedFilePath(path);
+    path = FileFactory.getUpdatedFilePath(path, FileFactory.FileType.LOCAL);
     return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
   }
 
@@ -330,6 +323,32 @@ public class LocalCarbonFile implements CarbonFile {
     return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path), bufferSize));
   }
 
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    path = path.replace("\\", "/");
+    path = FileFactory.getUpdatedFilePath(path, fileType);
+    OutputStream outputStream;
+    if (compressor.isEmpty()) {
+      outputStream = new FileOutputStream(path);
+    } else if ("GZIP".equalsIgnoreCase(compressor)) {
+      outputStream = new GZIPOutputStream(new FileOutputStream(path));
+    } else if ("BZIP2".equalsIgnoreCase(compressor)) {
+      outputStream = new BZip2CompressorOutputStream(new FileOutputStream(path));
+    } else if ("SNAPPY".equalsIgnoreCase(compressor)) {
+      outputStream = new SnappyOutputStream(new FileOutputStream(path));
+    } else if ("LZ4".equalsIgnoreCase(compressor)) {
+      outputStream = new LZ4BlockOutputStream(new FileOutputStream(path));
+    } else {
+      throw new IOException("Unsupported compressor: " + compressor);
+    }
+
+    if (bufferSize <= 0) {
+      return new DataOutputStream(new BufferedOutputStream(outputStream));
+    } else {
+      return new DataOutputStream(new BufferedOutputStream(outputStream, bufferSize));
+    }
+  }
+
   @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
       boolean performFileCheck) throws IOException {
     filePath = filePath.replace("\\", "/");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 2373080..e6fbd04 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -108,6 +108,19 @@ public final class FileFactory {
   }
 
   /**
+   * get data input stream
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param compressorName name of compressor to read this file
+   * @return data input stream
+   * @throws IOException
+   */
+  public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
+      String compressorName) throws IOException {
+    return getCarbonFile(path).getDataInputStream(path, fileType, bufferSize, compressorName);
+  }
+  /**
    * return the datainputStream which is seek to the offset of file
    *
    * @param path
@@ -138,6 +151,19 @@ public final class FileFactory {
   }
 
   /**
+   * get data output stream
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param compressorName name of compressor to write this file
+   * @return data output stream
+   * @throws IOException
+   */
+  public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
+      String compressorName) throws IOException {
+    return getCarbonFile(path).getDataOutputStream(path, fileType, bufferSize, compressorName);
+  }
+  /**
    * This method checks the given path exists or not and also is it file or
    * not if the performFileCheck is true
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 19a3cf3..8042cfa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1015,6 +1015,23 @@ public final class CarbonProperties {
   }
 
   /**
+   * get compressor name for compressing sort temp files
+   * @return compressor name
+   */
+  public String getSortTempCompressor() {
+    String compressor = getProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+        CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT).toUpperCase();
+    if (compressor.isEmpty() || "SNAPPY".equals(compressor) || "GZIP".equals(compressor)
+        || "BZIP2".equals(compressor) || "LZ4".equals(compressor)) {
+      return compressor;
+    } else {
+      LOGGER.error("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
+          .concat(" configuration value is invalid. Only snappy,gzip,bzip2,lz4 and")
+          .concat(" empty are allowed. It will not compress the sort temp files by default"));
+      return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
+    }
+  }
+  /**
    * returns true if carbon property
    * @param key
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ffe4654..f87b7e8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -763,6 +763,21 @@ public final class CarbonUtil {
     return defaultFsUrl + currentPath;
   }
 
+  /**
+   * infer compressor name from file name
+   * @param path file name
+   * @return compressor name
+   */
+  public static String inferCompressorFromFileName(String path) {
+    if (path.endsWith(".gz")) {
+      return "GZIP";
+    } else if (path.endsWith(".bz2")) {
+      return "BZIP2";
+    } else {
+      return "";
+    }
+  }
+
   private static boolean checkIfPrefixExists(String path) {
     final String lowerPath = path.toLowerCase(Locale.getDefault());
     return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/test/resources/sampleCSV.csv
----------------------------------------------------------------------
diff --git a/core/src/test/resources/sampleCSV.csv b/core/src/test/resources/sampleCSV.csv
new file mode 100644
index 0000000..79dfd50
--- /dev/null
+++ b/core/src/test/resources/sampleCSV.csv
@@ -0,0 +1 @@
+id,name
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md b/docs/useful-tips-on-carbondata.md
index 0bf2940..aaf6460 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -168,5 +168,6 @@
   | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Data loading | The buffer size to store records, returned from the block scan. | In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
   | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether use YARN local directories for multi-table load disk load balance | If this is set it to true CarbonData will use YARN local directories for multi-table load disk load balance, that will improve the data load performance. |
   | carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balance | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balance, that will improve the data load performance. Please enable this property when you encounter disk hotspot problem during data loading. |
+  | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4' and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
 
   Note: If your CarbonData instance is provided only for query, you may specify the property 'spark.speculation=true' which is in conf directory of spark.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
new file mode 100644
index 0000000..61acea4
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestLoadWithSortTempCompressed extends QueryTest
+  with BeforeAndAfterEach with BeforeAndAfterAll {
+  val originOffHeapStatus: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+  val originSortTempCompressor: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT)
+  val simpleTable = "simpleTable"
+  val complexCarbonTable = "complexCarbonTable"
+  val complexHiveTable = "complexHiveTable"
+
+  override def beforeEach(): Unit = {
+    sql(s"drop table if exists $simpleTable")
+    sql(s"drop table if exists $complexCarbonTable")
+    sql(s"drop table if exists $complexHiveTable")
+  }
+
+  override def afterEach(): Unit = {
+    sql(s"drop table if exists $simpleTable")
+    sql(s"drop table if exists $complexCarbonTable")
+    sql(s"drop table if exists $complexHiveTable")
+  }
+
+
+  override protected def beforeAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
+  }
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+        originSortTempCompressor)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, originOffHeapStatus)
+  }
+
+  private def testSimpleTable(): Unit = {
+    val lineNum: Int = 10002
+    val df = {
+      import sqlContext.implicits._
+      sqlContext.sparkContext.parallelize((1 to lineNum).reverse)
+        .map(x => (s"a$x", s"b$x", s"c$x", 12.3 + x, x, System.currentTimeMillis(), s"d$x"))
+        .toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7")
+    }
+
+    df.write
+      .format("carbondata")
+      .option("tableName", simpleTable)
+      .option("tempCSV", "true")
+      .option("DICTIONARY_INCLUDE", "c1,c2")
+      .option("SORT_COLUMNS", "c1,c3")
+      .save()
+
+    checkAnswer(sql(s"select count(*) from $simpleTable"), Row(lineNum))
+    checkAnswer(sql(s"select count(*) from $simpleTable where c5 > 5001"), Row(5001))
+  }
+
+  test("test data load for simple table with sort temp compressed with snappy" +
+       " and off-heap sort enabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    testSimpleTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for simple table with sort temp compressed with snappy" +
+       " and off-heap sort disabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    testSimpleTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  private def testComplexTable(): Unit = {
+    // note: following tests are copied from `TestComplexTypeQuery`
+    sql(
+      s"create table $complexCarbonTable(deviceInformationId int, channelsId string, ROMSize " +
+      "string, ROMName String, purchasedate string, mobile struct<imei:string, imsi:string>, MAC " +
+      "array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+      "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>, " +
+      "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+      "double,contractNumber double)  STORED BY 'org.apache.carbondata.format'  TBLPROPERTIES " +
+      "('DICTIONARY_INCLUDE'='deviceInformationId', 'DICTIONARY_EXCLUDE'='channelsId'," +
+      "'COLUMN_GROUP'='(ROMSize,ROMName)')")
+    sql(s"LOAD DATA local inpath '$resourcesPath/complextypesample.csv' INTO table" +
+        s" $complexCarbonTable  OPTIONS('DELIMITER'=',', " +
+        "'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
+        "purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber', " +
+        "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+    sql(
+      s"create table $complexHiveTable(deviceInformationId int, channelsId string, ROMSize " +
+      "string, ROMName String, purchasedate string,mobile struct<imei:string, imsi:string>,MAC " +
+      "array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+      "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>, " +
+      "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+      "double,contractNumber double)row format delimited fields terminated by ',' collection " +
+      "items terminated by '$' map keys terminated by ':'")
+    sql(s"LOAD DATA local inpath '$resourcesPath/complextypesample.csv' INTO table" +
+        s" $complexHiveTable")
+
+    checkAnswer(sql(s"select * from $complexCarbonTable"), sql(s"select * from $complexHiveTable"))
+    checkAnswer(sql(s"select MAC from $complexCarbonTable where MAC[0] = 'MAC1'"),
+      sql(s"select MAC from $complexHiveTable where MAC[0] = 'MAC1'"))
+    checkAnswer(sql(s"select mobile from $complexCarbonTable where mobile.imei like '1AA%'"),
+      sql(s"select mobile from $complexHiveTable where mobile.imei like '1AA%'"))
+    checkAnswer(sql(s"select locationinfo from $complexCarbonTable" +
+                    " where locationinfo[0].ActiveAreaId > 2 AND locationinfo[0].ActiveAreaId < 7"),
+      sql(s"select locationinfo from $complexHiveTable" +
+          " where locationinfo[0].ActiveAreaId > 2 AND locationinfo[0].ActiveAreaId < 7"))
+  }
+
+  test("test data load for complex table with sort temp compressed with snappy" +
+       " and off-heap sort enabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    testComplexTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for complex table with sort temp compressed with snappy" +
+       " and off-heap sort disabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    testComplexTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index cefc97d..6432d38 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -80,11 +80,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =
         new SingleThreadFinalSortFilesMerger(dataFolderLocations, sortParameters.getTableName(),
-            sortParameters.getDimColCount(),
-            sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
-            sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(),
-            sortParameters.getNoDictionaryDimnesionColumn(),
-            sortParameters.getNoDictionarySortColumn());
+            sortParameters);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 51db3a0..c7030dd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -141,10 +141,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
-            sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
-            sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
-            sortParameters.getMeasureDataType(), sortParameters.getNoDictionaryDimnesionColumn(),
-            this.sortParameters.getNoDictionarySortColumn());
+            sortParameters);
   }
 
   @Override public void close() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 0210464..4dd5e44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -17,10 +17,8 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe;
 
-import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -31,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
@@ -279,13 +278,19 @@ public class UnsafeSortDataRows {
     startFileBasedMerge();
   }
 
-  private void writeData(UnsafeCarbonRowPage rowPage, File file)
+  /**
+   * write a page to sort temp file
+   * @param rowPage page
+   * @param file file
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private void writeDataToFile(UnsafeCarbonRowPage rowPage, File file)
       throws CarbonSortKeyAndGroupByException {
     DataOutputStream stream = null;
     try {
       // open stream
-      stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
-          parameters.getFileWriteBufferSize()));
+      stream = FileFactory.getDataOutputStream(file.getPath(), FileFactory.FileType.LOCAL,
+          parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName());
       int actualSize = rowPage.getBuffer().getActualSize();
       // write number of entries to the file
       stream.writeInt(actualSize);
@@ -372,7 +377,7 @@ public class UnsafeSortDataRows {
           File sortTempFile = new File(
               tmpDir + File.separator + parameters.getTableName()
                   + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
-          writeData(page, sortTempFile);
+          writeDataToFile(page, sortTempFile);
           LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
               + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
               + sortTempFile);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 3972b1c..11b3d43 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -17,10 +17,8 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
-import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Comparator;
@@ -32,6 +30,7 @@ import java.util.concurrent.Future;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -69,27 +68,13 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    * return row
    */
   private Object[] returnRow;
-
-  /**
-   * number of measures
-   */
-  private int measureCount;
-
-  /**
-   * number of dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * number of complexDimensionCount
-   */
-  private int complexDimensionCount;
-
-  /**
-   * fileBufferSize for file reader stream size
-   */
-  private int fileBufferSize;
-
+  private int dimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
+  private int readBufferSize;
+  private String compressorName;
   private Object[][] currentBuffer;
 
   private Object[][] backupBuffer;
@@ -109,29 +94,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   private int prefetchRecordsProceesed;
 
   /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int sortTempFileNoOFRecordsInCompression;
-
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isSortTempFileCompressionEnabled;
-
-  /**
    * totalRecordFetch
    */
   private int totalRecordFetch;
 
-  private int noDictionaryCount;
-
-  private DataType[] measureDataType;
-
   private int numberOfObjectRead;
-  /**
-   * to store whether dimension is of dictionary type or not
-   */
-  private boolean[] isNoDictionaryDimensionColumn;
 
   private int nullSetWordsLength;
 
@@ -143,19 +110,16 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
     // set temp file
     this.tempFile = tempFile;
+    this.dimCnt = parameters.getDimColCount();
+    this.complexCnt = parameters.getComplexDimColCount();
+    this.measureCnt = parameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = parameters.getMeasureDataType();
+    this.readBufferSize = parameters.getBufferSize();
+    this.compressorName = parameters.getSortTempCompressorName();
 
-    // set measure and dimension count
-    this.measureCount = parameters.getMeasureColCount();
-    this.dimensionCount = parameters.getDimColCount();
-    this.complexDimensionCount = parameters.getComplexDimColCount();
-
-    this.noDictionaryCount = parameters.getNoDictionaryCount();
-    // set mdkey length
-    this.fileBufferSize = parameters.getFileBufferSize();
     this.executorService = Executors.newFixedThreadPool(1);
-    this.measureDataType = parameters.getMeasureDataType();
-    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
-    this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
+    this.nullSetWordsLength = ((parameters.getMeasureColCount() - 1) >> 6) + 1;
     comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
     initialize();
   }
@@ -172,44 +136,13 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     bufferSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
             CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
-    this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
-    if (this.isSortTempFileCompressionEnabled) {
-      LOGGER.info("Compression was used while writing the sortTempFile");
-    }
-
-    try {
-      this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (this.sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
-            + " be used");
-
-        this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed.Default value will be used");
-      this.sortTempFileNoOFRecordsInCompression = Integer
-          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-
     initialise();
   }
 
   private void initialise() {
     try {
-      if (isSortTempFileCompressionEnabled) {
-        this.bufferSize = sortTempFileNoOFRecordsInCompression;
-      }
-      stream = new DataInputStream(
-          new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
+      stream = FileFactory.getDataInputStream(tempFile.getPath(), FileFactory.FileType.LOCAL,
+          readBufferSize, compressorName);
       this.entryCount = stream.readInt();
       LOGGER.audit("Processing unsafe mode file rows with size : " + entryCount);
       if (prefetch) {
@@ -218,12 +151,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
         if (totalRecordFetch < this.entryCount) {
           submit = executorService.submit(new DataFetcher(true));
         }
-      } else {
-        if (isSortTempFileCompressionEnabled) {
-          new DataFetcher(false).call();
-        }
       }
-
     } catch (FileNotFoundException e) {
       LOGGER.error(e);
       throw new RuntimeException(tempFile + " No Found", e);
@@ -244,19 +172,6 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   public void readRow() throws CarbonSortKeyAndGroupByException {
     if (prefetch) {
       fillDataForPrefetch();
-    } else if (isSortTempFileCompressionEnabled) {
-      if (bufferRowCounter >= bufferSize) {
-        try {
-          new DataFetcher(false).call();
-          bufferRowCounter = 0;
-        } catch (Exception e) {
-          LOGGER.error(e);
-          throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
-        }
-
-      }
-      prefetchRecordsProceesed++;
-      returnRow = currentBuffer[bufferRowCounter++];
     } else {
       this.returnRow = getRowFromStream();
     }
@@ -296,7 +211,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    * @throws CarbonSortKeyAndGroupByException
    */
   private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
-    Object[] row = new Object[dimensionCount + measureCount];
+    Object[] row = new Object[dimCnt + measureCnt];
     try {
       int dimCount = 0;
       for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
@@ -312,7 +227,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
       }
 
       // write complex dimensions here.
-      for (; dimCount < dimensionCount; dimCount++) {
+      for (; dimCount < dimCnt; dimCount++) {
         short aShort = stream.readShort();
         byte[] col = new byte[aShort];
         stream.readFully(col);
@@ -324,25 +239,24 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
         words[i] = stream.readLong();
       }
 
-      for (int mesCount = 0; mesCount < measureCount; mesCount++) {
+      for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
         if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
-          DataType dataType = measureDataType[mesCount];
+          DataType dataType = measureDataTypes[mesCount];
           if (dataType == DataTypes.SHORT) {
-            row[dimensionCount + mesCount] = stream.readShort();
+            row[dimCount + mesCount] = stream.readShort();
           } else if (dataType == DataTypes.INT) {
-            row[dimensionCount + mesCount] = stream.readInt();
+            row[dimCount + mesCount] = stream.readInt();
           } else if (dataType == DataTypes.LONG) {
-            row[dimensionCount + mesCount] = stream.readLong();
+            row[dimCount + mesCount] = stream.readLong();
           } else if (dataType == DataTypes.DOUBLE) {
-            row[dimensionCount + mesCount] = stream.readDouble();
+            row[dimCount + mesCount] = stream.readDouble();
           } else if (DataTypes.isDecimal(dataType)) {
             short aShort = stream.readShort();
             byte[] bigDecimalInBytes = new byte[aShort];
             stream.readFully(bigDecimalInBytes);
-            row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+            row[dimCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
           } else {
-            throw new IllegalArgumentException(
-                "unsupported data type:" + measureDataType[mesCount]);
+            throw new IllegalArgumentException("unsupported data type:" + dataType);
           }
         }
       }
@@ -368,7 +282,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    * @return more row present in file
    */
   public boolean hasNext() {
-    if (prefetch || isSortTempFileCompressionEnabled) {
+    if (prefetch) {
       return this.prefetchRecordsProceesed < this.entryCount;
     }
     return this.numberOfObjectRead < this.entryCount;
@@ -412,10 +326,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
   @Override public int hashCode() {
     int hash = 0;
-    hash += 31 * measureCount;
-    hash += 31 * dimensionCount;
-    hash += 31 * complexDimensionCount;
-    hash += 31 * noDictionaryCount;
+    hash += 31 * measureCnt;
+    hash += 31 * dimCnt;
+    hash += 31 * complexCnt;
     hash += tempFile.hashCode();
     return hash;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 7328899..4bbf61b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -17,11 +17,9 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.merger;
 
-import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -32,6 +30,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -41,8 +40,6 @@ import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunk
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TempSortFileWriter;
-import org.apache.carbondata.processing.sort.sortdata.TempSortFileWriterFactory;
 
 public class UnsafeIntermediateFileMerger implements Callable<Void> {
   /**
@@ -71,18 +68,19 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
    */
   private int totalNumberOfRecords;
 
-  /**
-   * writer
-   */
-  private TempSortFileWriter writer;
-
   private SortParameters mergerParameters;
 
   private File[] intermediateFiles;
 
   private File outPutFile;
 
-  private boolean[] noDictionarycolumnMapping;
+  private int dimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
+  private int writeBufferSize;
+  private String compressorName;
 
   private long[] nullSetWords;
 
@@ -99,8 +97,14 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
-    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
-    this.nullSetWords = new long[((mergerParameters.getMeasureColCount() - 1) >> 6) + 1];
+    this.dimCnt = mergerParameters.getDimColCount();
+    this.complexCnt = mergerParameters.getComplexDimColCount();
+    this.measureCnt = mergerParameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = mergerParameters.getMeasureDataType();
+    this.writeBufferSize = mergerParameters.getBufferSize();
+    this.compressorName = mergerParameters.getSortTempCompressorName();
+    this.nullSetWords = new long[((measureCnt - 1) >> 6) + 1];
     // Take size of 2 MB for each row. I think it is high enough to use
     rowData = ByteBuffer.allocate(2 * 1024 * 1024);
   }
@@ -112,7 +116,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
       startSorting();
       initialize();
       while (hasNext()) {
-        writeDataTofile(next());
+        writeDataToFile(next());
       }
       double intermediateMergeCostTime =
           (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
@@ -124,9 +128,6 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
       throwable = e;
     } finally {
       CarbonUtil.closeStreams(this.stream);
-      if (null != writer) {
-        writer.finish();
-      }
       if (null == throwable) {
         try {
           finish();
@@ -152,24 +153,14 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private void initialize() throws CarbonSortKeyAndGroupByException {
-    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
-      try {
-        this.stream = new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(outPutFile),
-                mergerParameters.getFileWriteBufferSize()));
-        this.stream.writeInt(this.totalNumberOfRecords);
-      } catch (FileNotFoundException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
-      }
-    } else {
-      writer = TempSortFileWriterFactory.getInstance()
-          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
-              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
-              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getFileWriteBufferSize());
-      writer.initiaize(outPutFile, totalNumberOfRecords);
+    try {
+      stream = FileFactory.getDataOutputStream(outPutFile.getPath(), FileFactory.FileType.LOCAL,
+          writeBufferSize, compressorName);
+      this.stream.writeInt(this.totalNumberOfRecords);
+    } catch (FileNotFoundException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
     }
   }
 
@@ -283,12 +274,11 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
    *
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
-  private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
+  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
     int dimCount = 0;
     int size = 0;
-    DataType[] type = mergerParameters.getMeasureDataType();
-    for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
-      if (noDictionarycolumnMapping[dimCount]) {
+    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+      if (isNoDictionaryDimensionColumn[dimCount]) {
         byte[] col = (byte[]) row[dimCount];
         rowData.putShort((short) col.length);
         size += 2;
@@ -301,9 +291,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     }
 
     // write complex dimensions here.
-    int dimensionSize =
-        mergerParameters.getDimColCount() + mergerParameters.getComplexDimColCount();
-    int measureSize = mergerParameters.getMeasureColCount();
+    int dimensionSize = dimCnt + complexCnt;
     for (; dimCount < dimensionSize; dimCount++) {
       byte[] col = (byte[]) row[dimCount];
       rowData.putShort((short)col.length);
@@ -315,10 +303,10 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     int nullSetSize = nullSetWords.length * 8;
     int nullLoc = size;
     size += nullSetSize;
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+    for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        DataType dataType = type[mesCount];
+        DataType dataType = measureDataTypes[mesCount];
         if (dataType == DataTypes.SHORT) {
           rowData.putShort(size, (Short) value);
           size += 2;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index eece8f2..de3572e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -130,7 +130,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   private SortIntermediateFileMerger intermediateFileMerger;
 
   private List<String> partitionNames;
-
+  private SortParameters sortParameters;
 
   public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable,
       SegmentProperties segmentProperties, CompactionType compactionType, String tableName,
@@ -349,11 +349,11 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
       noDictionaryCount++;
     }
     dimensionColumnCount = dimensions.size();
-    SortParameters parameters = createSortParameters();
-    intermediateFileMerger = new SortIntermediateFileMerger(parameters);
+    sortParameters = createSortParameters();
+    intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
     // TODO: Now it is only supported onheap merge, but we can have unsafe merge
     // as well by using UnsafeSortDataRows.
-    this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
+    this.sortDataRows = new SortDataRows(sortParameters, intermediateFileMerger);
     try {
       this.sortDataRows.initialize();
     } catch (CarbonSortKeyAndGroupByException e) {
@@ -389,13 +389,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
       System.arraycopy(noDictionaryColMapping, 0,
           noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
     }
+    sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
 
     String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =
-        new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, dimensionColumnCount,
-            segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount,
-            dataTypes, noDictionaryColMapping, noDictionarySortColumnMapping);
+        new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, sortParameters);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
deleted file mode 100644
index 1302a5b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-
-public abstract class AbstractTempSortFileWriter implements TempSortFileWriter {
-
-  /**
-   * writeFileBufferSize
-   */
-  protected int writeBufferSize;
-
-  /**
-   * Measure count
-   */
-  protected int measureCount;
-
-  /**
-   * Measure count
-   */
-  protected int dimensionCount;
-
-  /**
-   * complexDimension count
-   */
-  protected int complexDimensionCount;
-
-  /**
-   * stream
-   */
-  protected DataOutputStream stream;
-
-  /**
-   * noDictionaryCount
-   */
-  protected int noDictionaryCount;
-
-  /**
-   * AbstractTempSortFileWriter
-   *
-   * @param writeBufferSize
-   * @param dimensionCount
-   * @param measureCount
-   */
-  public AbstractTempSortFileWriter(int dimensionCount, int complexDimensionCount, int measureCount,
-      int noDictionaryCount, int writeBufferSize) {
-    this.writeBufferSize = writeBufferSize;
-    this.dimensionCount = dimensionCount;
-    this.complexDimensionCount = complexDimensionCount;
-    this.measureCount = measureCount;
-    this.noDictionaryCount = noDictionaryCount;
-  }
-
-  /**
-   * Below method will be used to initialize the stream and write the entry count
-   */
-  @Override public void initiaize(File file, int entryCount)
-      throws CarbonSortKeyAndGroupByException {
-    try {
-      stream = new DataOutputStream(
-          new BufferedOutputStream(new FileOutputStream(file), writeBufferSize));
-      stream.writeInt(entryCount);
-    } catch (FileNotFoundException e1) {
-      throw new CarbonSortKeyAndGroupByException(e1);
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    }
-  }
-
-  /**
-   * Below method will be used to close the stream
-   */
-  @Override public void finish() {
-    CarbonUtil.closeStreams(stream);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
deleted file mode 100644
index 40f650d..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-
-public class CompressedTempSortFileWriter extends AbstractTempSortFileWriter {
-
-  /**
-   * CompressedTempSortFileWriter
-   *
-   * @param writeBufferSize
-   * @param dimensionCount
-   * @param measureCount
-   */
-  public CompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
-      int measureCount, int noDictionaryCount, int writeBufferSize) {
-    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
-  }
-
-  /**
-   * Below method will be used to write the sort temp file
-   *
-   * @param records
-   */
-  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
-    DataOutputStream dataOutputStream = null;
-    ByteArrayOutputStream blockDataArray = null;
-    int totalSize = 0;
-    int recordSize = 0;
-    try {
-      recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE) + (dimensionCount
-          * CarbonCommonConstants.INT_SIZE_IN_BYTE);
-      totalSize = records.length * recordSize;
-
-      blockDataArray = new ByteArrayOutputStream(totalSize);
-      dataOutputStream = new DataOutputStream(blockDataArray);
-
-      UnCompressedTempSortFileWriter
-          .writeDataOutputStream(records, dataOutputStream, measureCount, dimensionCount,
-              noDictionaryCount, complexDimensionCount);
-
-      stream.writeInt(records.length);
-      byte[] byteArray = CompressorFactory.getInstance().getCompressor()
-          .compressByte(blockDataArray.toByteArray());
-      stream.writeInt(byteArray.length);
-      stream.write(byteArray);
-
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    } finally {
-      CarbonUtil.closeStreams(blockDataArray);
-      CarbonUtil.closeStreams(dataOutputStream);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index bc65026..04efa1f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -17,11 +17,9 @@
 
 package org.apache.carbondata.processing.sort.sortdata;
 
-import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.AbstractQueue;
@@ -30,6 +28,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -64,33 +63,19 @@ public class IntermediateFileMerger implements Callable<Void> {
    */
   private int totalNumberOfRecords;
 
-  /**
-   * records
-   */
-  private Object[][] records;
-
-  /**
-   * entryCount
-   */
-  private int entryCount;
-
-  /**
-   * writer
-   */
-  private TempSortFileWriter writer;
-
-  /**
-   * totalSize
-   */
-  private int totalSize;
-
   private SortParameters mergerParameters;
 
   private File[] intermediateFiles;
 
   private File outPutFile;
-
-  private boolean[] noDictionarycolumnMapping;
+  private int dimCnt;
+  private int noDictDimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
+  private int writeBufferSize;
+  private String compressorName;
 
   private Throwable throwable;
 
@@ -103,7 +88,14 @@ public class IntermediateFileMerger implements Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
-    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
+    this.dimCnt = mergerParameters.getDimColCount();
+    this.noDictDimCnt = mergerParameters.getNoDictionaryCount();
+    this.complexCnt = mergerParameters.getComplexDimColCount();
+    this.measureCnt = mergerParameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = mergerParameters.getMeasureDataType();
+    this.writeBufferSize = mergerParameters.getBufferSize();
+    this.compressorName = mergerParameters.getSortTempCompressorName();
   }
 
   @Override public Void call() throws Exception {
@@ -113,19 +105,7 @@ public class IntermediateFileMerger implements Callable<Void> {
       startSorting();
       initialize();
       while (hasNext()) {
-        writeDataTofile(next());
-      }
-      if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
-        if (entryCount > 0) {
-          if (entryCount < totalSize) {
-            Object[][] temp = new Object[entryCount][];
-            System.arraycopy(records, 0, temp, 0, entryCount);
-            records = temp;
-            this.writer.writeSortTempFile(temp);
-          } else {
-            this.writer.writeSortTempFile(records);
-          }
-        }
+        writeDataToFile(next());
       }
       double intermediateMergeCostTime =
           (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
@@ -136,11 +116,7 @@ public class IntermediateFileMerger implements Callable<Void> {
       clear();
       throwable = e;
     } finally {
-      records = null;
       CarbonUtil.closeStreams(this.stream);
-      if (null != writer) {
-        writer.finish();
-      }
       if (null == throwable) {
         try {
           finish();
@@ -166,30 +142,14 @@ public class IntermediateFileMerger implements Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private void initialize() throws CarbonSortKeyAndGroupByException {
-    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
-      try {
-        this.stream = new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(outPutFile),
-                mergerParameters.getFileWriteBufferSize()));
-        this.stream.writeInt(this.totalNumberOfRecords);
-      } catch (FileNotFoundException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
-      }
-    } else {
-      writer = TempSortFileWriterFactory.getInstance()
-          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
-              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
-              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getFileWriteBufferSize());
-      writer.initiaize(outPutFile, totalNumberOfRecords);
-
-      if (mergerParameters.isPrefetch()) {
-        totalSize = mergerParameters.getBufferSize();
-      } else {
-        totalSize = mergerParameters.getSortTempFileNoOFRecordsInCompression();
-      }
+    try {
+      stream = FileFactory.getDataOutputStream(outPutFile.getPath(), FileFactory.FileType.LOCAL,
+          writeBufferSize, compressorName);
+      this.stream.writeInt(this.totalNumberOfRecords);
+    } catch (FileNotFoundException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
     }
   }
 
@@ -256,12 +216,7 @@ public class IntermediateFileMerger implements Callable<Void> {
     for (File tempFile : intermediateFiles) {
       // create chunk holder
       sortTempFileChunkHolder =
-          new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
-              mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
-              mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getMeasureDataType(),
-              mergerParameters.getNoDictionaryDimnesionColumn(),
-              mergerParameters.getNoDictionarySortColumn(), mergerParameters.getTableName());
+          new SortTempFileChunkHolder(tempFile, mergerParameters, mergerParameters.getTableName());
 
       // initialize
       sortTempFileChunkHolder.initialize();
@@ -311,30 +266,14 @@ public class IntermediateFileMerger implements Callable<Void> {
    *
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
-  private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
-    if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
-      if (entryCount == 0) {
-        records = new Object[totalSize][];
-        records[entryCount++] = row;
-        return;
-      }
-
-      records[entryCount++] = row;
-      if (entryCount == totalSize) {
-        this.writer.writeSortTempFile(records);
-        entryCount = 0;
-        records = new Object[totalSize][];
-      }
-      return;
-    }
+  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException {
     try {
-      DataType[] measureDataType = mergerParameters.getMeasureDataType();
       int[] mdkArray = (int[]) row[0];
       byte[][] nonDictArray = (byte[][]) row[1];
       int mdkIndex = 0;
       int nonDictKeyIndex = 0;
       // write dictionary and non dictionary dimensions here.
-      for (boolean nodictinary : noDictionarycolumnMapping) {
+      for (boolean nodictinary : isNoDictionaryDimensionColumn) {
         if (nodictinary) {
           byte[] col = nonDictArray[nonDictKeyIndex++];
           stream.writeShort(col.length);
@@ -343,12 +282,18 @@ public class IntermediateFileMerger implements Callable<Void> {
           stream.writeInt(mdkArray[mdkIndex++]);
         }
       }
-
+      // write complex
+      for (; nonDictKeyIndex < noDictDimCnt + complexCnt; nonDictKeyIndex++) {
+        byte[] col = nonDictArray[nonDictKeyIndex++];
+        stream.writeShort(col.length);
+        stream.write(col);
+      }
+      // write measure
       int fieldIndex = 0;
-      for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
+      for (int counter = 0; counter < measureCnt; counter++) {
         if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
           stream.write((byte) 1);
-          DataType dataType = measureDataType[counter];
+          DataType dataType = measureDataTypes[counter];
           if (dataType == DataTypes.BOOLEAN) {
             stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row));
           } else if (dataType == DataTypes.SHORT) {
@@ -365,7 +310,7 @@ public class IntermediateFileMerger implements Callable<Void> {
             stream.writeInt(bigDecimalInBytes.length);
             stream.write(bigDecimalInBytes);
           } else {
-            throw new IllegalArgumentException("unsupported data type:" + measureDataType[counter]);
+            throw new IllegalArgumentException("unsupported data type:" + dataType);
           }
         } else {
           stream.write((byte) 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index db4c771..88695b9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -74,39 +73,10 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
   private String tableName;
 
   /**
-   * measureCount
-   */
-  private int measureCount;
-
-  /**
-   * dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * measure count
-   */
-  private int noDictionaryCount;
-
-  /**
-   * complexDimensionCount
-   */
-  private int complexDimensionCount;
-
-  /**
    * tempFileLocation
    */
   private String[] tempFileLocation;
-
-  private DataType[] measureDataType;
-
-  /**
-   * below code is to check whether dimension
-   * is of no dictionary type or not
-   */
-  private boolean[] isNoDictionaryColumn;
-
-  private boolean[] isNoDictionarySortColumn;
+  private SortParameters sortParameters;
 
   private int maxThreadForSorting;
 
@@ -115,17 +85,10 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
   private List<Future<Void>> mergerTask;
 
   public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName,
-      int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
-      DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
+      SortParameters sortParameters) {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
-    this.dimensionCount = dimensionCount;
-    this.complexDimensionCount = complexDimensionCount;
-    this.measureCount = measureCount;
-    this.measureDataType = type;
-    this.noDictionaryCount = noDictionaryCount;
-    this.isNoDictionaryColumn = isNoDictionaryColumn;
-    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
+    this.sortParameters = sortParameters;
     try {
       maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
@@ -211,9 +174,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
         @Override public Void call() throws CarbonSortKeyAndGroupByException {
             // create chunk holder
             SortTempFileChunkHolder sortTempFileChunkHolder =
-                new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
-                    measureCount, fileBufferSize, noDictionaryCount, measureDataType,
-                    isNoDictionaryColumn, isNoDictionarySortColumn, tableName);
+                new SortTempFileChunkHolder(tempFile, sortParameters, tableName);
           try {
             // initialize
             sortTempFileChunkHolder.initialize();


Mime
View raw message