carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: disabling the system compaction lock feature. and making the load ddl to wait for compaction to finish in the auto compaction case.
Date Sat, 17 Sep 2016 21:43:52 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 8802e9ebb -> 17f602dec


disabling the system compaction lock feature. and making the load ddl to wait for compaction
to finish in the auto compaction case.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9702f7a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9702f7a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9702f7a8

Branch: refs/heads/master
Commit: 9702f7a89179a610bd4c8c87e52994d94a946257
Parents: 8802e9e
Author: ravikiran <ravikiran.sn042@gmail.com>
Authored: Sat Sep 17 15:18:22 2016 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Sun Sep 18 03:09:57 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 64 ++++++++++----------
 .../CompactionSystemLockFeatureTest.scala       |  2 +
 3 files changed, 37 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9702f7a8/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 41d6ebf..1d60ee9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -911,6 +911,7 @@ public final class CarbonCommonConstants {
   public static String majorCompactionRequiredFile = "compactionRequired_major";
 
   /**
+   * @Deprecated : This property has been deprecated.
    * Property for enabling system level compaction lock.1 compaction can run at once.
    */
   public static String ENABLE_CONCURRENT_COMPACTION =
@@ -920,7 +921,7 @@ public final class CarbonCommonConstants {
    * Default value of Property for enabling system level compaction lock.1 compaction can run
    * at once.
    */
-  public static String DEFAULT_ENABLE_CONCURRENT_COMPACTION = "false";
+  public static String DEFAULT_ENABLE_CONCURRENT_COMPACTION = "true";
 
   /**
    * Compaction system level lock folder.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9702f7a8/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5a1d14f..c50720a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -652,14 +652,9 @@ object CarbonDataRDDFactory extends Logging {
           }
         }
       }
-      if(compactionModel.isDDLTrigger) {
-        // making this an blocking call for DDL
-        compactionThread.run()
-      }
-      else {
-        // non blocking call in case of auto compaction.
-        compactionThread.start()
-      }
+    // calling the run method of a thread to make the call as blocking call.
+    // in the future we may make this as concurrent.
+    compactionThread.run()
   }
 
   def prepareCarbonLoadModel(hdfsStoreLocation: String,
@@ -775,6 +770,7 @@ object CarbonDataRDDFactory extends Logging {
               case e : Exception =>
                 logger.error("Exception in start compaction thread. " + e.getMessage)
                 lock.unlock()
+                throw e
             }
           }
           else {
@@ -846,9 +842,6 @@ object CarbonDataRDDFactory extends Logging {
       val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
         .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
-      // compaction handling
-      handleSegmentMerging(tableCreationTime)
-
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
       // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
@@ -1059,28 +1052,37 @@ object CarbonDataRDDFactory extends Logging {
         logWarning("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
-        val metadataDetails = status(0)._2
-        if (!isAgg) {
-          val status = CarbonLoaderUtil
-            .recordLoadMetadata(currentLoadCount,
-              metadataDetails,
-              carbonLoadModel,
-              loadStatus,
-              loadStartTime
-            )
-          if (!status) {
-            val errorMessage = "Dataload failed due to failure in table status updation."
-            logger.audit("Data load is failed for " +
-              s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-            logger.error("Dataload failed due to failure in table status updation.")
-            throw new Exception(errorMessage)
+          val metadataDetails = status(0)._2
+          if (!isAgg) {
+            val status = CarbonLoaderUtil
+              .recordLoadMetadata(currentLoadCount,
+                metadataDetails,
+                carbonLoadModel,
+                loadStatus,
+                loadStartTime
+              )
+            if (!status) {
+              val errorMessage = "Dataload failed due to failure in table status updation."
+              logger.audit("Data load is failed for " +
+                           s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+              logger.error("Dataload failed due to failure in table status updation.")
+              throw new Exception(errorMessage)
+            }
+          } else if (!carbonLoadModel.isRetentionRequest) {
+            // TODO : Handle it
+            logInfo("********Database updated**********")
           }
-        } else if (!carbonLoadModel.isRetentionRequest) {
-          // TODO : Handle it
-          logInfo("********Database updated**********")
+          logger.audit("Data load is successful for " +
+                       s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+        try {
+          // compaction handling
+          handleSegmentMerging(tableCreationTime)
+        }
+        catch {
+          case e: Exception =>
+            throw new Exception(
+              "Dataload is success. Auto-Compaction has failed. Please check logs.")
         }
-        logger.audit("Data load is successful for " +
-          s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9702f7a8/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index d9e1349..a040550 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -43,6 +43,8 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists  table2")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, "false")
     sql(
       "CREATE TABLE IF NOT EXISTS table1 (country String, ID Int, date Timestamp, name " +
         "String, " +


Mime
View raw message