carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-1380] Fixed updation of load fail to table status file to make overwrite queries work properly
Date Sat, 16 Sep 2017 16:52:40 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master d60d973df -> de445bb66


[CARBONDATA-1380] Fixed updation of load fail to table status file to make overwrite queries
work properly

Tablestatus file is not updated when load fails. It should be updated with fail status otherwise
overwrite queries does not work properly.

This closes #1256


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de445bb6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de445bb6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de445bb6

Branch: refs/heads/master
Commit: de445bb66c48b9d2db0cf87d03c0af171489644b
Parents: d60d973
Author: Ravindra Pesala <ravi.pesala@gmail.com>
Authored: Mon Aug 14 18:58:25 2017 +0530
Committer: Ravindra Pesala <ravi.pesala@gmail.com>
Committed: Sat Sep 16 22:21:36 2017 +0530

----------------------------------------------------------------------
 .../InsertIntoCarbonTableTestCase.scala         | 20 +++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 47 +++++++++++--------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 48 ++++++++++++--------
 3 files changed, 78 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index d1bf28b..8a084ae 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -54,7 +54,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
      checkAnswer(
          sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_oper
 atorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
from TCarbonSource order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNum
 ber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
          sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_oper
 atorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
from TCarbon order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
 test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
-     ) 
+     )
      val result = sql("show segments for table TCarbon").collect()(0).get(1).toString()
      if(!"Success".equalsIgnoreCase(result)) {
        assert(false)
@@ -247,6 +247,24 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
     assert(folder.list().length == 1)
   }
 
