carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1322] Insert overwrite support and force clean up files and clean up in progress files support added
Date Thu, 20 Jul 2017 12:42:09 GMT
Repository: carbondata
Updated Branches:
  refs/heads/metadata c07996ee8 -> 67b59634f


[CARBONDATA-1322] Insert overwrite support and force clean up files and clean up in progress files support added

1. Added support for LOAD OVERWRITE and INSERT OVERWRITE in carbon load
2. Added support for force clean table to remove the table with force from disk
3. Cleanup the inprogress files while driver is initializing

This closes #1189


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/67b59634
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/67b59634
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/67b59634

Branch: refs/heads/metadata
Commit: 67b59634fd6cd1acd6fc40dafb75008f7e879be2
Parents: c07996e
Author: Ravindra Pesala <ravi.pesala@gmail.com>
Authored: Thu Jul 20 14:57:21 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Thu Jul 20 20:41:14 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  8 ++
 .../core/datastore/impl/FileFactory.java        | 14 ++++
 .../InsertIntoCarbonTableTestCase.scala         | 44 ++++++++++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 18 +++-
 .../org/apache/carbondata/api/CarbonStore.scala |  4 +-
 .../spark/rdd/DataManagementFunc.scala          | 19 +++--
 .../carbondata/spark/util/CommonUtil.scala      | 86 +++++++++++++++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  3 +-
 .../execution/command/carbonTableSchema.scala   | 21 +++--
 .../spark/sql/hive/CarbonStrategies.scala       |  5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  3 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |  2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  2 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  4 +-
 .../sql/execution/command/DDLStrategy.scala     |  4 +-
 .../execution/command/carbonTableSchema.scala   | 49 +++++++----
 .../org/apache/spark/util/CleanFiles.scala      | 19 ++++-
 .../org/apache/spark/util/TableLoader.scala     |  2 +-
 .../bucketing/TableBucketingTestCase.scala      | 12 +--
 .../vectorreader/VectorReaderTestCase.scala     |  2 +-
 .../apache/spark/util/CarbonCommandSuite.scala  | 12 +++
 21 files changed, 276 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f6e5c62..b0f9d32 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1264,6 +1264,14 @@ public final class CarbonCommonConstants {
 
   public static final String ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT = "false";
 
+  /**
+   * There is more often that in production uses different drivers for load and queries. So in case
+   * of load driver user should set this property to enable loader specific clean up.
+   */
+  public static final String DATA_MANAGEMENT_DRIVER = "spark.carbon.datamanagement.driver";
+
+  public static final String DATA_MANAGEMENT_DRIVER_DEFAULT = "false";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 7acd6b1..9c465e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -368,6 +368,20 @@ public final class FileFactory {
     return path.delete();
   }
 
+  public static boolean deleteAllCarbonFilesOfDir(CarbonFile path) {
+    if (!path.exists()) {
+      return true;
+    }
+    if (!path.isDirectory()) {
+      return path.delete();
+    }
+    CarbonFile[] files = path.listFiles();
+    for (int i = 0; i < files.length; i++) {
+      deleteAllCarbonFilesOfDir(files[i]);
+    }
+    return path.delete();
+  }
+
   public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 7bd9094..0032ec0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -234,6 +234,48 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig)
   }
 
