carbondata-commits mailing list archives

From jack...@apache.org
Subject [2/3] carbondata git commit: Use sortBy operator in spark to load the data.
Date Tue, 13 Jun 2017 08:28:28 GMT
Use the sortBy operator in Spark to load the data.

Address review comments.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/82741c1f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/82741c1f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/82741c1f

Branch: refs/heads/master
Commit: 82741c1fe1f60d102e6fc19a8ffc300067f06ba4
Parents: 47d0780
Author: Yadong Qi <qiyadong2010@gmail.com>
Authored: Sat Jun 10 18:15:44 2017 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Tue Jun 13 16:27:39 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  11 +
 .../core/datastore/row/CarbonRow.java           |   3 +-
 docs/configuration-parameters.md                |   1 +
 .../src/test/resources/globalsort/sample1.csv   |   5 +
 .../src/test/resources/globalsort/sample2.csv   |   5 +
 .../src/test/resources/globalsort/sample3.csv   |   5 +
 .../dataload/TestGlobalSortDataLoad.scala       | 296 +++++++++++++++++++
 .../load/DataLoadProcessBuilderOnSpark.scala    | 159 ++++++++++
 .../load/DataLoadProcessorStepOnSpark.scala     | 235 +++++++++++++++
 .../spark/load/GlobalSortHelper.scala           |  40 +++
 .../carbondata/spark/load/ValidateUtil.scala    |  79 +++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  11 +
 .../execution/command/carbonTableSchema.scala   |  32 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  11 +
 .../execution/command/carbonTableSchema.scala   |  34 +--
 .../processing/model/CarbonLoadModel.java       |  62 ++++
 .../newflow/DataLoadProcessBuilder.java         |  13 +-
 .../newflow/sort/SortScopeOptions.java          |   6 +-
 .../newflow/sort/SortStepRowUtil.java           |  73 +++++
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  48 +--
 .../steps/DataConverterProcessorStepImpl.java   |  35 ++-
 .../steps/DataWriterProcessorStepImpl.java      |  32 +-
 .../schema/metadata/SortObserver.java           |   4 +-
 .../sortandgroupby/sortdata/SortParameters.java |   3 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  11 +-
 .../store/CarbonFactDataHandlerModel.java       |   8 +
 .../util/CarbonDataProcessorUtil.java           |  25 ++
 28 files changed, 1128 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 488c17b..ec13bd6 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1108,6 +1108,8 @@ public final class CarbonCommonConstants {
    * thus loading is faster but query maybe slower.
    * If set to LOCAL_SORT, the sorting scope is bigger and one index tree per data node will be
    * created, thus loading is slower but query is faster.
+   * If set to GLOBAL_SORT, the sorting scope is the whole load (the biggest scope) and one
+   * index tree per task will be created, thus loading is slower but query is faster.
    */
   public static final String LOAD_SORT_SCOPE_DEFAULT = "LOCAL_SORT";
 
@@ -1117,6 +1119,15 @@ public final class CarbonCommonConstants {
    */
   public static final String LOAD_BATCH_SORT_SIZE_INMB = "carbon.load.batch.sort.size.inmb";
 
+  /**
+   * The number of partitions to use when shuffling data for sort. If the user does not
+   * configure it, or configures it to less than 1, the number of map tasks is used as the
+   * number of reduce tasks. In general, we recommend 2-3 tasks per CPU core in your cluster.
+   */
+  public static final String LOAD_GLOBAL_SORT_PARTITIONS = "carbon.load.global.sort.partitions";
+
+  public static final String LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT = "0";
+
   public static final String ENABLE_VECTOR_READER = "carbon.enable.vector.reader";
 
   public static final String ENABLE_VECTOR_READER_DEFAULT = "true";

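A hedged sketch of how these new constants are exercised, mirroring the test
suite added later in this commit (the sql(...) helper is assumed to come from a
SparkSession or the test harness; paths and values are illustrative):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Process-wide defaults via carbon properties (keys come from this commit).
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "GLOBAL_SORT")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, "2")

    // Or per load, via LOAD DATA options, as exercised in TestGlobalSortDataLoad:
    sql("LOAD DATA LOCAL INPATH '/path/to/data.csv' INTO TABLE t " +
      "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='2')")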
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index 86ac214..d981fa4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -17,12 +17,13 @@
 
 package org.apache.carbondata.core.datastore.row;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
 /**
  * This row class is used to transfer the row data from one step to other step
  */
-public class CarbonRow {
+public class CarbonRow implements Serializable {
 
   private Object[] data;
 

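CarbonRow becomes Serializable because rows now cross a shuffle inside Spark's
sortBy. A minimal, self-contained sketch of why this is required (SketchRow is
a hypothetical stand-in for CarbonRow, not part of this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical stand-in for CarbonRow: RDD elements that cross a shuffle
    // must be serializable, which is exactly what this change enables.
    class SketchRow(val data: Array[AnyRef]) extends Serializable

    object RowSerializationSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("row-ser").setMaster("local[2]"))
        val rows = sc.parallelize(
          Seq(new SketchRow(Array[AnyRef]("b")), new SketchRow(Array[AnyRef]("a"))))
        // The shuffle inside sortBy serializes every row; without
        // `extends Serializable` this fails with a NotSerializableException.
        val sorted = rows.sortBy(_.data(0).asInstanceOf[String], numPartitions = 2)
        sorted.collect().foreach(r => println(r.data.mkString(",")))
        sc.stop()
      }
    }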
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index b71cdbc..deb5924 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -57,6 +57,7 @@ This section provides the details of all the configurations required for CarbonD
 | carbon.max.executor.lru.cache.size | -1 | Max LRU cache size upto which data will be loaded at the executor side. This value is expressed in MB. Default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. If this parameter is not configured, then the carbon.max.driver.lru.cache.size value will be considered. |  |
 | carbon.merge.sort.prefetch | true | Enable prefetch of data during merge sort while reading data from sort temp files in data loading. |  |
 | carbon.update.persist.enable | true | Enabling this parameter considers persistent data. Enabling this will reduce the execution time of UPDATE operation. |  |
+| carbon.load.global.sort.partitions | 0 | The number of partitions to use when shuffling data for sort. If the user does not configure it, or configures it to less than 1, the number of map tasks is used as the number of reduce tasks. In general, we recommend 2-3 tasks per CPU core in your cluster. |  |
 
 
 

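For reference, the documented parameter can also be set in the carbon.properties
file like any other load-time property (an illustrative entry; 8 is an example
value, not the default):

    carbon.load.global.sort.partitions = 8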
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample1.csv b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
new file mode 100644
index 0000000..9cb11be
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
@@ -0,0 +1,5 @@
+id,name,city,age
+1,a,wuhan,10
+2,b,hangzhou,20
+3,c,beijing,30
+4,d,shenzhen,40

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample2.csv b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
new file mode 100644
index 0000000..300c254
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
@@ -0,0 +1,5 @@
+id,name,city,age
+5,e,wuhan,50
+6,f,hangzhou,60
+7,g,beijing,70
+eight,h,shenzhen,80

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample3.csv b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
new file mode 100644
index 0000000..8e51dae
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
@@ -0,0 +1,5 @@
+id,name,city,age
+9,i,wuhan,90
+10,j,hangzhou,100
+11,k,beijing,110
+12,l,shenzhen,120

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
new file mode 100644
index 0000000..c00aaa3
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  var filePath: String = s"$resourcesPath/globalsort"
+
+  override def beforeEach {
+    resetConf()
+
+    sql("DROP TABLE IF EXISTS carbon_globalsort")
+    sql(
+      """
+        | CREATE TABLE carbon_globalsort(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+  }
+
+  override def afterEach {
+    resetConf()
+
+    sql("DROP TABLE IF EXISTS carbon_globalsort")
+  }
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS carbon_localsort_once")
+    sql(
+      """
+        | CREATE TABLE carbon_localsort_once(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_once")
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS carbon_localsort_once")
+    sql("DROP TABLE IF EXISTS carbon_localsort_twice")
+    sql("DROP TABLE IF EXISTS carbon_localsort_triple")
+    sql("DROP TABLE IF EXISTS carbon_localsort_delete")
+    sql("DROP TABLE IF EXISTS carbon_localsort_update")
+    sql("DROP TABLE IF EXISTS carbon_globalsort")
+    sql("DROP TABLE IF EXISTS carbon_globalsort_partitioned")
+  }
+
+  // ----------------------------------- Compare Result -----------------------------------
+  test("Make sure the result is correct and sorted at the global level") {
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='1')")
+
+    assert(getIndexFileCount("carbon_globalsort") === 1)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort"),
+      sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
+  }
+
+  // ----------------------------------- Bad Record -----------------------------------
+  test("Test GLOBAL_SORT with BAD_RECORDS_ACTION = 'FAIL'") {
+    intercept[Exception] {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+        "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'BAD_RECORDS_ACTION'='FAIL')")
+    }
+  }
+
+  test("Test GLOBAL_SORT with BAD_RECORDS_ACTION = 'REDIRECT'") {
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'BAD_RECORDS_ACTION'='REDIRECT')")
+
+    assert(getIndexFileCount("carbon_globalsort") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(11)))
+  }
+
+  // ----------------------------------- Single Pass -----------------------------------
+  // Waiting for merge [CARBONDATA-1145]
+  ignore("Test GLOBAL_SORT with SINGLE_PASS") {
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'SINGLE_PASS'='TRUE')")
+
+    assert(getIndexFileCount("carbon_globalsort") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
+      sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
+  }
+
+  // ----------------------------------- Configuration Validity -----------------------------------
+  test("GLOBAL_SORT is not supported on partitioned tables") {
+    sql("DROP TABLE IF EXISTS carbon_globalsort_partitioned")
+    sql(
+      """
+        | CREATE TABLE carbon_globalsort_partitioned(name STRING, city STRING, age INT)
+        | PARTITIONED BY (id INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin)
+
+    intercept[MalformedCarbonCommandException] {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_partitioned " +
+        "OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
+    }
+  }
+
+  test("Number of partitions should be greater than 0") {
+    intercept[MalformedCarbonCommandException] {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+        "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='0')")
+    }
+
+    intercept[MalformedCarbonCommandException] {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+        "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='a')")
+    }
+  }
+
+  // ----------------------------------- Compaction -----------------------------------
+  test("Compaction GLOBAL_SORT * 2") {
+    sql("DROP TABLE IF EXISTS carbon_localsort_twice")
+    sql(
+      """
+        | CREATE TABLE carbon_localsort_twice(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_twice")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_twice")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      s"OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      s"OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
+    sql("ALTER TABLE carbon_globalsort COMPACT 'MAJOR'")
+
+    assert(getIndexFileCount("carbon_globalsort") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(24)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
+      sql("SELECT * FROM carbon_localsort_twice ORDER BY name"))
+  }
+
+  test("Compaction GLOBAL_SORT + LOCAL_SORT + BATCH_SORT") {
+    sql("DROP TABLE IF EXISTS carbon_localsort_triple")
+    sql(
+      """
+        | CREATE TABLE carbon_localsort_triple(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_triple")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_triple")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_triple")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      s"OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      s"OPTIONS('SORT_SCOPE'='LOCAL_SORT')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      s"OPTIONS('SORT_SCOPE'='BATCH_SORT', 'BATCH_SORT_SIZE_INMB'='1')")
+    sql("ALTER TABLE carbon_globalsort COMPACT 'MAJOR'")
+
+    assert(getIndexFileCount("carbon_globalsort") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(36)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
+      sql("SELECT * FROM carbon_localsort_triple ORDER BY name"))
+  }
+
+  // ----------------------------------- Check Configurations -----------------------------------
+  // Waiting for merge SET feature[CARBONDATA-1065]
+  ignore("DDL > SET") {
+    sql(s"SET ${CarbonCommonConstants.LOAD_SORT_SCOPE} = LOCAL_SORT")
+    sql(s"SET ${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS} = 5")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='2')")
+
+    assert(getIndexFileCount("carbon_globalsort") === 2)
+  }
+
+  test("DDL > carbon.properties") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "LOCAL_SORT")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, "5")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      "OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='2')")
+
+    assert(getIndexFileCount("carbon_globalsort") === 2)
+  }
+
+  // Waiting for merge SET feature[CARBONDATA-1065]
+  ignore("SET > carbon.properties") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "LOCAL_SORT")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, "5")
+    sql(s"SET ${CarbonCommonConstants.LOAD_SORT_SCOPE} = GLOBAL_SORT")
+    sql(s"SET ${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS} = 2")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+
+    assert(getIndexFileCount("carbon_globalsort") === 2)
+  }
+
+  test("carbon.properties") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "GLOBAL_SORT")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, "2")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+
+    assert(getIndexFileCount("carbon_globalsort") === 2)
+  }
+
+  // ----------------------------------- IUD -----------------------------------
+  test("LOAD with DELETE") {
+    sql("DROP TABLE IF EXISTS carbon_localsort_delete")
+    sql(
+      """
+        | CREATE TABLE carbon_localsort_delete(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_delete")
+    sql("DELETE FROM carbon_localsort_delete WHERE id = 1")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      "OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
+    sql("DELETE FROM carbon_globalsort WHERE id = 1")
+
+    assert(getIndexFileCount("carbon_globalsort") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(11)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
+      sql("SELECT * FROM carbon_localsort_delete ORDER BY name"))
+  }
+
+  test("LOAD with UPDATE") {
+    sql("DROP TABLE IF EXISTS carbon_localsort_update")
+    sql(
+      """
+        | CREATE TABLE carbon_localsort_update(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_update")
+    sql("UPDATE carbon_localsort_update SET (name) = ('bb') WHERE id = 2").show
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
+      "OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
+    sql("UPDATE carbon_globalsort SET (name) = ('bb') WHERE id = 2").show
+
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12)))
+    checkAnswer(sql("SELECT name FROM carbon_globalsort WHERE id = 2"), Seq(Row("bb")))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
+      sql("SELECT * FROM carbon_localsort_update ORDER BY name"))
+  }
+
+  // ----------------------------------- INSERT INTO -----------------------------------
+  test("INSERT INTO") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "GLOBAL_SORT")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, "2")
+    sql(s"INSERT INTO TABLE carbon_globalsort SELECT * FROM carbon_localsort_once")
+
+    assert(getIndexFileCount("carbon_globalsort") === 2)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
+      sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
+  }
+
+  private def resetConf() {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)
+
+    sql(s"SET ${CarbonCommonConstants.LOAD_SORT_SCOPE} = ${CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT}")
+    sql(s"SET ${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS} = " +
+      s"${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT}")
+  }
+
+  private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
+    val store  = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+    val list = new File(store).list(new FilenameFilter {
+      override def accept(dir: File, name: String) = name.endsWith(".carbonindex")
+    })
+    list.size
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
new file mode 100644
index 0000000..04e0d22
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.util.Comparator
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Use sortBy operator in spark to load the data
+ */
+object DataLoadProcessBuilderOnSpark {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def loadDataUsingGlobalSort(
+      sc: SparkContext,
+      dataFrame: Option[DataFrame],
+      model: CarbonLoadModel,
+      currentLoadCount: Int): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    val originRDD = if (dataFrame.isDefined) {
+      dataFrame.get.rdd
+    } else {
+      // input data from files
+      val hadoopConfiguration = new Configuration()
+      CommonUtil.configureCSVInputFormat(hadoopConfiguration, model)
+      hadoopConfiguration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+      val columnCount = model.getCsvHeaderColumns.length
+      new NewHadoopRDD[NullWritable, StringArrayWritable](
+        sc,
+        classOf[CSVInputFormat],
+        classOf[NullWritable],
+        classOf[StringArrayWritable],
+        hadoopConfiguration)
+        .map(x => DataLoadProcessorStepOnSpark.toStringArrayRow(x._2, columnCount))
+    }
+
+    model.setPartitionId("0")
+    model.setSegmentId(currentLoadCount.toString)
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    val inputRDD = originRDD
+      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+
+    // 2. Convert
+    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
+      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
+        convertStepRowCounter)
+    }.filter(_ != null) // Filter out bad records
+
+    // 3. Sort
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val sortParameters = SortParameters.createSortParameters(configuration)
+    object RowOrdering extends Ordering[Array[AnyRef]] {
+      def compare(rowA: Array[AnyRef], rowB: Array[AnyRef]): Int = {
+        val rowComparator: Comparator[Array[AnyRef]] =
+          if (sortParameters.getNoDictionaryCount > 0) {
+            new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
+          } else {
+            new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
+          }
+
+        rowComparator.compare(rowA, rowB)
+      }
+    }
+
+    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(configuration)
+    if (numPartitions <= 0) {
+      numPartitions = convertRDD.partitions.length
+    }
+
+    // If the number of partitions is greater than 1, sortBy triggers an extra job (sampling)
+    // to compute range bounds. So we cache the RDD here to avoid redoing input and convert.
+    if (numPartitions > 1) {
+      convertRDD.persist(StorageLevel.MEMORY_AND_DISK)
+    }
+
+    import scala.reflect.classTag
+    val sortRDD = convertRDD
+      .sortBy(_.getData, numPartitions = numPartitions)(RowOrdering, classTag[Array[AnyRef]])
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark.convertTo3Parts(rows, index, modelBroadcast,
+          sortStepRowCounter)
+      }
+
+    // 4. Write
+    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
+        writeStepRowCounter))
+
+    // clean cache
+    convertRDD.unpersist()
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
+
+    // Update status
+    if (partialSuccessAccum.value != 0) {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+        "Partial_Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    } else {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    }
+  }
+}

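The core of loadDataUsingGlobalSort is the persist-then-sortBy pattern. Below is
a standalone sketch of that pattern under simplified assumptions: plain string
arrays stand in for CarbonRow, and the default Ordering[String] stands in for
the RowOrdering/NewRowComparator used above.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object SortByPatternSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("sortBy-sketch").setMaster("local[2]"))
        val rows = sc.parallelize(Seq(
          Array[AnyRef]("3", "c"), Array[AnyRef]("1", "a"), Array[AnyRef]("2", "b")))

        // With numPartitions > 1, sortBy first runs a sampling job to compute
        // range bounds, so the input is persisted to avoid recomputing the
        // upstream steps; this is why convertRDD is persisted above.
        rows.persist(StorageLevel.MEMORY_AND_DISK)
        val sorted = rows.sortBy(_(0).asInstanceOf[String], numPartitions = 2)
        sorted.collect().foreach(r => println(r.mkString(",")))
        rows.unpersist()
        sc.stop()
      }
    }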
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
new file mode 100644
index 0000000..fb9223c
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import scala.util.Random
+
+import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.csvload.StringArrayWritable
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl
+import org.apache.carbondata.processing.newflow.sort.SortStepRowUtil
+import org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl}
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters
+import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
+
+object DataLoadProcessorStepOnSpark {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def toStringArrayRow(row: StringArrayWritable, columnCount: Int): StringArrayRow = {
+    val outRow = new StringArrayRow(new Array[String](columnCount))
+    outRow.setValues(row.get())
+  }
+
+  def toRDDIterator(
+      rows: Iterator[Row],
+      modelBroadcast: Broadcast[CarbonLoadModel]): Iterator[Array[AnyRef]] = {
+    new Iterator[Array[AnyRef]] {
+      val iter = new NewRddIterator(rows, modelBroadcast.value, TaskContext.get())
+
+      override def hasNext: Boolean = iter.hasNext
+
+      override def next(): Array[AnyRef] = iter.next
+    }
+  }
+
+  def inputFunc(
+      rows: Iterator[Array[AnyRef]],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        val row = new CarbonRow(rowParser.parseRow(rows.next()))
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
+  def convertFunc(
+      rows: Iterator[CarbonRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      partialSuccessAccum: Accumulator[Int],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val badRecordLogger = DataConverterProcessorStepImpl.createBadRecordLogger(conf)
+    val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
+    rowConverter.initialize()
+
+    TaskContext.get().addTaskCompletionListener { context =>
+      DataConverterProcessorStepImpl.close(badRecordLogger, conf, rowConverter)
+      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum)
+    }
+
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      DataConverterProcessorStepImpl.close(badRecordLogger, conf, rowConverter)
+      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum)
+
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        val row = rowConverter.convert(rows.next())
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
+  def convertTo3Parts(
+      rows: Iterator[CarbonRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val sortParameters = SortParameters.createSortParameters(conf)
+
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        val row = new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters))
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
+  def writeFunc(
+      rows: Iterator[CarbonRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]) {
+    var model: CarbonLoadModel = null
+    var tableName: String = null
+    var rowConverter: RowConverterImpl = null
+
+    try {
+      model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+      val storeLocation = getTempStoreLocation(index)
+      val conf = DataLoadProcessBuilder.createConfiguration(model, storeLocation)
+
+      tableName = model.getTableName
+
+      // When we use sortBy, the load runs as two stages that execute in different processes,
+      // so the cardinality finder created in the convert stage is not visible here. We need
+      // to set up the cardinality finder again in this stage.
+      rowConverter = new RowConverterImpl(conf.getDataFields, conf, null)
+      rowConverter.initialize()
+      conf.setCardinalityFinder(rowConverter)
+
+      val dataWriter = new DataWriterProcessorStepImpl(conf)
+
+      val dataHandlerModel = dataWriter.getDataHandlerModel(0)
+      var dataHandler: CarbonFactHandler = null
+      var rowsNotExist = true
+      while (rows.hasNext) {
+        if (rowsNotExist) {
+          rowsNotExist = false
+          dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
+            CarbonFactHandlerFactory.FactHandlerType.COLUMNAR)
+          dataHandler.initialise()
+        }
+        val row = dataWriter.processRow(rows.next(), dataHandler)
+        rowCounter.add(1)
+        row
+      }
+
+      if (!rowsNotExist) {
+        dataWriter.finish(dataHandler)
+      }
+    } catch {
+      case e: CarbonDataWriterException =>
+        LOGGER.error(e, "Failed for table: " + tableName + " in Data Writer Step")
+        throw new CarbonDataLoadingException("Error while initializing data handler : " +
+          e.getMessage)
+      case e: Exception =>
+        LOGGER.error(e, "Failed for table: " + tableName + " in Data Writer Step")
+        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage, e)
+    } finally {
+      if (rowConverter != null) {
+        rowConverter.finish()
+      }
+      // clean up the folders and files created locally for data load operation
+      CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+    }
+  }
+
+  private def getTempStoreLocation(index: Int): String = {
+    var storeLocation = ""
+    // This property determines whether the temp location for carbon is inside the
+    // container temp dir or the YARN application directory.
+    val carbonUseLocalDir = CarbonProperties.getInstance()
+      .getProperty("carbon.use.local.dir", "false")
+    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+      val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+      if (null != storeLocations && storeLocations.nonEmpty) {
+        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+      }
+      if (storeLocation.isEmpty) { // initialized to "", so check for empty rather than null
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+    } else {
+      storeLocation = System.getProperty("java.io.tmpdir")
+    }
+    storeLocation = storeLocation + '/' + System.nanoTime() + '/' + index
+    storeLocation
+  }
+
+  private def wrapException(e: Throwable, model: CarbonLoadModel): Unit = {
+    e match {
+      case e: CarbonDataLoadingException => throw e
+      case e: Exception =>
+        LOGGER.error(e, "Data Loading failed for table " + model.getTableName)
+        throw new CarbonDataLoadingException("Data Loading failed for table " + model.getTableName,
+          e)
+    }
+  }
+}

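Each step function above follows the same skeleton: broadcast the load model
once, derive a per-task copy from the partition index, wrap the incoming
iterator, and count rows through an accumulator. A stripped-down sketch of that
skeleton (LoadModel and getCopyWithTaskNo are hypothetical stand-ins for
CarbonLoadModel and its method of the same name):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical stand-in for CarbonLoadModel.
    case class LoadModel(taskNo: String = "0") {
      def getCopyWithTaskNo(n: String): LoadModel = copy(taskNo = n)
    }

    object StepPatternSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("step-sketch").setMaster("local[2]"))
        val modelBroadcast = sc.broadcast(LoadModel())
        val rowCounter = sc.accumulator(0, "rows") // same Spark-era API as above

        val out = sc.parallelize(1 to 10, 2).mapPartitionsWithIndex { case (index, rows) =>
          // One model copy per task, keyed by partition index, as in inputFunc.
          val model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
          rows.map { r =>
            rowCounter.add(1)
            (model.taskNo, r)
          }
        }
        out.collect().foreach(println)
        println("rows processed: " + rowCounter.value)
        sc.stop()
      }
    }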
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
new file mode 100644
index 0000000..7880fee
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import org.apache.spark.Accumulator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger
+
+object GlobalSortHelper {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def badRecordsLogger(loadModel: CarbonLoadModel, badRecordsAccum: Accumulator[Int]): Unit = {
+    val key = new CarbonTableIdentifier(loadModel.getDatabaseName, loadModel.getTableName, null)
+      .getBadRecordLoggerKey
+    if (null != BadRecordsLogger.hasBadRecord(key)) {
+      LOGGER.error("Data load is only partially successful for table " + loadModel.getTableName)
+      badRecordsAccum.add(1)
+    } else {
+      LOGGER.info("Data loading is successful for table " + loadModel.getTableName)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
new file mode 100644
index 0000000..ae951bd
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+object ValidateUtil {
+  def validateDateFormat(dateFormat: String, table: CarbonTable, tableName: String): Unit = {
+    val dimensions = table.getDimensionByTableName(tableName).asScala
+    if (dateFormat != null) {
+      if (dateFormat.trim == "") {
+        throw new MalformedCarbonCommandException("Error: Option DateFormat is set to an empty " +
+          "string.")
+      } else {
+        val dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
+        for (singleDateFormat <- dateFormats) {
+          val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
+          val columnName = dateFormatSplits(0).trim.toLowerCase
+          if (!dimensions.exists(_.getColName.equals(columnName))) {
+            throw new MalformedCarbonCommandException("Error: Wrong Column Name " +
+              dateFormatSplits(0) +
+              " is provided in Option DateFormat.")
+          }
+          if (dateFormatSplits.length < 2 || dateFormatSplits(1).trim.isEmpty) {
+            throw new MalformedCarbonCommandException("Error: Option DateFormat is not provided " +
+              "for Column " + dateFormatSplits(0) + ".")
+          }
+        }
+      }
+    }
+  }
+
+  def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = {
+    if (sortScope != null) {
+      // Global sort is not supported on partitioned tables.
+      if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null &&
+        sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
+        throw new MalformedCarbonCommandException(
+          "Global sort is not supported on partitioned tables.")
+      }
+    }
+  }
+
+  def validateGlobalSortPartitions(globalSortPartitions: String): Unit = {
+    if (globalSortPartitions != null) {
+      try {
+        val num = globalSortPartitions.toInt
+        if (num <= 0) {
+          throw new MalformedCarbonCommandException("'GLOBAL_SORT_PARTITIONS' should be greater " +
+            "than 0.")
+        }
+      } catch {
+        case e: NumberFormatException => throw new MalformedCarbonCommandException(e.getMessage)
+      }
+    }
+  }
+}

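These validators are wired into LoadTable in both the spark and spark2 modules
(see the carbonTableSchema.scala changes below). As a quick illustration of the
partition-count check:

    import org.apache.carbondata.spark.load.ValidateUtil

    ValidateUtil.validateGlobalSortPartitions(null) // option absent: no-op
    ValidateUtil.validateGlobalSortPartitions("2")  // valid
    // Both of the following throw MalformedCarbonCommandException:
    // ValidateUtil.validateGlobalSortPartitions("0")
    // ValidateUtil.validateGlobalSortPartitions("a")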
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 7237f2f..db56c5b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -799,7 +799,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT",
-      "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB"
+      "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB",
+      "GLOBAL_SORT_PARTITIONS"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 71cc65e..580dcc6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -59,6 +59,10 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load.{FailureCauses, _}
 import org.apache.carbondata.spark.splits.TableSplit
@@ -765,11 +769,18 @@ object CarbonDataRDDFactory {
       var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       var errorMessage: String = "DataLoad failure"
       var executorMessage: String = ""
+      val configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel)
+      val sortScope = CarbonDataProcessorUtil.getSortScope(configuration)
       try {
         if (updateModel.isDefined) {
           loadDataFrameForUpdate()
         } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
           loadDataForPartitionTable()
+        } else if (configuration.isSortTable &&
+            sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+          LOGGER.audit("Using global sort for loading.")
+          status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
+            dataFrame, carbonLoadModel, currentLoadCount)
         } else if (dataFrame.isDefined) {
           loadDataFrame()
         } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 279c645..426d340 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -53,6 +53,7 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
 
@@ -404,10 +405,13 @@ case class LoadTable(
       val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
       val dateFormat = options.getOrElse("dateformat", null)
-      validateDateFormat(dateFormat, table)
+      ValidateUtil.validateDateFormat(dateFormat, table, tableName)
       val maxColumns = options.getOrElse("maxcolumns", null)
       val sortScope = options.getOrElse("sort_scope", null)
+      ValidateUtil.validateSortScope(table, sortScope)
       val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
+      val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
+      ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
 
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
@@ -433,6 +437,7 @@ case class LoadTable(
           DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
       carbonLoadModel.setSortScope(sortScope)
       carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
+      carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
       // when single_pass=true, and not use all dict
       val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
         case "true" =>
@@ -675,31 +680,6 @@ case class LoadTable(
     carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
   }
 
-  private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
-    val dimensions = table.getDimensionByTableName(tableName).asScala
-    if (dateFormat != null) {
-      if (dateFormat.trim == "") {
-        throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
-                                                  "string.")
-      } else {
-        val dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
-        for (singleDateFormat <- dateFormats) {
-          val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
-          val columnName = dateFormatSplits(0).trim.toLowerCase
-          if (!dimensions.exists(_.getColName.equals(columnName))) {
-            throw new MalformedCarbonCommandException("Error: Wrong Column Name " +
-                                                      dateFormatSplits(0) +
-                                                      " is provided in Option DateFormat.")
-          }
-          if (dateFormatSplits.length < 2 || dateFormatSplits(1).trim.isEmpty) {
-            throw new MalformedCarbonCommandException("Error: Option DateFormat is not provided " +
-                                                      "for " + "Column " + dateFormatSplits(0) +
-                                                      ".")
-          }
-        }
-      }
-    }
-  }
 }
 
 private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index a173d5f..2e8e024 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -59,6 +59,10 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load.{FailureCauses, _}
 import org.apache.carbondata.spark.splits.TableSplit
@@ -777,11 +781,18 @@ object CarbonDataRDDFactory {
       var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       var errorMessage: String = "DataLoad failure"
       var executorMessage: String = ""
+      val configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel)
+      val sortScope = CarbonDataProcessorUtil.getSortScope(configuration)
       try {
         if (updateModel.isDefined) {
           loadDataFrameForUpdate()
         } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
           loadDataForPartitionTable()
+        } else if (configuration.isSortTable &&
+            sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+          LOGGER.audit("Using global sort for loading.")
+          status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
+            dataFrame, carbonLoadModel, currentLoadCount)
         } else if (dataFrame.isDefined) {
           loadDataFrame()
         } else {

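The new branch above is taken when the table is a sort table and SORT_SCOPE resolves to GLOBAL_SORT. For context, a load that would exercise it might look like the following sketch (the SparkSession, table name, and CSV path are hypothetical; the option names match those validated in LoadTable above):

    // Sketch: driving a global-sort load through SQL in the Spark 2.x integration.
    spark.sql(
      """
        | LOAD DATA LOCAL INPATH '/tmp/sample1.csv'
        | INTO TABLE t_global
        | OPTIONS('SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='2')
      """.stripMargin)
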
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f594d51..e83d541 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -53,6 +53,7 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}
 
@@ -416,10 +417,13 @@ case class LoadTable(
       val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
       val dateFormat = options.getOrElse("dateformat", null)
-      validateDateFormat(dateFormat, table)
+      ValidateUtil.validateDateFormat(dateFormat, table, tableName)
       val maxColumns = options.getOrElse("maxcolumns", null)
       val sortScope = options.getOrElse("sort_scope", null)
+      ValidateUtil.validateSortScope(table, sortScope)
       val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
+      val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
+      ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
@@ -444,6 +448,7 @@ case class LoadTable(
           DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
       carbonLoadModel.setSortScope(sortScope)
       carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
+      carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
       val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
         case "true" =>
           true
@@ -646,32 +651,6 @@ case class LoadTable(
     }
     Seq.empty
   }
-
-  private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
-    val dimensions = table.getDimensionByTableName(tableName).asScala
-    if (dateFormat != null) {
-      if (dateFormat.trim == "") {
-        throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
-                                                  "string.")
-      } else {
-        val dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
-        for (singleDateFormat <- dateFormats) {
-          val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
-          val columnName = dateFormatSplits(0).trim.toLowerCase
-          if (!dimensions.exists(_.getColName.equals(columnName))) {
-            throw new MalformedCarbonCommandException("Error: Wrong Column Name " +
-                                                      dateFormatSplits(0) +
-                                                      " is provided in Option DateFormat.")
-          }
-          if (dateFormatSplits.length < 2 || dateFormatSplits(1).trim.isEmpty) {
-            throw new MalformedCarbonCommandException("Error: Option DateFormat is not provided " +
-                                                      "for " + "Column " + dateFormatSplits(0) +
-                                                      ".")
-          }
-        }
-      }
-    }
-  }
 }
 
 private[sql] case class DeleteLoadByDate(
@@ -695,7 +674,6 @@ private[sql] case class DeleteLoadByDate(
     )
     Seq.empty
   }
-
 }
 
 case class CleanFiles(

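ValidateUtil.scala is added elsewhere in this commit, so validateGlobalSortPartitions is not shown here; presumably it only needs to reject values that do not parse to a positive integer. A minimal sketch of such a check, reusing the exception type already imported above (the message text is an assumption):

    def validateGlobalSortPartitions(globalSortPartitions: String): Unit = {
      if (globalSortPartitions != null) {
        // accept only strings that parse to a positive integer
        val valid =
          try { globalSortPartitions.toInt > 0 }
          catch { case _: NumberFormatException => false }
        if (!valid) {
          throw new MalformedCarbonCommandException(
            "'GLOBAL_SORT_PARTITIONS' should be a positive integer.")
        }
      }
    }
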
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 86163b4..7ec7933 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -160,6 +160,11 @@ public class CarbonLoadModel implements Serializable {
   private String batchSortSizeInMb;
 
   /**
+   * Number of partitions in global sort.
+   */
+  private String globalSortPartitions;
+
+  /**
    * get escape char
    *
    * @return
@@ -362,6 +367,55 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
+   * Get a copy of this load model with the given taskNo.
+   * The broadcast load model is shared within an executor process, so each task must take
+   * its own copy to keep taskNo independent across tasks.
+   *
+   * @return a copy of this CarbonLoadModel carrying the given taskNo
+   */
+  public CarbonLoadModel getCopyWithTaskNo(String taskNo) {
+    CarbonLoadModel copy = new CarbonLoadModel();
+    copy.tableName = tableName;
+    copy.factFilePath = factFilePath;
+    copy.databaseName = databaseName;
+    copy.partitionId = partitionId;
+    copy.aggTables = aggTables;
+    copy.aggTableName = aggTableName;
+    copy.aggLoadRequest = aggLoadRequest;
+    copy.loadMetadataDetails = loadMetadataDetails;
+    copy.isRetentionRequest = isRetentionRequest;
+    copy.csvHeader = csvHeader;
+    copy.csvHeaderColumns = csvHeaderColumns;
+    copy.isDirectLoad = isDirectLoad;
+    copy.csvDelimiter = csvDelimiter;
+    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
+    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copy.carbonDataLoadSchema = carbonDataLoadSchema;
+    copy.blocksID = blocksID;
+    copy.taskNo = taskNo;
+    copy.factTimeStamp = factTimeStamp;
+    copy.segmentId = segmentId;
+    copy.serializationNullFormat = serializationNullFormat;
+    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
+    copy.badRecordsAction = badRecordsAction;
+    copy.escapeChar = escapeChar;
+    copy.quoteChar = quoteChar;
+    copy.commentChar = commentChar;
+    copy.dateFormat = dateFormat;
+    copy.defaultTimestampFormat = defaultTimestampFormat;
+    copy.maxColumns = maxColumns;
+    copy.storePath = storePath;
+    copy.useOnePass = useOnePass;
+    copy.dictionaryServerHost = dictionaryServerHost;
+    copy.dictionaryServerPort = dictionaryServerPort;
+    copy.preFetch = preFetch;
+    copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
+    copy.sortScope = sortScope;
+    copy.batchSortSizeInMb = batchSortSizeInMb;
+    copy.globalSortPartitions = globalSortPartitions;
+    return copy;
+  }
+
+  /**
    * get CarbonLoadModel with partition
    *
    * @param uniqueId
@@ -702,4 +756,12 @@ public class CarbonLoadModel implements Serializable {
   public void setBatchSortSizeInMb(String batchSortSizeInMb) {
     this.batchSortSizeInMb = batchSortSizeInMb;
   }
+
+  public String getGlobalSortPartitions() {
+    return globalSortPartitions;
+  }
+
+  public void setGlobalSortPartitions(String globalSortPartitions) {
+    this.globalSortPartitions = globalSortPartitions;
+  }
 }
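The field-by-field copy matters because the load model is broadcast to executors and every task in an executor shares the one deserialized instance; writing taskNo into that shared object would race across tasks. A sketch of taking a per-partition copy inside a Spark job (the broadcast variable and RDD are hypothetical):

    // Sketch: each task mutates only its own copy of the broadcast load model.
    val modelBroadcast = sparkContext.broadcast(carbonLoadModel)
    rdd.mapPartitionsWithIndex { (taskId, rows) =>
      val model = modelBroadcast.value.getCopyWithTaskNo(taskId.toString)
      // ... run the convert/sort/write steps against `model` ...
      rows
    }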

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index d6a90b1..3294d5f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -132,12 +132,12 @@ public final class DataLoadProcessBuilder {
     return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
   }
 
-  private CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
-      String storeLocation) throws Exception {
+  public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
+      String storeLocation) {
     if (!new File(storeLocation).mkdirs()) {
       LOGGER.error("Error while creating the temp store path: " + storeLocation);
     }
-    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+
     String databaseName = loadModel.getDatabaseName();
     String tableName = loadModel.getTableName();
     String tempLocationKey = CarbonDataProcessorUtil
@@ -146,6 +146,11 @@ public final class DataLoadProcessBuilder {
     CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
 
+    return createConfiguration(loadModel);
+  }
+
+  public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel) {
+    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
     configuration.setTableIdentifier(identifier);
@@ -173,6 +178,8 @@ public final class DataLoadProcessBuilder {
         .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope());
     configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
         loadModel.getBatchSortSizeInMb());
+    configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        loadModel.getGlobalSortPartitions());
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getFactTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
index f2534db..1cc043f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
@@ -33,6 +33,8 @@ public class SortScopeOptions {
         return SortScope.BATCH_SORT;
       case "LOCAL_SORT":
         return SortScope.LOCAL_SORT;
+      case "GLOBAL_SORT":
+        return SortScope.GLOBAL_SORT;
       case "NO_SORT":
         return SortScope.NO_SORT;
       default:
@@ -49,6 +51,8 @@ public class SortScopeOptions {
         return true;
       case "LOCAL_SORT":
         return true;
+      case "GLOBAL_SORT":
+        return true;
       case "NO_SORT":
         return true;
       default:
@@ -57,7 +61,7 @@ public class SortScopeOptions {
   }
 
   public enum SortScope {
-    NO_SORT, BATCH_SORT, LOCAL_SORT;
+    NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT;
   }
 }
 
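Both lookups now recognize the new scope; the surrounding switch statements (not fully shown in this hunk) presumably normalize case before matching. A quick sketch of the expected behavior, assuming the two methods are static, as their call sites elsewhere in this commit suggest:

    import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
    assert(SortScopeOptions.getSortScope("GLOBAL_SORT") ==
      SortScopeOptions.SortScope.GLOBAL_SORT)
    assert(SortScopeOptions.isValidSortOption("GLOBAL_SORT"))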

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
new file mode 100644
index 0000000..474df4d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort;
+
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.NonDictionaryUtil;
+
+public class SortStepRowUtil {
+  public static Object[] convertRow(Object[] data, SortParameters parameters) {
+    int measureCount = parameters.getMeasureColCount();
+    int dimensionCount = parameters.getDimColCount();
+    int complexDimensionCount = parameters.getComplexDimColCount();
+    int noDictionaryCount = parameters.getNoDictionaryCount();
+    boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+
+    // create a new output row of size 3 (dictionary dims, no-dictionary/high-cardinality dims, measures)
+
+    Object[] holder = new Object[3];
+    int index = 0;
+    int nonDicIndex = 0;
+    int allCount = 0;
+    int[] dim = new int[dimensionCount];
+    byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][];
+    Object[] measures = new Object[measureCount];
+    try {
+      // read dimension values
+      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+        if (isNoDictionaryDimensionColumn[i]) {
+          nonDicArray[nonDicIndex++] = (byte[]) data[i];
+        } else {
+          dim[index++] = (int) data[allCount];
+        }
+        allCount++;
+      }
+
+      for (int i = 0; i < complexDimensionCount; i++) {
+        nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
+        allCount++;
+      }
+
+      index = 0;
+      // read measure values
+      for (int i = 0; i < measureCount; i++) {
+        measures[index++] = data[allCount];
+        allCount++;
+      }
+
+      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+
+    } catch (Exception e) {
+      throw new RuntimeException("Problem while converting row", e);
+    }
+
+    // return the assembled three-slot row
+    return holder;
+  }
+}
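This utility is extracted so the Spark global-sort steps can reuse the row conversion that previously lived inside the unsafe merger (see the next hunk). Judging by prepareOutObj's argument order, the three slots hold dictionary dims, no-dictionary/complex dims, and measures; a sketch of unpacking one converted row, assuming a populated SortParameters named parameters and a sorted row named sortedRow:

    // Sketch: the assumed three-slot layout produced by convertRow.
    val holder = SortStepRowUtil.convertRow(sortedRow, parameters)
    val dictDims = holder(0).asInstanceOf[Array[Int]]         // dictionary-encoded dims
    val rawDims  = holder(1).asInstanceOf[Array[Array[Byte]]] // no-dictionary + complex dims
    val measures = holder(2).asInstanceOf[Array[AnyRef]]      // measure values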

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 7522bbd..7b8ec79 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -27,13 +27,13 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.newflow.sort.SortStepRowUtil;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeFinalMergePageHolder;
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder;
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
   /**
@@ -202,7 +202,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
    * @return sorted row
    */
   public Object[] next() {
-    return convertRow(getSortedRecordFromFile());
+    return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters);
   }
 
   /**
@@ -259,50 +259,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
     return this.fileCounter > 0;
   }
 
-  private Object[] convertRow(Object[] data) {
-    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-
-    Object[] holder = new Object[3];
-    int index = 0;
-    int nonDicIndex = 0;
-    int allCount = 0;
-    int[] dim = new int[this.dimensionCount];
-    byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][];
-    Object[] measures = new Object[this.measureCount];
-    try {
-      // read dimension values
-      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
-        if (isNoDictionaryDimensionColumn[i]) {
-          nonDicArray[nonDicIndex++] = (byte[]) data[i];
-        } else {
-          dim[index++] = (int) data[allCount];
-        }
-        allCount++;
-      }
-
-      for (int i = 0; i < complexDimensionCount; i++) {
-        nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
-        allCount++;
-      }
-
-      index = 0;
-      // read measure values
-      for (int i = 0; i < this.measureCount; i++) {
-        measures[index++] = data[allCount];
-        allCount++;
-      }
-
-      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
-
-      // increment number if record read
-    } catch (Exception e) {
-      throw new RuntimeException("Problem while converting row ", e);
-    }
-
-    //return out row
-    return holder;
-  }
-
   public void clear() {
     if (null != recordHolderHeapLocal) {
       for (SortTempChunkHolder pageHolder : recordHolderHeapLocal) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 1ea0797..000d0b9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -62,7 +62,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   public void initialize() throws IOException {
     child.initialize();
     converters = new ArrayList<>();
-    badRecordLogger = createBadRecordLogger();
+    badRecordLogger = createBadRecordLogger(configuration);
     RowConverter converter =
         new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
     configuration.setCardinalityFinder(converter);
@@ -115,7 +115,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     throw new UnsupportedOperationException();
   }
 
-  private BadRecordsLogger createBadRecordLogger() {
+  public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration) {
     boolean badRecordsLogRedirect = false;
     boolean badRecordConvertNullDisable = false;
     boolean isDataLoadFail = false;
@@ -159,7 +159,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
         badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
   }
 
-  private String getBadLogStoreLocation(String storeLocation) {
+  public static String getBadLogStoreLocation(String storeLocation) {
     String badLogStoreLocation =
         CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
@@ -172,13 +172,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     if (!closed) {
       if (null != badRecordLogger) {
         badRecordLogger.closeStreams();
-        // rename the bad record in progress to normal
-        CarbonTableIdentifier identifier =
-            configuration.getTableIdentifier().getCarbonTableIdentifier();
-        CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(
-            identifier.getDatabaseName() + File.separator + identifier.getTableName()
-                + File.separator + configuration.getSegmentId() + File.separator + configuration
-                .getTaskNo());
+        renameBadRecord(configuration);
       }
       super.close();
       if (converters != null) {
@@ -189,6 +183,27 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     }
   }
 
+  public static void close(BadRecordsLogger badRecordLogger, CarbonDataLoadConfiguration
+      configuration, RowConverter converter) {
+    if (badRecordLogger != null) {
+      badRecordLogger.closeStreams();
+      renameBadRecord(configuration);
+    }
+    if (converter != null) {
+      converter.finish();
+    }
+  }
+
+  private static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
+    // rename the bad record in progress to normal
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(
+        identifier.getDatabaseName() + File.separator + identifier.getTableName()
+            + File.separator + configuration.getSegmentId() + File.separator + configuration
+            .getTaskNo());
+  }
+
   @Override protected String getStepName() {
     return "Data Converter";
   }

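createBadRecordLogger, getBadLogStoreLocation, and the new static close exist so the Spark-side steps added in this commit can run the converter outside the step pipeline. A sketch of that usage from a Spark task, mirroring initialize() above (fields, configuration, rows, and the emit callback are assumed to be in scope, and the converter.initialize() call is an assumption about the RowConverter lifecycle):

    // Sketch: driving the row converter statically from a Spark task.
    val badRecordLogger =
      DataConverterProcessorStepImpl.createBadRecordLogger(configuration)
    val converter = new RowConverterImpl(fields, configuration, badRecordLogger)
    converter.initialize()
    try {
      while (rows.hasNext) {
        emit(converter.convert(rows.next()))
      }
    } finally {
      // closes logger streams, renames bad-record files, finishes the converter
      DataConverterProcessorStepImpl.close(badRecordLogger, configuration, converter)
    }
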
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index 9c97eaa..087b0c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
@@ -52,6 +53,10 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     super(configuration, child);
   }
 
+  public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration) {
+    super(configuration, null);
+  }
+
   @Override public DataField[] getOutput() {
     return child.getOutput();
   }
@@ -69,6 +74,15 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     return storeLocation;
   }
 
+  public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) {
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
+    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+        .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, 0);
+    return model;
+  }
+
   @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
     Iterator<CarbonRowBatch>[] iterators = child.execute();
     CarbonTableIdentifier tableIdentifier =
@@ -95,7 +109,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
           processBatch(iterator.next(), dataHandler);
         }
         if (!rowsNotExist) {
-          finish(tableName, dataHandler);
+          finish(dataHandler);
         }
         i++;
       }
@@ -115,7 +129,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     return "Data Writer";
   }
 
-  private void finish(String tableName, CarbonFactHandler dataHandler) {
+  public void finish(CarbonFactHandler dataHandler) {
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String tableName = tableIdentifier.getTableName();
+
     try {
       dataHandler.finish();
     } catch (Exception e) {
@@ -163,6 +181,16 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     rowCounter.getAndAdd(batch.getSize());
   }
 
+  public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException {
+    try {
+      readCounter++;
+      dataHandler.addDataToStore(row);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("unable to generate the mdkey", e);
+    }
+    rowCounter.getAndAdd(1);
+  }
+
   @Override protected CarbonRow processRow(CarbonRow row) {
     return null;
   }

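Taken together, the new single-argument constructor, getDataHandlerModel, processRow, and finish let the Spark global-sort job drive the writer directly, with one fact handler per partition. A sketch of that write loop (the fact-handler factory call is an assumption; the rest follows the methods in the hunk above):

    // Sketch: writing one globally sorted partition with the new hooks.
    val writerStep = new DataWriterProcessorStepImpl(configuration)
    val model = writerStep.getDataHandlerModel(partitionId)
    val dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model) // assumed factory
    dataHandler.initialise()
    while (rows.hasNext) {
      writerStep.processRow(rows.next(), dataHandler)
    }
    writerStep.finish(dataHandler)
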
http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
index 105e9d4..31c2b4f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
@@ -17,7 +17,9 @@
 
 package org.apache.carbondata.processing.schema.metadata;
 
-public class SortObserver {
+import java.io.Serializable;
+
+public class SortObserver implements Serializable {
   /**
    * is failed
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index e4b8a46..7a7142b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.processing.sortandgroupby.sortdata;
 
 import java.io.File;
+import java.io.Serializable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -29,7 +30,7 @@ import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.schema.metadata.SortObserver;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
-public class SortParameters {
+public class SortParameters implements Serializable {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SortParameters.class.getName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/82741c1f/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 53d5dcd..91c2bc4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.NodeHolder;
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
@@ -145,6 +146,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   private DefaultEncoder encoder;
 
+  private SortScopeOptions.SortScope sortScope;
+
   /**
    * CarbonFactDataHandler constructor
    */
@@ -207,7 +210,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   private void initParameters(CarbonFactDataHandlerModel model) {
-
+    this.sortScope = model.getSortScope();
     this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
 
     //TODO need to pass carbon table identifier to metadata
@@ -243,6 +246,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       }
     }
 
+    if (sortScope != null && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+      numberOfCores = 1;
+    }
+
     blockletProcessingCount = new AtomicInteger(0);
     producerExecutorService = Executors.newFixedThreadPool(numberOfCores);
     producerExecutorServiceTaskList =
@@ -791,4 +798,4 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       return null;
     }
   }
-}
\ No newline at end of file
+}

