carbondata-commits mailing list archives

From: jack...@apache.org
Subject: [1/3] incubator-carbondata git commit: Added partitioner
Date: Wed, 28 Dec 2016 14:32:19 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 65b922126 -> dc7c86ef3


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
new file mode 100644
index 0000000..c480d30
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.carbondata.bucketing
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+
+    // clean data folder
+    clean
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    spark.sql("DROP TABLE IF EXISTS t3")
+    spark.sql("DROP TABLE IF EXISTS t4")
+    spark.sql("DROP TABLE IF EXISTS t5")
+    spark.sql("DROP TABLE IF EXISTS t6")
+    spark.sql("DROP TABLE IF EXISTS t7")
+    spark.sql("DROP TABLE IF EXISTS t8")
+  }
+
+  test("test create table with buckets") {
+    spark.sql(
+      """
+           CREATE TABLE t4
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t4")
+      """)
+    LoadTable(Some("default"), "t4", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t4")
+    assert(table != null && table.getBucketingInfo("t4") != null,
+      "Bucketing info does not exist")
+  }
+
+  test("test create table with no bucket join of carbon tables") {
+    spark.sql(
+      """
+           CREATE TABLE t5
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("tableName"="t5")
+      """)
+    LoadTable(Some("default"), "t5", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+
+    val plan = spark.sql(
+      """
+        |select t1.*, t2.*
+        |from t5 t1, t5 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: ShuffleExchange => shuffleExists = true
+    }
+    assert(shuffleExists, "shuffle should exist on non-bucketed tables")
+  }
+
+  test("test create table with bucket join of carbon tables") {
+    spark.sql(
+      """
+           CREATE TABLE t6
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t6")
+      """)
+    LoadTable(Some("default"), "t6", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+
+    val plan = spark.sql(
+      """
+        |select t1.*, t2.*
+        |from t6 t1, t6 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: ShuffleExchange => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucketed tables")
+  }
+
+  test("test create table with bucket join of carbon table and parquet table") {
+    spark.sql(
+      """
+           CREATE TABLE t7
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t7")
+      """)
+    LoadTable(Some("default"), "t7", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+
+    spark.sql("DROP TABLE IF EXISTS bucketed_parquet_table")
+    spark.sql("select * from t7").write
+      .format("parquet")
+      .bucketBy(4, "name")
+      .saveAsTable("bucketed_parquet_table")
+
+    val plan = spark.sql(
+      """
+        |select t1.*, t2.*
+        |from t7 t1, bucketed_parquet_table t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: ShuffleExchange => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucketed tables")
+  }
+
+  test("test create table with bucket join of carbon table and non bucket parquet table") {
+    spark.sql(
+      """
+           CREATE TABLE t8
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t8")
+      """)
+    LoadTable(Some("default"), "t8", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+
+    spark.sql("DROP TABLE IF EXISTS parquet_table")
+    spark.sql("select * from t8").write
+      .format("parquet")
+      .saveAsTable("parquet_table")
+
+    val plan = spark.sql(
+      """
+        |select t1.*, t2.*
+        |from t8 t1, parquet_table t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: ShuffleExchange => shuffleExists = true
+    }
+    assert(shuffleExists, "shuffle should exist on non-bucketed tables")
+  }
+
+  override def afterAll {
+    spark.sql("DROP TABLE IF EXISTS t3")
+    spark.sql("DROP TABLE IF EXISTS t4")
+    spark.sql("DROP TABLE IF EXISTS t5")
+    spark.sql("DROP TABLE IF EXISTS t6")
+    spark.sql("DROP TABLE IF EXISTS t7")
+    spark.sql("DROP TABLE IF EXISTS t8")
+    // clean data folder
+    clean
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 20013ce..26300d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
 
 public class CarbonDataLoadConfiguration {
 
@@ -36,6 +37,8 @@ public class CarbonDataLoadConfiguration {
 
   private String taskNo;
 
+  private BucketingInfo bucketingInfo;
+
   private Map<String, Object> dataLoadProperties = new HashMap<>();
 
   public int getDimensionCount() {
@@ -141,4 +144,12 @@ public class CarbonDataLoadConfiguration {
   public Object getDataLoadProperty(String key) {
     return dataLoadProperties.get(key);
   }
+
+  public BucketingInfo getBucketingInfo() {
+    return bucketingInfo;
+  }
+
+  public void setBucketingInfo(BucketingInfo bucketingInfo) {
+    this.bucketingInfo = bucketingInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index a5388d9..63147c9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
@@ -54,6 +55,15 @@ public final class DataLoadProcessBuilder {
       CarbonIterator[] inputIterators) throws Exception {
     CarbonDataLoadConfiguration configuration =
         createConfiguration(loadModel, storeLocation);
+    if (configuration.getBucketingInfo() != null) {
+      return buildInternalForBucketing(inputIterators, configuration);
+    } else {
+      return buildInternal(inputIterators, configuration);
+    }
+  }
+
+  private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
     // 1. Reads the data input iterators and parses the data.
     AbstractDataLoadProcessorStep inputProcessorStep =
         new InputProcessorStepImpl(configuration, inputIterators);
@@ -70,6 +80,24 @@ public final class DataLoadProcessBuilder {
     return writerProcessorStep;
   }
 
+  private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) throws Exception {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data like dictionary or non dictionary or complex objects depends on
+    // data types and configurations.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data which are part of key (all dimensions except complex types)
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    AbstractDataLoadProcessorStep writerProcessorStep =
+        new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+    return writerProcessorStep;
+  }
+
   private CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
       String storeLocation) throws Exception {
     if (!new File(storeLocation).mkdirs()) {
@@ -165,6 +193,7 @@ public final class DataLoadProcessBuilder {
       }
     }
     configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
+    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
     return configuration;
   }
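
The two build paths above differ only in the converter step, which is where rows get their bucket number. As a minimal sketch of how a load driver might wire this up — the driver class is hypothetical, and the builder's method name and signature are inferred from the hunk above rather than confirmed by it:

    import org.apache.carbondata.common.CarbonIterator;
    import org.apache.carbondata.processing.model.CarbonLoadModel;
    import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder;

    // Hypothetical driver sketch; build(...) name/signature assumed from the diff.
    public final class LoadDriverSketch {
      public static void load(CarbonLoadModel loadModel, String storeLocation,
          CarbonIterator[] inputIterators) throws Exception {
        // returns the bucketing pipeline when the table carries bucketing info,
        // otherwise the plain input -> convert -> sort -> write chain
        AbstractDataLoadProcessorStep pipeline =
            new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
        try {
          pipeline.initialize();
          pipeline.execute();  // pulls row batches through all steps
        } finally {
          pipeline.close();
        }
      }
    }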
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
index daf37fb..0689310 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
@@ -28,6 +28,8 @@ public class CarbonRow {
 
   private Object[] data;
 
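+  /** Bucket this row hashes to; assigned by the converter step during bucketed loads. */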
+  public short bucketNumber;
+
   public CarbonRow(Object[] data) {
     this.data = data;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
new file mode 100644
index 0000000..3ca1497
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from an array of iterators in parallel and does a merge sort.
+ * First it sorts the data and writes it to temp files; these temp files are then
+ * merge-sorted to produce the final result.
+ * This step is specific to bucketing: it sorts the data of each bucket separately
+ * and writes it to separate temp files.
+ */
+public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ParallelReadMergeSorterWithBucketingImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private SortIntermediateFileMerger intermediateFileMerger;
+
+  private ExecutorService executorService;
+
+  private BucketingInfo bucketingInfo;
+
+  private DataField[] inputDataFields;
+
+  private int sortBufferSize;
+
+  public ParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
+      BucketingInfo bucketingInfo) {
+    this.inputDataFields = inputDataFields;
+    this.bucketingInfo = bucketingInfo;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+    intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
+    int buffer = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
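+    // split the configured sort buffer across the buckets so the total number of
+    // buffered rows stays close to that of the non-bucketed sorter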
+    sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
+    if (sortBufferSize < 100) {
+      sortBufferSize = 100;
+    }
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
+    try {
+      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+        SortParameters parameters = sortParameters.getCopy();
+        parameters.setPartitionID(i + "");
+        setTempLocation(parameters);
+        parameters.setBufferSize(sortBufferSize);
+        sortDataRows[i] = new SortDataRows(parameters, intermediateFileMerger);
+        sortDataRows[i].initialize();
+      }
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    this.executorService = Executors.newFixedThreadPool(iterators.length);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.submit(new SortIteratorThread(iterators[i], sortDataRows));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRows, sortParameters);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    try {
+      intermediateFileMerger.finish();
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
+    for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+      batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize);
+    }
+
+    return batchIterator;
+  }
+
+  private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
+    String storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), bucketId,
+            sortParameters.getSegmentId() + "", false);
+    // Set the data file location
+    String dataFolderLocation =
+        storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+    SingleThreadFinalSortFilesMerger finalMerger =
+        new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
+            sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
+            sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
+            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
+            sortParameters.isUseKettle());
+    return finalMerger;
+  }
+
+  @Override public void close() {
+    intermediateFileMerger.close();
+  }
+
+  /**
+   * Triggers the final sort for every bucket so that the sorted data can be
+   * consumed by the next step.
+   */
+  private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        // start sorting
+        sortDataRows[i].startSorting();
+      }
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  private void setTempLocation(SortParameters parameters) {
+    String carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(),
+            parameters.getTableName(), parameters.getTaskNo(),
+            parameters.getPartitionID(), parameters.getSegmentId(), false);
+    parameters.setTempFileLocation(
+        carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+  }
+
+  /**
+   * This thread drains its iterator and adds each row to the {@link SortDataRows}
+   * instance of the row's bucket.
+   */
+  private static class SortIteratorThread implements Callable<Void> {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortDataRows[] sortDataRows;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+    }
+
+    @Override public Void call() throws CarbonDataLoadingException {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+          int i = 0;
+          while (batchIterator.hasNext()) {
+            CarbonRow row = batchIterator.next();
+            if (row != null) {
+              SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
+              synchronized (sortDataRow) {
+                sortDataRow.addRow(row.getData());
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw new CarbonDataLoadingException(e);
+      }
+      return null;
+    }
+
+  }
+
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private String partitionId;
+
+    private int batchSize;
+
+    private boolean firstRow = true;
+
+    public MergedDataIterator(String partitionId, int batchSize) {
+      this.partitionId = partitionId;
+      this.batchSize = batchSize;
+    }
+
+    private SingleThreadFinalSortFilesMerger finalMerger;
+
+    @Override public boolean hasNext() {
+      if (firstRow) {
+        firstRow = false;
+        finalMerger = getFinalMerger(partitionId);
+        finalMerger.startFinalMerge();
+      }
+      return finalMerger.hasNext();
+    }
+
+    @Override public CarbonRowBatch next() {
+      int counter = 0;
+      CarbonRowBatch rowBatch = new CarbonRowBatch();
+      while (finalMerger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(finalMerger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
new file mode 100644
index 0000000..16b203a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.partition.Partitioner;
+import org.apache.carbondata.core.partition.impl.HashPartitionerImpl;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.constants.LoggerAction;
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.converter.RowConverter;
+import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+
+/**
+ * Replaces row data fields with dictionary values if the column is configured as
+ * dictionary encoded; non-dictionary and complex columns are converted to byte[].
+ * It also assigns each converted row a bucket number using the hash partitioner.
+ */
+public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep {
+
+  private RowConverter converter;
+
+  private Partitioner<Object[]> partitioner;
+
+  public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override
+  public void initialize() throws CarbonDataLoadingException {
+    child.initialize();
+    BadRecordsLogger badRecordLogger = createBadRecordLogger();
+    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    converter.initialize();
+    List<Integer> indexes = new ArrayList<>();
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    DataField[] inputDataFields = getOutput();
+    BucketingInfo bucketingInfo = configuration.getBucketingInfo();
+    for (int i = 0; i < inputDataFields.length; i++) {
+      for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
+        if (inputDataFields[i].getColumn().getColName()
+            .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
+          indexes.add(i);
+          columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
+          break;
+        }
+      }
+    }
+    partitioner =
+        new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumberOfBuckets());
+  }
+
+  /**
+   * Create the iterator using child iterator.
+   *
+   * @param childIter
+   * @return new iterator with step specific processing.
+   */
+  @Override
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      RowConverter localConverter = converter.createCopyForNewThread();
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next(), localConverter);
+      }
+    };
+  }
+
+  /**
+   * Process the batch of rows as per the step logic.
+   *
+   * @param rowBatch
+   * @return processed row batch.
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+    CarbonRowBatch newBatch = new CarbonRowBatch();
+    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
+    while (batchIterator.hasNext()) {
+      CarbonRow next = batchIterator.next();
+      CarbonRow convertRow = localConverter.convert(next);
+      convertRow.bucketNumber = (short) partitioner.getPartition(next.getData());
+      newBatch.addRow(convertRow);
+    }
+    return newBatch;
+  }
+
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    throw new UnsupportedOperationException();
+  }
+
+  private BadRecordsLogger createBadRecordLogger() {
+    boolean badRecordsLogRedirect = false;
+    boolean badRecordConvertNullDisable = false;
+    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+            .toString());
+    Object badRecordsAction =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION);
+    if (null != badRecordsAction) {
+      LoggerAction loggerAction = null;
+      try {
+        loggerAction = LoggerAction.valueOf(badRecordsAction.toString().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        loggerAction = LoggerAction.FORCE;
+      }
+      switch (loggerAction) {
+        case FORCE:
+          badRecordConvertNullDisable = false;
+          break;
+        case REDIRECT:
+          badRecordsLogRedirect = true;
+          badRecordConvertNullDisable = true;
+          break;
+        case IGNORE:
+          badRecordsLogRedirect = false;
+          badRecordConvertNullDisable = true;
+          break;
+      }
+    }
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    BadRecordsLogger badRecordsLogger = new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+        identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
+        identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator
+            + configuration.getTaskNo()), badRecordsLogRedirect, badRecordsLoggerEnable,
+        badRecordConvertNullDisable);
+    return badRecordsLogger;
+  }
+
+  private String getBadLogStoreLocation(String storeLocation) {
+    String badLogStoreLocation =
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    return badLogStoreLocation;
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    if (converter != null) {
+      converter.finish();
+    }
+  }
+}
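
HashPartitionerImpl and the Partitioner interface referenced above are not part of this message, so only their usage is visible here. A minimal sketch of the contract this step relies on — hash the configured bucket columns and map each row to one of N buckets — might look like the following; everything except getPartition() is illustrative, and the real implementation also receives the column schemas, presumably to hash typed values consistently:

    import java.util.List;

    // Illustrative stand-in for HashPartitionerImpl; not the actual implementation.
    final class HashPartitionerSketch {
      private final List<Integer> indexes;   // positions of the bucket columns in the row
      private final int numberOfBuckets;

      HashPartitionerSketch(List<Integer> indexes, int numberOfBuckets) {
        this.indexes = indexes;
        this.numberOfBuckets = numberOfBuckets;
      }

      public int getPartition(Object[] row) {
        int hash = 0;
        for (int index : indexes) {
          Object value = row[index];
          hash = 31 * hash + (value == null ? 0 : value.hashCode());
        }
        // clear the sign bit so the bucket id is non-negative even for negative hashes
        return (hash & Integer.MAX_VALUE) % numberOfBuckets;
      }
    }

Rows with equal values in the bucket columns always map to the same bucket, which is what lets the bucketed joins in the test cases above avoid a shuffle.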

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index 48492d3..8318530 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -53,8 +53,6 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private KeyGenerator keyGenerator;
 
-  private CarbonFactHandler dataHandler;
-
   private int noDictionaryCount;
 
   private int complexDimensionCount;
@@ -71,57 +69,66 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
 
-  private String storeLocation;
-
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
   }
 
-  @Override
-  public DataField[] getOutput() {
+  @Override public DataField[] getOutput() {
     return child.getOutput();
   }
 
-  @Override
-  public void initialize() throws CarbonDataLoadingException {
+  @Override public void initialize() throws CarbonDataLoadingException {
     child.initialize();
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
+  }
 
-    storeLocation = CarbonDataProcessorUtil
+  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+    String storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
-            configuration.getPartitionId(), configuration.getSegmentId() + "", false);
-
-    if (!(new File(storeLocation).mkdirs())) {
-      LOGGER.error("Local data load folder location does not exist: " + storeLocation);
-      return;
-    }
+            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
+            configuration.getSegmentId() + "", false);
+    new File(storeLocation).mkdirs();
+    return storeLocation;
   }
 
-  @Override
-  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
     Iterator<CarbonRowBatch>[] iterators = child.execute();
-    String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String tableName = tableIdentifier.getTableName();
     try {
-      CarbonFactDataHandlerModel dataHandlerModel =
-          CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration, storeLocation);
+      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
+          .createCarbonFactDataHandlerModel(configuration,
+              getStoreLocation(tableIdentifier, String.valueOf(0)), 0);
       noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
       complexDimensionCount = configuration.getComplexDimensionCount();
       measureCount = dataHandlerModel.getMeasureCount();
       segmentProperties = dataHandlerModel.getSegmentProperties();
       keyGenerator = segmentProperties.getDimensionKeyGenerator();
-      dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
-          CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
-      dataHandler.initialise();
+
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
               System.currentTimeMillis());
+      int i = 0;
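+      // one iterator per bucket: each bucket gets its own store location and
+      // fact handler so that the files it writes carry the bucket id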
       for (Iterator<CarbonRowBatch> iterator : iterators) {
+        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+        CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+            .createCarbonFactDataHandlerModel(configuration, storeLocation, i);
+        CarbonFactHandler dataHandler = null;
+        boolean rowsSeen = false;
         while (iterator.hasNext()) {
-          processBatch(iterator.next());
+          if (!rowsSeen) {
+            rowsSeen = true;
+            dataHandler = CarbonFactHandlerFactory
+                .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+            dataHandler.initialise();
+          }
+          processBatch(iterator.next(), dataHandler);
+        }
+        if (rowsSeen) {
+          finish(tableName, dataHandler);
         }
+        i++;
       }
 
     } catch (CarbonDataWriterException e) {
@@ -135,9 +142,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     return null;
   }
 
-  @Override
-  public void close() {
-    String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
+  @Override public void close() {
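+    // nothing to do here: each bucket's fact handler is finished and closed in execute()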
+  }
+
+  private void finish(String tableName, CarbonFactHandler dataHandler) {
     try {
       dataHandler.finish();
     } catch (Exception e) {
@@ -149,7 +158,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
             + writeCounter;
     LOGGER.info(logMessage);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter);
-    processingComplete();
+    processingComplete(dataHandler);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
             System.currentTimeMillis());
@@ -157,7 +166,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
         .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
   }
 
-  private void processingComplete() throws CarbonDataLoadingException {
+  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
     if (null != dataHandler) {
       try {
         dataHandler.closeHandler();
@@ -171,7 +180,8 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     }
   }
 
-  private void processBatch(CarbonRowBatch batch) throws CarbonDataLoadingException {
+  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
+      throws CarbonDataLoadingException {
     Iterator<CarbonRow> iterator = batch.getBatchIterator();
     try {
       while (iterator.hasNext()) {
@@ -208,8 +218,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     }
   }
 
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
+  @Override protected CarbonRow processRow(CarbonRow row) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index 99b7894..ef43751 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
 import org.apache.carbondata.processing.newflow.sort.Sorter;
 import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
 import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 
@@ -60,7 +61,15 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
             CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
     if (offheapsort) {
       sorter = new UnsafeParallelReadMergeSorterImpl(child.getOutput());
-    } else sorter = new ParallelReadMergeSorterImpl(child.getOutput());
+    } else {
+      sorter = new ParallelReadMergeSorterImpl(child.getOutput());
+    }
+    if (configuration.getBucketingInfo() != null) {
+      sorter = new ParallelReadMergeSorterWithBucketingImpl(child.getOutput(),
+          configuration.getBucketingInfo());
+    }
     sorter.initialize(sortParameters);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 6254cce..c24472b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -121,6 +121,34 @@ public class SortParameters {
    */
   private boolean useKettle = true;
 
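+  /**
+   * Creates a shallow field-by-field copy so that each bucket's sorter can use
+   * its own partition id, buffer size and temp file location.
+   */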
+  public SortParameters getCopy() {
+    SortParameters parameters = new SortParameters();
+    parameters.tempFileLocation = tempFileLocation;
+    parameters.sortBufferSize = sortBufferSize;
+    parameters.measureColCount = measureColCount;
+    parameters.dimColCount = dimColCount;
+    parameters.complexDimColCount = complexDimColCount;
+    parameters.fileBufferSize = fileBufferSize;
+    parameters.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
+    parameters.fileWriteBufferSize = fileWriteBufferSize;
+    parameters.observer = observer;
+    parameters.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
+    parameters.isSortFileCompressionEnabled = isSortFileCompressionEnabled;
+    parameters.prefetch = prefetch;
+    parameters.bufferSize = bufferSize;
+    parameters.databaseName = databaseName;
+    parameters.tableName = tableName;
+    parameters.aggType = aggType;
+    parameters.noDictionaryCount = noDictionaryCount;
+    parameters.partitionID = partitionID;
+    parameters.segmentId = segmentId;
+    parameters.taskNo = taskNo;
+    parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+    parameters.numberOfCores = numberOfCores;
+    parameters.useKettle = useKettle;
+    return parameters;
+  }
+
   public String getTempFileLocation() {
     return tempFileLocation;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2d468ac..f6a729d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -274,6 +274,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   private boolean useKettle;
 
+  private int bucketNumber;
+
   /**
    * CarbonFactDataHandler constructor
    */
@@ -293,6 +295,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
     this.aggKeyBlock = new boolean[columnStoreCount];
     this.isNoDictionary = new boolean[columnStoreCount];
+    this.bucketNumber = carbonFactDataHandlerModel.getBucketId();
     this.isUseInvertedIndex = new boolean[columnStoreCount];
     if (null != carbonFactDataHandlerModel.getIsUseInvertedIndex()) {
       for (int i = 0; i < isUseInvertedIndex.length; i++) {
@@ -1495,6 +1498,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setColCardinality(colCardinality);
     carbonDataWriterVo.setSegmentProperties(segmentProperties);
     carbonDataWriterVo.setTableBlocksize(tableBlockSize);
+    carbonDataWriterVo.setBucketNumber(bucketNumber);
     return carbonDataWriterVo;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index c7d9d29..51663fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -186,13 +186,15 @@ public class CarbonFactDataHandlerModel {
    */
   private boolean useKettle = true;
 
+  private int bucketId = 0;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    * @param configuration
    * @return CarbonFactDataHandlerModel
    */
   public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
-      CarbonDataLoadConfiguration configuration, String storeLocation) {
+      CarbonDataLoadConfiguration configuration, String storeLocation, int bucketId) {
 
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
@@ -291,6 +293,7 @@ public class CarbonFactDataHandlerModel {
     } else {
       carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
     }
+    carbonFactDataHandlerModel.bucketId = bucketId;
     return carbonFactDataHandlerModel;
   }
 
@@ -558,5 +561,9 @@ public class CarbonFactDataHandlerModel {
   public void setUseKettle(boolean useKettle) {
     this.useKettle = useKettle;
   }
+
+  public int getBucketId() {
+    return bucketId;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 2c82672..5697cb6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -150,6 +150,10 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    */
   private void startSorting(File[] files) throws CarbonDataWriterException {
     this.fileCounter = files.length;
+    if (fileCounter == 0) {
+      LOGGER.info("No files to merge sort");
+      return;
+    }
     this.fileBufferSize = CarbonDataProcessorUtil
         .getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(),
             CarbonCommonConstants.CONSTANT_SIZE_TEN);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a1e7311..0195392 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -271,6 +271,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     initFileCount();
     String carbonDataFileName = carbonTablePath
         .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
+            dataWriterVo.getBucketNumber(),
             dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
     String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
     FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation());
@@ -423,12 +424,13 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   private void writeIndexFile() throws IOException, CarbonDataWriterException {
     // get the header
-    IndexHeader indexHeader =
-        CarbonMetadataUtil.getIndexHeader(localCardinality, thriftColumnSchemaList);
+    IndexHeader indexHeader = CarbonMetadataUtil
+        .getIndexHeader(localCardinality, thriftColumnSchemaList, dataWriterVo.getBucketNumber());
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
     String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath
         .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
+            dataWriterVo.getBucketNumber(),
             dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
     // open file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 6e0287d..e46430b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -66,6 +66,8 @@ public class CarbonDataWriterVo {
 
   private int tableBlocksize;
 
+  private int bucketNumber;
+
   /**
    * @return the storeLocation
    */
@@ -318,4 +320,11 @@ public class CarbonDataWriterVo {
     this.tableBlocksize = tableBlocksize;
   }
 
+  public int getBucketNumber() {
+    return bucketNumber;
+  }
+
+  public void setBucketNumber(int bucketNumber) {
+    this.bucketNumber = bucketNumber;
+  }
 }