+  test("Load overwrite fail handle") {
+    sql("drop table if exists TCarbonSourceOverwrite")
+    sql("create table TCarbonSourceOverwrite (imei string,deviceInformationId int,MAC string,deviceColor
string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit
string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels
string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string,
deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string,
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string,
ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion
string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string,
Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion
string, Active_phonePADParti
 tionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR
string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string,
Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion
string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string,
Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string,
Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId
string, gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'")
+
+    sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' INTO table TCarbonSourceOverwrite
options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Late
 st_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
+    val rowCount = sql("select imei from TCarbonSourceOverwrite").count()
+    try {
+      sql("LOAD DATA INPATH '" + resourcesPath +
+          "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',',
'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacF
 lashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber',
'bad_records_action'='fail')")
+      assert(false, "Bad records exists and logger action is fail, so it should fail")
+    } catch {
+      case e: Exception =>
+        assert(true)
+    }
+    sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite
options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
+    assert(rowCount == sql("select imei from TCarbonSourceOverwrite").count())
+  }
+
 
   override def afterAll {
     sql("drop table if exists load")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5725717..ef2a917 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -418,6 +418,32 @@ object CarbonDataRDDFactory {
       }
     }
 
+    def updateStatus(loadStatus: String,
+        stat: Array[(String, (LoadMetadataDetails, ExecutionErrors))]) = {
+      val metadataDetails = if (stat != null && stat(0) != null) {
+        stat(0)._2._1
+      } else {
+        new LoadMetadataDetails
+      }
+      CarbonLoaderUtil
+        .populateNewLoadMetaEntry(metadataDetails,
+          loadStatus,
+          carbonLoadModel.getFactTimeStamp,
+          true)
+      val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
+        carbonLoadModel, false, overwriteTable)
+      if (!status) {
+        val errorMessage = "Dataload failed due to failure in table status updation."
+        LOGGER.audit("Data load is failed for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${
+                       carbonLoadModel
+                         .getTableName
+                     }")
+        LOGGER.error("Dataload failed due to failure in table status updation.")
+        throw new Exception(errorMessage)
+      }
+    }
+
     try {
       LOGGER.audit(s"Data load request has been received for table" +
           s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -888,6 +914,7 @@ object CarbonDataRDDFactory {
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
+        updateStatus(loadStatus, status)
         throw new Exception(errorMessage)
       } else {
         // check if data load fails due to bad record and throw data load failure due to
@@ -900,28 +927,12 @@ object CarbonDataRDDFactory {
           LOGGER.info("********clean up done**********")
           LOGGER.audit(s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
+          updateStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE, status)
           throw new Exception(status(0)._2._2.errorMsg)
         }
-        val metadataDetails = status(0)._2._1
         if (!isAgg) {
             writeDictionary(carbonLoadModel, result)
-            CarbonLoaderUtil
-              .populateNewLoadMetaEntry(metadataDetails,
-                loadStatus,
-                carbonLoadModel.getFactTimeStamp,
-                true)
-            val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
-              carbonLoadModel, false, overwriteTable)
-            if (!status) {
-              val errorMessage = "Dataload failed due to failure in table status updation."
-              LOGGER.audit("Data load is failed for " +
-                           s"${ carbonLoadModel.getDatabaseName }.${
-                             carbonLoadModel
-                               .getTableName
-                           }")
-              LOGGER.error("Dataload failed due to failure in table status updation.")
-              throw new Exception(errorMessage)
-            }
+            updateStatus(loadStatus, status)
         } else if (!carbonLoadModel.isRetentionRequest) {
           // TODO : Handle it
           LOGGER.info("********Database updated**********")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 94d7b15..c7b72d5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -517,6 +517,32 @@ object CarbonDataRDDFactory {
       }
     }
 
+    def updateStatus(status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
+        loadStatus: String) = {
+      val metadataDetails = if (status != null && status(0) != null) {
+        status(0)._2._1
+      } else {
+        new LoadMetadataDetails
+      }
+      CarbonLoaderUtil
+        .populateNewLoadMetaEntry(metadataDetails,
+          loadStatus,
+          carbonLoadModel.getFactTimeStamp,
+          true)
+      val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
+        carbonLoadModel, false, overwriteTable)
+      if (!success) {
+        val errorMessage = "Dataload failed due to failure in table status updation."
+        LOGGER.audit("Data load is failed for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
+        LOGGER.error("Dataload failed due to failure in table status updation.")
+        throw new Exception(errorMessage)
+      } else if (!carbonLoadModel.isRetentionRequest) {
+        // TODO : Handle it
+        LOGGER.info("********Database updated**********")
+      }
+    }
+
     try {
       LOGGER.audit(s"Data load request has been received for table" +
           s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -986,6 +1012,7 @@ object CarbonDataRDDFactory {
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
+        updateStatus(status, loadStatus)
         throw new Exception(errorMessage)
       } else {
         // check if data load fails due to bad record and throw data load failure due to
@@ -998,6 +1025,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********clean up done**********")
           LOGGER.audit(s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
+          updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           throw new Exception(status(0)._2._2.errorMsg)
         }
         // if segment is empty then fail the data load
@@ -1009,27 +1037,11 @@ object CarbonDataRDDFactory {
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}" +
                        " as there is no data to load")
           LOGGER.warn("Cannot write load metadata file as data load failed")
+          updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           throw new Exception("No Data to load")
         }
-        val metadataDetails = status(0)._2._1
         writeDictionary(carbonLoadModel, result, false)
-        CarbonLoaderUtil
-          .populateNewLoadMetaEntry(metadataDetails,
-            loadStatus,
-            carbonLoadModel.getFactTimeStamp,
-            true)
-        val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
-          carbonLoadModel, false, overwriteTable)
-        if (!success) {
-          val errorMessage = "Dataload failed due to failure in table status updation."
-          LOGGER.audit("Data load is failed for " +
-              s"${ carbonLoadModel.getDatabaseName }.${carbonLoadModel.getTableName}")
-          LOGGER.error("Dataload failed due to failure in table status updation.")
-          throw new Exception(errorMessage)
-        } else if (!carbonLoadModel.isRetentionRequest) {
-          // TODO : Handle it
-          LOGGER.info("********Database updated**********")
-        }
+        updateStatus(status, loadStatus)
 
         if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
           LOGGER.audit("Data load is partially successful for " +


Mime
View raw message