+  test("insert overwrite") {
+    val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+
+    sql("drop table if exists CarbonOverwrite")
+    sql("create table CarbonOverwrite (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedV
 ersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'")
+
+    sql("drop table if exists HiveOverwrite")
+    sql("create table HiveOverwrite (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVer
 sions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+
+    sql("insert into CarbonOverwrite select * from THive")
+    sql("insert into CarbonOverwrite select * from THive")
+    sql("insert into HiveOverwrite select * from THive")
+    sql("insert into HiveOverwrite select * from THive")
+    checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
+    sql("insert overwrite table CarbonOverwrite select * from THive")
+    sql("insert overwrite table HiveOverwrite select * from THive")
+    checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig)
+  }
+
+  test("Load overwrite") {
+    val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    sql("drop table if exists TCarbonSourceOverwrite")
+    sql("create table TCarbonSourceOverwrite (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADParti
 tionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'")
+
+    sql("drop table if exists HiveOverwrite")
+    sql("create table HiveOverwrite (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVer
 sions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+
+    sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Late
 st_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
+    sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Late
 st_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
+    sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE HiveOverwrite")
+    sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE HiveOverwrite")
+    checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
+    sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
+    sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' overwrite INTO TABLE HiveOverwrite")
+    checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig)
+  }
 
 
   override def afterAll {
@@ -247,5 +289,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists insertTable")
     sql("drop table if exists CarbonDest")
     sql("drop table if exists HiveDest")
+    sql("drop table if exists CarbonOverwrite")
+    sql("drop table if exists HiveOverwrite")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index e2b0d11..a9c50f7 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -53,6 +53,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.datastore.row.LoadStatusType;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
@@ -254,7 +255,8 @@ public final class CarbonLoaderUtil {
    * @throws IOException
    */
   public static boolean recordLoadMetadata(LoadMetadataDetails newMetaEntry,
-      CarbonLoadModel loadModel, boolean loadStartEntry) throws IOException {
+      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
+      throws IOException {
     boolean status = false;
     String metaDataFilepath =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
@@ -283,6 +285,11 @@ public final class CarbonLoaderUtil {
           newMetaEntry.setLoadName(segmentId);
           loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
           loadModel.setSegmentId(segmentId);
+          for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+            if (entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
+              throw new RuntimeException("Already insert overwrite is in progress");
+            }
+          }
           listOfLoadFolderDetails.add(newMetaEntry);
         } else {
           newMetaEntry.setLoadName(String.valueOf(loadModel.getSegmentId()));
@@ -296,6 +303,15 @@ public final class CarbonLoaderUtil {
             }
             indexToOverwriteNewMetaEntry++;
           }
+          if (listOfLoadFolderDetails.get(indexToOverwriteNewMetaEntry).getLoadStatus()
+              .equals(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+            throw new RuntimeException("It seems insert overwrite has been issued during load");
+          }
+          if (insertOverwrite) {
+            for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+              entry.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+            }
+          }
           listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
         }
         SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index dc37360..4a66d0f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -79,10 +79,10 @@ object CarbonStore {
       dbName: String,
       tableName: String,
       storePath: String,
-      carbonTable: CarbonTable): Unit = {
+      carbonTable: CarbonTable, forceTableClean: Boolean): Unit = {
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
     try {
-      DataManagementFunc.cleanFiles(dbName, tableName, storePath, carbonTable)
+      DataManagementFunc.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
       LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
     } catch {
       case ex: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 5ab8160..e0829ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -28,8 +28,9 @@ import org.apache.spark.sql.execution.command.{CompactionCallableModel, Compacti
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
@@ -350,16 +351,24 @@ object DataManagementFunc {
       dbName: String,
       tableName: String,
       storePath: String,
-      carbonTable: CarbonTable): Unit = {
+      carbonTable: CarbonTable,
+      forceTableClean: Boolean): Unit = {
     val identifier = new CarbonTableIdentifier(dbName, tableName, "")
     val carbonCleanFilesLock =
       CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK)
     try {
       if (carbonCleanFilesLock.lockWithRetries()) {
         LOGGER.info("Clean files lock has been successfully acquired.")
-        deleteLoadsAndUpdateMetadata(dbName, tableName, storePath,
-          isForceDeletion = true, carbonTable)
-        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        if (forceTableClean) {
+          val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
+          FileFactory.deleteAllCarbonFilesOfDir(
+            FileFactory.getCarbonFile(absIdent.getTablePath,
+            FileFactory.getFileType(absIdent.getTablePath)))
+        } else {
+          deleteLoadsAndUpdateMetadata(dbName, tableName, storePath,
+            isForceDeletion = true, carbonTable)
+          CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        }
       } else {
         val errorMsg = "Clean files request is failed for " +
             s"$dbName.$tableName" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 92e8342..bae2c47 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -38,12 +38,15 @@ import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.row.LoadStatusType
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.csvload.CSVInputFormat
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
@@ -418,7 +421,7 @@ object CommonUtil {
 
   def readAndUpdateLoadProgressInTableMeta(model: CarbonLoadModel,
       storePath: String,
-      insertOverwrite: Boolean = false): Unit = {
+      insertOverwrite: Boolean): Unit = {
     val newLoadMetaEntry = new LoadMetadataDetails
     val status: String = if (insertOverwrite) {
       LoadStatusType.INSERT_OVERWRITE.getMessage
@@ -430,7 +433,8 @@ object CommonUtil {
     model.setFactTimeStamp(loadStartTime)
     CarbonLoaderUtil
       .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp, false)
-    val entryAdded: Boolean = CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true)
+    val entryAdded: Boolean =
+      CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
     if (!entryAdded) {
       sys
         .error(s"Failed to add entry in table status for ${ model.getDatabaseName }.${
@@ -617,4 +621,82 @@ object CommonUtil {
     AttributeReference("partition", StringType, nullable = false,
       new MetadataBuilder().putString("comment", "partitions info").build())()
   )
+
+  /**
+   * The in-progress segments which are left when the driver is down will be marked as deleted
+   * when driver is initializing.
+   * @param storePath
+   * @param sparkContext
+   */
+  def cleanInProgressSegments(storePath: String, sparkContext: SparkContext): Unit = {
+    val prop = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER)
+    if (prop != null) {
+      sparkContext.getConf.set(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER, prop)
+    }
+    val loaderDriver = sparkContext.getConf.get(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+      CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      val fileType = FileFactory.getFileType(storePath)
+      if (FileFactory.isFileExist(storePath, fileType)) {
+        val file = FileFactory.getCarbonFile(storePath, fileType)
+        val databaseFolders = file.listFiles()
+        databaseFolders.foreach { databaseFolder =>
+          if (databaseFolder.isDirectory) {
+            val tableFolders = databaseFolder.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val identifier =
+                  AbsoluteTableIdentifier.from(storePath,
+                    databaseFolder.getName, tableFolder.getName)
+                val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
+                val tableStatusFile = carbonTablePath.getTableStatusFilePath
+                if (FileFactory.isFileExist(tableStatusFile, fileType)) {
+                  val segmentStatusManager = new SegmentStatusManager(identifier)
+                  val carbonLock = segmentStatusManager.getTableStatusLock
+                  try {
+                    if (carbonLock.lockWithRetries) {
+                      LOGGER.info("Acquired lock for table" +
+                        identifier.getCarbonTableIdentifier.getTableUniqueName
+                        + " for table status updation")
+                      val listOfLoadFolderDetailsArray =
+                        SegmentStatusManager.readLoadMetadata(
+                          carbonTablePath.getMetadataDirectoryPath)
+                      var loadInprogressExist = false
+                      listOfLoadFolderDetailsArray.foreach { load =>
+                        if (load.getLoadStatus.equals(LoadStatusType.IN_PROGRESS.getMessage) ||
+                            load.getLoadStatus.equals(LoadStatusType.INSERT_OVERWRITE.getMessage)) {
+                          load.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+                          loadInprogressExist = true
+                        }
+                      }
+                      if (loadInprogressExist) {
+                        SegmentStatusManager
+                          .writeLoadDetailsIntoFile(tableStatusFile, listOfLoadFolderDetailsArray)
+                      }
+                    }
+                  } finally {
+                    if (carbonLock.unlock) {
+                      LOGGER.info(s"Released table status lock for table " +
+                                  s"${identifier.getCarbonTableIdentifier.getTableUniqueName}")
+                    } else {
+                      LOGGER.error(s"Error while releasing table status lock for table " +
+                                  s"${identifier.getCarbonTableIdentifier.getTableUniqueName}")
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 84a3413..7c2bf22 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -337,6 +337,7 @@ object CarbonDataRDDFactory {
       columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
       result: Option[DictionaryServer],
+      overwriteTable: Boolean,
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -890,7 +891,7 @@ object CarbonDataRDDFactory {
                 carbonLoadModel.getFactTimeStamp,
                 true)
             val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
-              carbonLoadModel, false)
+              carbonLoadModel, false, overwriteTable)
             if (!status) {
               val errorMessage = "Dataload failed due to failure in table status updation."
               LOGGER.audit("Data load is failed for " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 97a0593..9a02d13 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -311,7 +311,7 @@ object LoadTable {
 }
 
 private[sql] case class LoadTableByInsert(relation: CarbonDatasourceRelation,
-                                          child: LogicalPlan) extends RunnableCommand {
+    child: LogicalPlan, isOverwriteExist: Boolean) extends RunnableCommand {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   def run(sqlContext: SQLContext): Seq[Row] = {
     val df = new DataFrame(sqlContext, child)
@@ -322,7 +322,7 @@ private[sql] case class LoadTableByInsert(relation: CarbonDatasourceRelation,
       null,
       Seq(),
       scala.collection.immutable.Map("fileheader" -> header),
-      false,
+      isOverwriteExist,
       null,
       Some(df)).run(sqlContext)
     // updating relation metadata. This is in case of auto detect high cardinality
@@ -337,7 +337,7 @@ case class LoadTable(
     factPathFromUser: String,
     dimFilesPath: Seq[DataLoadTableFileMapping],
     options: scala.collection.immutable.Map[String, String],
-    isOverwriteExist: Boolean = false,
+    isOverwriteExist: Boolean,
     var inputSqlString: String = null,
     dataFrame: Option[DataFrame] = None,
     updateModel: Option[UpdateTableModel] = None) extends RunnableCommand {
@@ -360,9 +360,6 @@ case class LoadTable(
     }
 
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    if (isOverwriteExist) {
-      sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
-    }
     if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
       logError(s"Data loading failed. table not found: $dbName.$tableName")
       LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
@@ -562,7 +559,11 @@ case class LoadTable(
         val dimensions = carbonTable.getDimensionByTableName(
           carbonTable.getFactTableName).asScala.toArray
         // add the start entry for the new load in the table status file
-        CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath)
+        CommonUtil.
+          readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteExist)
+        if (isOverwriteExist) {
+          LOGGER.info(s"Overwrite is in progress for carbon table with $dbName.$tableName")
+        }
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
             StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) {
           LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
@@ -622,6 +623,7 @@ case class LoadTable(
             columnar,
             partitionStatus,
             server,
+            isOverwriteExist,
             dataFrame,
             updateModel)
         } else {
@@ -667,6 +669,7 @@ case class LoadTable(
             columnar,
             partitionStatus,
             None,
+            isOverwriteExist,
             loadDataFrame,
             updateModel)
         }
@@ -973,8 +976,8 @@ private[sql] case class CleanFiles(
       getDB.getDatabaseName(databaseNameOp, sqlContext),
       tableName,
       sqlContext.asInstanceOf[CarbonContext].storePath,
-      carbonTable
-    )
+      carbonTable,
+      false)
     Seq.empty
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index aba39f7..13ff2a9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -70,9 +70,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
             carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil
           }
         case InsertIntoCarbonTable(relation: CarbonDatasourceRelation,
-            _, child: LogicalPlan, _, _) =>
-            ExecutedCommand(LoadTableByInsert(relation,
-                child)) :: Nil
+            _, child: LogicalPlan, overwrite, _) =>
+            ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
         case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
           CarbonDictionaryDecoder(relations,
             profile,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index d3557a5..db7717c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -351,6 +351,7 @@ object CarbonDataRDDFactory {
       columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
       result: Option[DictionaryServer],
+      overwriteTable: Boolean,
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -915,7 +916,7 @@ object CarbonDataRDDFactory {
               carbonLoadModel.getFactTimeStamp,
               true)
           val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
-            carbonLoadModel, false)
+            carbonLoadModel, false, overwriteTable)
           if (!status) {
             val errorMessage = "Dataload failed due to failure in table status updation."
             LOGGER.audit("Data load is failed for " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 805a421..e9f2758 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -136,7 +136,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       null,
       Seq(),
       Map("fileheader" -> header) ++ options.toMap,
-      isOverwriteExist = false,
+      isOverwriteTable = false,
       null,
       Some(dataFrame)).run(sqlContext.sparkSession)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 7411e6e..d8a2978 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -96,7 +96,7 @@ case class CarbonDatasourceHadoopRelation(
           CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
     }
     if (data.logicalPlan.output.size >= carbonRelation.output.size) {
-      LoadTableByInsert(this, data.logicalPlan).run(sparkSession)
+      LoadTableByInsert(this, data.logicalPlan, overwrite).run(sparkSession)
     } else {
       sys.error("Cannot insert into target table because column number are different")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 7390cf3..2c2b1cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -28,6 +28,7 @@ import org.apache.spark.util.Utils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Session implementation for {org.apache.spark.sql.SparkSession}
@@ -156,7 +157,8 @@ object CarbonSession {
         session = new CarbonSession(sparkContext)
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)
-
+        CommonUtil.cleanInProgressSegments(
+          carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION), sparkContext)
         // Register a successfully instantiated context to the singleton. This should be at the
         // end of the class definition so that the singleton is updated only if there is no
         // exception in the construction of the instance.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 18d2dc7..c8c716e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -59,8 +59,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       case ShowLoadsCommand(databaseName, table, limit) =>
         ExecutedCommandExec(ShowLoads(databaseName, table.toLowerCase, limit, plan.output)) :: Nil
       case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
-      _, child: LogicalPlan, _, _) =>
-        ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
+      _, child: LogicalPlan, overwrite, _) =>
+        ExecutedCommandExec(LoadTableByInsert(relation, child, overwrite.enabled)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
         CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
         ExecutedCommandExec(createDb) :: Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f42d419..ca832c7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -346,7 +346,9 @@ object LoadTable {
 
 }
 
-case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
+case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation,
+    child: LogicalPlan,
+    overwrite: Boolean)
   extends RunnableCommand with DataProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -362,7 +364,7 @@ case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: Lo
       null,
       Seq(),
       scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteExist = false,
+      overwrite,
       null,
       Some(df)).run(sparkSession)
     // updating relation metadata. This is in case of auto detect high cardinality
@@ -378,7 +380,7 @@ case class LoadTable(
     factPathFromUser: String,
     dimFilesPath: Seq[DataLoadTableFileMapping],
     options: scala.collection.immutable.Map[String, String],
-    isOverwriteExist: Boolean = false,
+    isOverwriteTable: Boolean,
     var inputSqlString: String = null,
     dataFrame: Option[DataFrame] = None,
     updateModel: Option[UpdateTableModel] = None) extends RunnableCommand with DataProcessCommand {
@@ -485,9 +487,6 @@ case class LoadTable(
     }
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    if (isOverwriteExist) {
-      sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
-    }
 
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
         .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
@@ -642,7 +641,11 @@ case class LoadTable(
         GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
         val storePath = relation.tableMeta.storePath
         // add the start entry for the new load in the table status file
-        CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath)
+        CommonUtil.
+          readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteTable)
+        if (isOverwriteTable) {
+          LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+        }
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
             StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
           LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
@@ -718,6 +721,7 @@ case class LoadTable(
             columnar,
             partitionStatus,
             server,
+            isOverwriteTable,
             dataFrame,
             updateModel)
         } else {
@@ -757,6 +761,7 @@ case class LoadTable(
             columnar,
             partitionStatus,
             None,
+            isOverwriteTable,
             loadDataFrame,
             updateModel)
         }
@@ -797,7 +802,8 @@ case class LoadTable(
 
 case class CleanFiles(
     databaseNameOp: Option[String],
-    tableName: String) extends RunnableCommand with DataProcessCommand {
+    tableName: String, forceTableClean: Boolean = false)
+  extends RunnableCommand with DataProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     processData(sparkSession)
@@ -805,16 +811,25 @@ case class CleanFiles(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val relation = catalog
+    if (forceTableClean) {
+      CarbonStore.cleanFiles(
+        getDB.getDatabaseName(databaseNameOp, sparkSession),
+        tableName,
+        CarbonEnv.getInstance(sparkSession).storePath,
+        null,
+        forceTableClean)
+    } else {
+      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val relation = catalog
         .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
-    CarbonStore.cleanFiles(
-      getDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName,
-      relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
-      carbonTable
-    )
+      val carbonTable = relation.tableMeta.carbonTable
+      CarbonStore.cleanFiles(
+        getDB.getDatabaseName(databaseNameOp, sparkSession),
+        tableName,
+        relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
+        carbonTable,
+        forceTableClean)
+    }
     Seq.empty
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index df73641..421cd2e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -28,13 +28,22 @@ import org.apache.carbondata.api.CarbonStore
  // scalastyle:off
 object CleanFiles {
 
+  /**
+   * Clean the stale segments from table
+   * @param spark
+   * @param dbName
+   * @param tableName
+   * @param storePath
+   * @param forceTableClean if true, it deletes the table and its contents with force.It does not
+   *                        drop table from hive metastore so should be very careful to use it.
+   */
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
-      storePath: String): Unit = {
+      storePath: String, forceTableClean: Boolean = false): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
       lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
       tableMeta.carbonTable
-    CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable)
+    CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
   }
 
   def main(args: Array[String]): Unit = {
@@ -46,9 +55,13 @@ object CleanFiles {
 
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
+    var forceTableClean = false
+    if (args.length > 2) {
+      forceTableClean = args(2).toBoolean
+    }
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
       checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
-    cleanFiles(spark, dbName, tableName, storePath)
+    cleanFiles(spark, dbName, tableName, storePath, forceTableClean)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 8adaf00..72c7426 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -61,7 +61,7 @@ object TableLoader {
 
   def loadTable(spark: SparkSession, dbName: Option[String], tableName: String, inputPaths: String,
       options: scala.collection.immutable.Map[String, String]): Unit = {
-    LoadTable(dbName, tableName, inputPaths, Nil, options).run(spark)
+    LoadTable(dbName, tableName, inputPaths, Nil, options, false).run(spark)
   }
 
   def main(args: Array[String]): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 2731812..0331184 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -58,7 +58,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t4")
       """)
     LoadTable(Some("default"), "t4", s"$resourcesPath/source.csv", Nil,
-      Map()).run(sqlContext.sparkSession)
+      Map(), false).run(sqlContext.sparkSession)
     val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t4")
     if (table != null && table.getBucketingInfo("t4") != null) {
       assert(true)
@@ -78,7 +78,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t10")
       """)
     LoadTable(Some("default"), "t10", s"$resourcesPath/source.csv", Nil,
-      Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+      Map(("use_kettle", "false")), false).run(sqlContext.sparkSession)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
     val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t10")
     if (table != null && table.getBucketingInfo("t10") != null) {
@@ -115,7 +115,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("tableName"="t5")
       """)
     LoadTable(Some("default"), "t5", s"$resourcesPath/source.csv", Nil,
-      Map()).run(sqlContext.sparkSession)
+      Map(), false).run(sqlContext.sparkSession)
 
     val plan = sql(
       """
@@ -140,7 +140,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t6")
       """)
     LoadTable(Some("default"), "t6", s"$resourcesPath/source.csv", Nil,
-      Map()).run(sqlContext.sparkSession)
+      Map(), false).run(sqlContext.sparkSession)
 
     val plan = sql(
       """
@@ -165,7 +165,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t7")
       """)
     LoadTable(Some("default"), "t7", s"$resourcesPath/source.csv", Nil,
-      Map()).run(sqlContext.sparkSession)
+      Map(), false).run(sqlContext.sparkSession)
 
     sql("DROP TABLE IF EXISTS bucketed_parquet_table")
     sql("select * from t7").write
@@ -196,7 +196,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t8")
       """)
     LoadTable(Some("default"), "t8", s"$resourcesPath/source.csv", Nil,
-      Map()).run(sqlContext.sparkSession)
+      Map(), false).run(sqlContext.sparkSession)
 
     sql("DROP TABLE IF EXISTS parquet_table")
     sql("select * from t8").write

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
index 55eaa20..9b0b4fa 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
@@ -44,7 +44,7 @@ class VectorReaderTestCase extends QueryTest with BeforeAndAfterAll {
           OPTIONS("tableName"="vectorreader")
       """)
     LoadTable(Some("default"), "vectorreader", s"$resourcesPath/source.csv", Nil,
-      Map()).run(sqlContext.sparkSession)
+      Map(), false).run(sqlContext.sparkSession)
   }
 
   test("test vector reader") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/67b59634/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index fc67cdf..c4acff2 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -78,4 +78,16 @@ class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
     assert(f.list().length == 0)
     dropTable(table)
   }
+
+  test("clean files with force clean option") {
+    val table = "carbon_table4"
+    dropTable(table)
+    createAndLoadTestTable(table, "csv_table")
+    CleanFiles.main(Array(s"${location}", table, "true"))
+    val tablePath = s"${location}${File.separator}default${File.separator}$table"
+    val f = new File(tablePath)
+    assert(!f.exists())
+
+    dropTable(table)
+  }
 }


Mime
View raw message