carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [2/2] carbondata git commit: [REBASE] Solve conflict after merging master
Date Fri, 02 Mar 2018 09:48:49 GMT
[REBASE] Solve conflict after merging master


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c305f309
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c305f309
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c305f309

Branch: refs/heads/carbonstore-rebase5
Commit: c305f309e4556bfc98f01e0dff41b8411c992a12
Parents: 8a9dd8b
Author: Jacky Li <jacky.likun@qq.com>
Authored: Tue Feb 27 11:26:30 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Fri Mar 2 17:48:31 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/dev/DataMap.java    |   9 +-
 .../core/datamap/dev/DataMapFactory.java        |   2 +-
 .../exception/ConcurrentOperationException.java |  16 +-
 .../core/indexstore/BlockletDetailsFetcher.java |   3 +-
 .../blockletindex/BlockletDataMap.java          |   3 +-
 .../blockletindex/SegmentIndexFileStore.java    |   2 -
 .../core/metadata/PartitionMapFileStore.java    |   0
 .../scan/executor/util/RestructureUtil.java     |   6 +-
 .../statusmanager/SegmentStatusManager.java     |  10 +-
 .../SegmentUpdateStatusManager.java             |   7 +-
 datamap/examples/pom.xml                        | 145 +++++++----------
 .../datamap/examples/MinMaxDataMap.java         |   3 +-
 .../datamap/examples/MinMaxDataMapFactory.java  |  26 +--
 .../datamap/examples/MinMaxDataWriter.java      |   8 +-
 .../CarbonStreamSparkStreamingExample.scala     |  14 +-
 ...CarbonStructuredStreamingWithRowParser.scala |   8 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   5 +-
 .../preaggregate/TestPreAggCreateCommand.scala  |   2 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |   2 +-
 .../StandardPartitionGlobalSortTestCase.scala   |   2 +-
 .../exception/ProcessMetaDataException.java     |   2 +
 .../org/apache/carbondata/api/CarbonStore.scala |   6 +-
 .../carbondata/spark/load/CsvRDDHelper.scala    | 157 +++++++++++++++++++
 .../load/DataLoadProcessBuilderOnSpark.scala    |   3 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |   2 -
 .../command/carbonTableSchemaCommon.scala       |   6 +-
 .../CarbonAlterTableCompactionCommand.scala     |   3 +-
 .../management/CarbonCleanFilesCommand.scala    |   2 +-
 .../CarbonDeleteLoadByIdCommand.scala           |   2 +-
 .../CarbonDeleteLoadByLoadDateCommand.scala     |   2 +-
 .../management/CarbonLoadDataCommand.scala      |  28 ++--
 .../CarbonProjectForDeleteCommand.scala         |   2 +-
 .../CarbonProjectForUpdateCommand.scala         |   2 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |   2 +-
 .../command/table/CarbonDropTableCommand.scala  |   2 +-
 .../datasources/CarbonFileFormat.scala          |   3 -
 .../TestStreamingTableWithRowParser.scala       |   9 +-
 .../vectorreader/AddColumnTestCases.scala       |   1 +
 .../datamap/DataMapWriterListener.java          |   3 +-
 .../loading/model/CarbonLoadModelBuilder.java   |  34 +++-
 .../processing/loading/model/LoadOption.java    |  15 +-
 .../processing/merger/CarbonDataMergerUtil.java |  19 ++-
 .../merger/CompactionResultSortProcessor.java   |   4 +-
 .../merger/RowResultMergerProcessor.java        |   4 +-
 .../partition/spliter/RowResultProcessor.java   |   4 +-
 .../util/CarbonDataProcessorUtil.java           |   3 +-
 store/sdk/pom.xml                               |   2 +-
 .../carbondata/sdk/file/CSVCarbonWriter.java    |   8 +-
 49 files changed, 384 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 4a68286..fdeacff 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -38,18 +38,13 @@ public interface DataMap<T extends Blocklet> {
   /**
    * Prune the datamap with filter expression and partition information. It returns the list of
    * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
    */
-  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions);
+  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+      List<PartitionSpec> partitions);
 
   // TODO Move this method to Abstract class
   /**
    * Validate whether the current segment needs to be fetching the required data
-   *
-   * @param filterExp
-   * @return
    */
   boolean isScanRequired(FilterResolverIntf filterExp);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 50ac279..d8a467f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,8 +21,8 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.events.Event;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
