carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: handling the exception cases in compaction. if compaction is failed for table requested by user then need to show error message in the beeline. and if any exception occurs then need to continue compaction for the qu
Date Sat, 17 Sep 2016 20:42:41 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master de56d0e40 -> 0d9970f66


Handling the exception cases in compaction.
If compaction fails for the table requested by the user, then an error message needs to be
shown in the beeline.
And if any exception occurs, compaction needs to continue for the queued compaction requests.

Changing the error message.

If the deletion of the compaction-required file is not successful, then that table is added to
the skip list so that it won't be considered again for compaction. This is to avoid an
infinite loop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/13b5d549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/13b5d549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/13b5d549

Branch: refs/heads/master
Commit: 13b5d549864f321fb8d47f2706aa120b94e409c7
Parents: de56d0e
Author: ravikiran <ravikiran.sn042@gmail.com>
Authored: Wed Sep 14 18:09:54 2016 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Sun Sep 18 02:11:46 2016 +0530

----------------------------------------------------------------------
 .../spark/merger/CarbonCompactionUtil.java      | 10 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 75 ++++++++++++++------
 .../execution/command/carbonTableSchema.scala   | 30 +++++---
 3 files changed, 80 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13b5d549/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
index b3198b4..ed669a6 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
@@ -262,14 +263,19 @@ public class CarbonCompactionUtil {
 
   /**
    * This will check if any compaction request has been received for any table.
+   *
    * @param tableMetas
    * @return
    */
-  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas) {
+  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+      List<CarbonTableIdentifier> skipList) {
     for (TableMeta table : tableMetas) {
       CarbonTable ctable = table.carbonTable();
       String metadataPath = ctable.getMetaDataFilepath();
-      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath)) {
+      // check for the compaction required file and at the same time exclude the tables which
are
+      // present in the skip list.
+      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
+          .contains(table.carbonTableIdentifier())) {
         return table;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13b5d549/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 31cc8ac..1144c59 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -32,11 +32,11 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{util => _, _}
 import org.apache.spark.sql.{CarbonEnv, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
CompactionModel, Partitioner}
-import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.sql.hive.{DistributionUtil, TableMeta}
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
@@ -363,6 +363,10 @@ object CarbonDataRDDFactory extends Logging {
         case e : Exception =>
           logger.error("Exception in start compaction thread. " + e.getMessage)
           lock.unlock()
+          // if the compaction is a blocking call then only need to throw the exception.
+          if (compactionModel.isDDLTrigger) {
+            throw e
+          }
       }
     }
     else {
@@ -537,13 +541,24 @@ object CarbonDataRDDFactory extends Logging {
         override def run(): Unit = {
 
           try {
-            executeCompaction(carbonLoadModel: CarbonLoadModel,
-              hdfsStoreLocation: String,
-              compactionModel: CompactionModel,
-              partitioner: Partitioner,
-              executor, sqlContext, kettleHomePath, storeLocation
-            )
-            // check for all the tables.
+            // compaction status of the table which is triggered by the user.
+            var triggeredCompactionStatus = false
+            var exception : Exception = null
+            try {
+              executeCompaction(carbonLoadModel: CarbonLoadModel,
+                hdfsStoreLocation: String,
+                compactionModel: CompactionModel,
+                partitioner: Partitioner,
+                executor, sqlContext, kettleHomePath, storeLocation
+              )
+              triggeredCompactionStatus = true
+            }
+            catch {
+              case e: Exception =>
+                logger.error("Exception in compaction thread " + e.getMessage)
+                exception = e
+            }
+            // continue in case of exception also, check for all the tables.
             val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
               .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
                 CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
@@ -551,15 +566,16 @@ object CarbonDataRDDFactory extends Logging {
 
             if (!isConcurrentCompactionAllowed) {
               logger.info("System level compaction lock is enabled.")
+              val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
               var tableForCompaction = CarbonCompactionUtil
                 .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                  .tablesMeta.toArray
+                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
                 )
-              while(null != tableForCompaction) {
+              while (null != tableForCompaction) {
                 logger
                   .info("Compaction request has been identified for table " + tableForCompaction
                     .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
-                    .getTableName
+                          .getTableName
                   )
                 val table: CarbonTable = tableForCompaction.carbonTable
                 val metadataPath = table.getMetaDataFilepath
@@ -590,32 +606,45 @@ object CarbonDataRDDFactory extends Logging {
                     executor, sqlContext, kettleHomePath, storeLocation
                   )
                 }
+                catch {
+                  case e: Exception =>
+                    logger.error("Exception in compaction thread for table " + tableForCompaction
+                      .carbonTable.getDatabaseName + "." +
+                                 tableForCompaction.carbonTableIdentifier
+                                   .getTableName)
+                  // not handling the exception. only logging as this is not the table triggered
+                  // by user.
+                }
                 finally {
-                  // delete the compaction required file
+                  // delete the compaction required file in case of failure or success also.
                   if (!CarbonCompactionUtil
                     .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+                    // if the compaction request file is not been able to delete then
+                    // add those tables details to the skip list so that it wont be considered
next.
+                    skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
                     logger
                       .error("Compaction request file can not be deleted for table " +
-                        tableForCompaction
-                        .carbonTable.getDatabaseName + "." + tableForCompaction
-                        .carbonTableIdentifier
-                        .getTableName
+                             tableForCompaction
+                               .carbonTable.getDatabaseName + "." + tableForCompaction
+                               .carbonTableIdentifier
+                               .getTableName
                       )
+
                   }
                 }
                 // ********* check again for all the tables.
                 tableForCompaction = CarbonCompactionUtil
                   .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                    .tablesMeta.toArray
+                    .tablesMeta.toArray, skipCompactionTables.asJava
                   )
               }
+              // giving the user his error for telling in the beeline if his triggered table
+              // compaction is failed.
+              if (!triggeredCompactionStatus) {
+                throw new Exception("Exception in compaction " + exception.getMessage)
+              }
             }
           }
-          catch {
-            case e: Exception =>
-              logger.error("Exception in compaction thread " + e.getMessage)
-              throw e
-          }
           finally {
             executor.shutdownNow()
             deletePartialLoadsInCompaction(carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13b5d549/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index b8c56d3..19519ea 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -794,16 +794,26 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel)
e
         System.getProperty("java.io.tmpdir")
       )
     storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
-
-    CarbonDataRDDFactory
-      .alterTableForCompaction(sqlContext,
-        alterTableModel,
-        carbonLoadModel,
-        partitioner,
-        relation.tableMeta.storePath,
-        kettleHomePath,
-        storeLocation
-      )
+    try {
+      CarbonDataRDDFactory
+        .alterTableForCompaction(sqlContext,
+          alterTableModel,
+          carbonLoadModel,
+          partitioner,
+          relation.tableMeta.storePath,
+          kettleHomePath,
+          storeLocation
+        )
+    }
+    catch {
+      case e: Exception =>
+        if (null != e.getMessage) {
+          sys.error("Compaction failed. Please check logs for more info." + e.getMessage)
+        }
+        else {
+          sys.error("Exception in compaction. Please check logs for more info.")
+        }
+    }
 
     Seq.empty
   }


Mime
View raw message