index 7e717ba..918268c 100644
--- a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
+++ b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
@@ -17,21 +17,10 @@
 
 package org.apache.carbondata.core.exception;
 
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
-/**
- * This exception will be thrown when executing concurrent operations which
- * is not supported in carbon.
- *
- * For example, when INSERT OVERWRITE is executing, other operations are not
- * allowed, so this exception will be thrown
- */
-@InterfaceAudience.User
-@InterfaceStability.Stable
-public class ConcurrentOperationException extends Exception {
+public class ConcurrentOperationException extends MalformedCarbonCommandException {
 
   public ConcurrentOperationException(String dbName, String tableName, String command1,
       String command2) {
@@ -48,3 +37,4 @@ public class ConcurrentOperationException extends Exception {
   }
 
 }
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index dd592c0..58c11db 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -53,5 +53,6 @@ public interface BlockletDetailsFetcher {
    * @param segment
    * @return
    */
-  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) throws IOException;
+  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index ce6193b..b379ae3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -660,7 +660,8 @@ public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cache
   }
 
   @Override
-  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions) {
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+      List<PartitionSpec> partitions) {
     if (unsafeMemoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 537e124..00d03a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 2712cbc..e67d822 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -80,7 +80,7 @@ public class RestructureUtil {
       if (queryDimension.getDimension().hasEncoding(Encoding.IMPLICIT)) {
         presentDimension.add(queryDimension);
         isDimensionExists[dimIndex] = true;
-        dimensionInfo.dataType[queryDimension.getQueryOrder()] =
+        dimensionInfo.dataType[queryDimension.getOrdinal()] =
             queryDimension.getDimension().getDataType();
       } else {
         for (CarbonDimension tableDimension : tableBlockDimensions) {
@@ -95,7 +95,7 @@ public class RestructureUtil {
             currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
             presentDimension.add(currentBlockDimension);
             isDimensionExists[dimIndex] = true;
-            dimensionInfo.dataType[currentBlockDimension.getQueryOrder()] =
+            dimensionInfo.dataType[currentBlockDimension.getOrdinal()] =
                 currentBlockDimension.getDimension().getDataType();
             break;
           }
@@ -113,7 +113,7 @@ public class RestructureUtil {
             currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
             presentDimension.add(currentBlockDimension);
             isDimensionExists[dimIndex] = true;
-            dimensionInfo.dataType[currentBlockDimension.getQueryOrder()] =
+            dimensionInfo.dataType[currentBlockDimension.getOrdinal()] =
                 currentBlockDimension.getDimension().getDataType();
             break;
           }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 89666ab..1bb4b03 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
@@ -838,6 +839,13 @@ public class SegmentStatusManager {
   public static void deleteLoadsAndUpdateMetadata(
       CarbonTable carbonTable,
       boolean isForceDeletion) throws IOException {
+    deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null);
+  }
+
+  public static void deleteLoadsAndUpdateMetadata(
+      CarbonTable carbonTable,
+      boolean isForceDeletion,
+      List<PartitionSpec> partitionSpecs) throws IOException {
     if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
       LoadMetadataDetails[] details =
           SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
@@ -882,7 +890,7 @@ public class SegmentStatusManager {
           CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
           if (updationCompletionStatus) {
             DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
-                identifier, carbonTable.getMetadataPath(), isForceDeletion);
+                identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 6ec6fa2..4a2149e 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -77,8 +77,8 @@ public class SegmentUpdateStatusManager {
     this.identifier = identifier;
     // current it is used only for read function scenarios, as file update always requires to work
     // on latest file status.
-    segmentDetails =
-        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(identifier.getTablePath()));
+    segmentDetails = SegmentStatusManager.readLoadMetadata(
+        CarbonTablePath.getMetadataPath(identifier.getTablePath()));
     if (segmentDetails.length > 0) {
       isPartitionTable = segmentDetails[0].getSegmentFile() != null;
     }
@@ -259,7 +259,8 @@ public class SegmentUpdateStatusManager {
             + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
             .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
       } else {
-        String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment);
+        String carbonDataDirectoryPath =
+            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment);
         blockPath =
             carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/datamap/examples/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml
index 6832e62..30e1522 100644
--- a/datamap/examples/pom.xml
+++ b/datamap/examples/pom.xml
@@ -15,97 +15,70 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
-    <modelVersion>4.0.0</modelVersion>
+  <modelVersion>4.0.0</modelVersion>
 
-    <parent>
-        <groupId>org.apache.carbondata</groupId>
-        <artifactId>carbondata-parent</artifactId>
-        <version>1.3.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
 
-    <artifactId>carbondata-datamap-examples</artifactId>
-    <name>Apache CarbonData :: Datamap Examples</name>
+  <artifactId>carbondata-datamap-examples</artifactId>
+  <name>Apache CarbonData :: DataMap Examples</name>
 
-    <properties>
-        <dev.path>${basedir}/../../dev</dev.path>
-    </properties>
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+  </properties>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.carbondata</groupId>
-            <artifactId>carbondata-spark2</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-hive-thriftserver_2.10</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-repl_2.10</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-sql_2.10</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_${scala.binary.version}</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-repl_${scala.binary.version}</artifactId>
-        </dependency>
-    </dependencies>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark2</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
 
-    <build>
-        <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory>
-        <resources>
-            <resource>
-                <directory>.</directory>
-                <includes>
-                    <include>CARBON_EXAMPLESLogResource.properties</include>
-                </includes>
-            </resource>
-        </resources>
-        <plugins>
-            <plugin>
-                <groupId>org.scala-tools</groupId>
-                <artifactId>maven-scala-plugin</artifactId>
-                <version>2.15.2</version>
-                <executions>
-                    <execution>
-                        <id>compile</id>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                        <phase>compile</phase>
-                    </execution>
-                    <execution>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
+  <build>
+    <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory>
+    <resources>
+      <resource>
+        <directory>.</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
index 8002e57..75dac9e 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
@@ -114,7 +115,7 @@ public class MinMaxDataMap extends AbstractCoarseGrainDataMap {
    */
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp,
-      SegmentProperties segmentProperties, List<String> partitions) {
+      SegmentProperties segmentProperties, List<PartitionSpec> partitions) {
     List<Blocklet> blocklets = new ArrayList<>();
 
     if (filterExp == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index 5203cb3..d81add8 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.datamap.examples;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -25,6 +24,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataM
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
 /**
@@ -48,28 +49,29 @@ public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   /**
    * createWriter will return the MinMaxDataWriter.
    *
-   * @param segmentId
+   * @param segment
    * @return
    */
-  @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) {
-    return new MinMaxDataWriter(identifier, segmentId, dataWritePath);
+  @Override public AbstractDataMapWriter createWriter(Segment segment, String dataWritePath) {
+    return new MinMaxDataWriter(identifier, segment, dataWritePath);
   }
 
   /**
    * getDataMaps Factory method Initializes the Min Max Data Map and returns.
    *
-   * @param segmentId
+   * @param segment
    * @return
    * @throws IOException
    */
-  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId)
+  @Override public List<AbstractCoarseGrainDataMap> getDataMaps(Segment segment)
       throws IOException {
     List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>();
     // Form a dataMap of Type MinMaxDataMap.
     MinMaxDataMap dataMap = new MinMaxDataMap();
     try {
-      dataMap.init(new DataMapModel(
-          identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator));
+      dataMap.init(
+          new DataMapModel(
+              CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())));
     } catch (MemoryException ex) {
 
     }
@@ -78,19 +80,19 @@ public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   }
 
   /**
-   * @param segmentId
+   * @param segment
    * @return
    */
-  @Override public List<DataMapDistributable> toDistributable(String segmentId) {
+  @Override public List<DataMapDistributable> toDistributable(Segment segment) {
     return null;
   }
 
   /**
    * Clear the DataMap.
    *
-   * @param segmentId
+   * @param segment
    */
-  @Override public void clear(String segmentId) {
+  @Override public void clear(Segment segment) {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index fe0bbcf..17c8332 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -35,7 +36,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
 
@@ -52,11 +52,11 @@ public class MinMaxDataWriter extends AbstractDataMapWriter {
 
   private String dataWritePath;
 
-  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String segmentId,
+  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, Segment segment,
       String dataWritePath) {
-    super(identifier, segmentId, dataWritePath);
+    super(identifier, segment, dataWritePath);
     this.identifier = identifier;
-    this.segmentId = segmentId;
+    this.segmentId = segment.getSegmentNo();
     this.dataWritePath = dataWritePath;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
index f59a610..d3784e7 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -20,18 +20,14 @@ package org.apache.carbondata.examples
 import java.io.{File, PrintWriter}
 import java.net.ServerSocket
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.CarbonSparkStreamingFactory
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.CarbonSparkStreamingListener
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
@@ -77,7 +73,7 @@ object CarbonStreamSparkStreamingExample {
            | 'dictionary_include'='city')
            | """.stripMargin)
       val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
-      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      val tablePath = carbonTable.getTablePath
       // batch load
       val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
       spark.sql(
@@ -91,7 +87,7 @@ object CarbonStreamSparkStreamingExample {
       val serverSocket = new ServerSocket(7071)
       val thread1 = writeSocket(serverSocket)
       val thread2 = showTableCount(spark, streamTableName)
-      val ssc = startStreaming(spark, streamTableName, tablePath, checkpointPath)
+      val ssc = startStreaming(spark, streamTableName, checkpointPath)
       // add a Spark Streaming Listener to remove all lock for stream tables when stop app
       ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
       // wait for stop signal to stop Spark Streaming App
@@ -155,8 +151,8 @@ object CarbonStreamSparkStreamingExample {
     thread
   }
 
-  def startStreaming(spark: SparkSession, tableName: String,
-      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+  def startStreaming(
+      spark: SparkSession, tableName: String, checkpointPath: String): StreamingContext = {
     var ssc: StreamingContext = null
     try {
       // recommend: the batch interval must set larger, such as 30s, 1min.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
index cce833b..61c45d3 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
@@ -23,7 +23,7 @@ import java.net.ServerSocket
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Int)
@@ -77,7 +77,7 @@ object CarbonStructuredStreamingWithRowParser {
       }
 
       val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
-      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      val tablePath = carbonTable.getTablePath
       // batch load
       val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
       spark.sql(
@@ -140,7 +140,7 @@ object CarbonStructuredStreamingWithRowParser {
     thread
   }
 
-  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
+  def startStreaming(spark: SparkSession, tablePath: String): Thread = {
     val thread = new Thread() {
       override def run(): Unit = {
         var qry: StreamingQuery = null
@@ -167,7 +167,7 @@ object CarbonStructuredStreamingWithRowParser {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
             .option("dbName", "default")
             .option("tableName", "stream_table_with_row_parser")
             .option(CarbonStreamParser.CARBON_STREAM_PARSER,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index b485b69..5184e07 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,8 +33,8 @@ import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -494,7 +494,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
       for (Segment segment : streamSegments) {
-        String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+        String segmentDir = CarbonTablePath.getSegmentPath(
+            identifier.getTablePath(), segment.getSegmentNo());
         FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
         if (FileFactory.isFileExist(segmentDir, fileType)) {
           String indexName = CarbonTablePath.getCarbonStreamIndexFileName();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 8f63af6..e09b922 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index b39c44c..3f0ca42 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -34,11 +34,11 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData
 import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
 
 // This testsuite test insert and insert overwrite with other commands concurrently

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 7d0959c..629417f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll {
   var executorService: ExecutorService = _

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
index 3e06bde..471b645 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.exception;
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+
 // This exception will be thrown when processMetaData failed in
 // Carbon's RunnableCommand
 public class ProcessMetaDataException extends MalformedCarbonCommandException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index b69ec37..bfb1616 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 object CarbonStore {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -139,9 +138,8 @@ object CarbonStore {
         carbonCleanFilesLock =
           CarbonLockUtil
             .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-          isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
         currentTablePartitions match {
           case Some(partitions) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
new file mode 100644
index 0000000..36d8c51
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
+import org.apache.carbondata.spark.util.CommonUtil
+
+object CsvRDDHelper {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * createsw a RDD that does reading of multiple CSV files
+   */
+  def csvFileScanRDD(
+      spark: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration
+  ): RDD[InternalRow] = {
+    // 1. partition
+    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+    val defaultParallelism = spark.sparkContext.defaultParallelism
+    CommonUtil.configureCSVInputFormat(hadoopConf, model)
+    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val jobContext = new JobContextImpl(jobConf, null)
+    val inputFormat = new CSVInputFormat()
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val splitFiles = rawSplits.map { split =>
+      val fileSplit = split.asInstanceOf[FileSplit]
+      PartitionedFile(
+        InternalRow.empty,
+        fileSplit.getPath.toString,
+        fileSplit.getStart,
+        fileSplit.getLength,
+        fileSplit.getLocations)
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+                s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq)
+        partitions += newPartition
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
+    }
+    closePartition()
+
+    // 2. read function
+    val serializableConfiguration = new SerializableConfiguration(jobConf)
+    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+        new Iterator[InternalRow] {
+          val hadoopConf = serializableConfiguration.value
+          val jobTrackerId: String = {
+            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+            formatter.format(new Date())
+          }
+          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+          val inputSplit =
+            new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+          var finished = false
+          val inputFormat = new CSVInputFormat()
+          val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+          reader.initialize(inputSplit, hadoopAttemptContext)
+
+          override def hasNext: Boolean = {
+            if (!finished) {
+              if (reader != null) {
+                if (reader.nextKeyValue()) {
+                  true
+                } else {
+                  finished = true
+                  reader.close()
+                  false
+                }
+              } else {
+                finished = true
+                false
+              }
+            } else {
+              false
+            }
+          }
+
+          override def next(): InternalRow = {
+            new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+          }
+        }
+      }
+    }
+    new FileScanRDD(spark, readFunction, partitions)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index e1bd84b..1062cd7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -34,7 +34,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Use sortBy operator in spark to load the data
@@ -52,7 +51,7 @@ object DataLoadProcessBuilderOnSpark {
     } else {
       // input data from files
       val columnCount = model.getCsvHeaderColumns.length
-      DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf)
+      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 33263d6..298c84e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel}
 import org.apache.spark.sql.types._
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogService
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -45,7 +46,6 @@ import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object CarbonScalaUtil {
   def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 9104a32..d3093fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -816,8 +816,6 @@ object CommonUtil {
                   val carbonTable = CarbonMetadata.getInstance
                     .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
                   SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
-                  DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-                    isForceDeletion = true, carbonTable, null)
                 } catch {
                   case _: Exception =>
                     LOGGER.warn(s"Error while cleaning table " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 71ce2c6..3c21af3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -31,19 +31,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
-import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e47c500..2f4aa30 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
@@ -47,7 +47,6 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEv
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
 import org.apache.carbondata.streaming.segment.StreamSegment
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index d2adc57..2092028 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 0861c63..81427a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 case class CarbonDeleteLoadByIdCommand(
     loadIds: Seq[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index dcbc6ce..1d76bda 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 case class CarbonDeleteLoadByLoadDateCommand(
     databaseNameOp: Option[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 4806a6f..34a464a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -59,10 +59,9 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.SegmentStatus
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
@@ -71,18 +70,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
-import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
+import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -193,12 +189,18 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
       carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
+
+      val javaPartition = mutable.Map[String, String]()
+      partition.foreach { case (k, v) =>
+        if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+      }
+
       new CarbonLoadModelBuilder(table).build(
         options.asJava,
         optionsFinal,
         carbonLoadModel,
         hadoopConf,
-        partition,
+        javaPartition.asJava,
         dataFrame.isDefined)
       // Delete stale segment folders that are not in table status but are physically present in
       // the Fact folder
@@ -231,11 +233,7 @@ case class CarbonLoadDataCommand(
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         // Clean up the old invalid segment data before creating a new entry for new load.
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-          isForceDeletion = false,
-          table,
-          currPartitions)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty && !table.isHivePartitionTable) {
           CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -672,7 +670,7 @@ case class CarbonLoadDataCommand(
           }
         }
         val columnCount = carbonLoadModel.getCsvHeaderColumns.length
-        val rdd = DataLoadingUtil.csvFileScanRDD(
+        val rdd = CsvRDDHelper.csvFileScanRDD(
           sparkSession,
           model = carbonLoadModel,
           hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index f074285..230378b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 5165342..2a92478 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.storage.StorageLevel
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 5f8eb12..474f9c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -23,12 +23,12 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.util.AlterTableUtil
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 8001a93..0298eea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -28,12 +28,12 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException}
 
 case class CarbonDropTableCommand(
     ifExistsSet: Boolean,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 61a31a5..2eed988 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -50,11 +50,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
 
 class CarbonFileFormat
   extends FileFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 3e3b2c5..064ff28 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Integer)
@@ -735,7 +735,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
   def createSocketStreamingThread(
       spark: SparkSession,
       port: Int,
-      tablePath: CarbonTablePath,
+      tablePath: String,
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
@@ -776,7 +776,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
             .option("bad_records_action", badRecordAction)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
@@ -817,7 +817,6 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier(tableName, Option("streaming1"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket()
@@ -830,7 +829,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
       val thread2 = createSocketStreamingThread(
         spark = spark,
         port = server.getLocalPort,
-        tablePath = tablePath,
+        tablePath = carbonTable.getTablePath,
         tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index 995f041..d94570a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 1104229..66f8bc5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,8 @@ public class DataMapWriterListener {
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
     List<AbstractDataMapWriter> writers = registry.get(columns);
-    AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
+    AbstractDataMapWriter writer = factory.createWriter(
+        new Segment(segmentId, null), dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 17e8dbe..29dfa40 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
 
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -100,6 +102,26 @@ public class CarbonLoadModelBuilder {
       Map<String, String> optionsFinal,
       CarbonLoadModel carbonLoadModel,
       Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+    build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false);
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @param optionsFinal Load options that populated with default values for optional options
+   * @param carbonLoadModel The output load model
+   * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
+   *                   user provided load options
+   * @param partitions partition name map to path
+   * @param isDataFrame true if build for load for dataframe
+   */
+  public void build(
+      Map<String, String> options,
+      Map<String, String> optionsFinal,
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf,
+      Map<String, String> partitions,
+      boolean isDataFrame) throws InvalidLoadOptionException, IOException {
     carbonLoadModel.setTableName(table.getTableName());
     carbonLoadModel.setDatabaseName(table.getDatabaseName());
     carbonLoadModel.setTablePath(table.getTablePath());
@@ -214,8 +236,18 @@ public class CarbonLoadModelBuilder {
     carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
     carbonLoadModel.setCsvHeader(fileHeader);
     carbonLoadModel.setColDictFilePath(column_dict);
+
+    List<String> ignoreColumns = new ArrayList<>();
+    if (!isDataFrame) {
+      for (Map.Entry<String, String> partition : partitions.entrySet()) {
+        if (partition.getValue() != null) {
+          ignoreColumns.add(partition.getKey());
+        }
+      }
+    }
+
     carbonLoadModel.setCsvHeaderColumns(
-        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns));
 
     int validatedMaxColumns = validateMaxColumns(
         carbonLoadModel.getCsvHeaderColumns(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c305f309/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 5af4859..bac1a94 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.Maps;
@@ -201,6 +203,16 @@ public class LoadOption {
   public static String[] getCsvHeaderColumns(
       CarbonLoadModel carbonLoadModel,
       Configuration hadoopConf) throws IOException {
+    return getCsvHeaderColumns(carbonLoadModel, hadoopConf, new LinkedList<String>());
+  }
+
+  /**
+   * Return CSV header field names, with partition column
+   */
+  public static String[] getCsvHeaderColumns(
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf,
+      List<String> staticPartitionCols) throws IOException {
     String delimiter;
     if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
       delimiter = CarbonCommonConstants.COMMA;
@@ -231,7 +243,7 @@ public class LoadOption {
     }
 
     if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
-        carbonLoadModel.getCarbonDataLoadSchema())) {
+        carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
       if (csvFile == null) {
         LOG.error("CSV header in DDL is not proper."
             + " Column names in schema and CSV header are not the same.");
@@ -249,4 +261,5 @@ public class LoadOption {
     }
     return csvColumns;
   }
+
 }


Mime
View raw message