carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [05/49] incubator-carbondata git commit: Changes done: 1. Support creation and deletion of dictionary files during alter add and drop columns through RDD to parallelize the task and increase the performance 2. Support clean up of dictionary files in case
Date Fri, 07 Apr 2017 09:55:08 GMT
Changes done:
1. Support creation and deletion of dictionary files during alter add and drop columns through
an RDD, to parallelize the task and improve performance
2. Support cleanup of dictionary files if any failure occurs during the alter add columns
operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b5ba4c6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b5ba4c6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b5ba4c6e

Branch: refs/heads/12-dev
Commit: b5ba4c6ea2d864f099bd4112e2cd5260e615a0a8
Parents: 4a7adfa
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Tue Apr 4 19:59:18 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Apr 6 10:53:51 2017 +0530

----------------------------------------------------------------------
 .../core/cache/dictionary/ManageDictionary.java |  85 +++++++-------
 .../spark/rdd/AlterTableAddColumnRDD.scala      | 110 +++++++++++++++++++
 .../spark/rdd/AlterTableDropColumnRDD.scala     |  96 ++++++++++++++++
 .../execution/command/carbonTableSchema.scala   |  21 ++--
 .../execution/command/AlterTableCommands.scala  |  48 +++++---
 5 files changed, 286 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
index 706bc20..0a38890 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.core.cache.dictionary;
 
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -30,8 +29,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -51,62 +49,59 @@ public class ManageDictionary {
    * This method will delete the dictionary files for the given column IDs and
    * clear the dictionary cache
    *
-   * @param dictionaryColumns
-   * @param carbonTable
+   * @param columnSchema
+   * @param carbonTableIdentifier
+   * @param storePath
    */
-  public static void deleteDictionaryFileAndCache(List<CarbonColumn> dictionaryColumns,
-      CarbonTable carbonTable) {
-    if (!dictionaryColumns.isEmpty()) {
-      CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-      CarbonTablePath carbonTablePath =
-          CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath(), carbonTableIdentifier);
-      String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
-      CarbonFile metadataDir = FileFactory
-          .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
-      for (final CarbonColumn column : dictionaryColumns) {
-        // sort index file is created with dictionary size appended to it. So all the files
-        // with a given column ID need to be listed
-        CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            if (path.getName().startsWith(column.getColumnId())) {
-              return true;
-            }
-            return false;
-          }
-        });
-        for (CarbonFile file : listFiles) {
-          // try catch is inside for loop because even if one deletion fails, other files
-          // still need to be deleted
-          try {
-            FileFactory.deleteFile(file.getCanonicalPath(),
-                FileFactory.getFileType(file.getCanonicalPath()));
-          } catch (IOException e) {
-            LOGGER.error(
-                "Failed to delete dictionary or sortIndex file for column " + column.getColName()
-                    + "with column ID " + column.getColumnId());
-          }
+  public static void deleteDictionaryFileAndCache(final ColumnSchema columnSchema,
+      CarbonTableIdentifier carbonTableIdentifier, String storePath) {
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
+    String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
+    CarbonFile metadataDir = FileFactory
+        .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
+    // sort index file is created with dictionary size appended to it. So all the files
+    // with a given column ID need to be listed
+    CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile path) {
+        if (path.getName().startsWith(columnSchema.getColumnUniqueId())) {
+          return true;
         }
-        // remove dictionary cache
-        removeDictionaryColumnFromCache(carbonTable, column.getColumnId());
+        return false;
+      }
+    });
+    for (CarbonFile file : listFiles) {
+      // try catch is inside for loop because even if one deletion fails, other files
+      // still need to be deleted
+      try {
+        FileFactory
+            .deleteFile(file.getCanonicalPath(), FileFactory.getFileType(file.getCanonicalPath()));
+      } catch (IOException e) {
+        LOGGER.error("Failed to delete dictionary or sortIndex file for column " + columnSchema
+            .getColumnName() + "with column ID " + columnSchema.getColumnUniqueId());
       }
     }
+    // remove dictionary cache
+    removeDictionaryColumnFromCache(carbonTableIdentifier, storePath,
+        columnSchema.getColumnUniqueId());
   }
 
   /**
    * This method will remove dictionary cache from driver for both reverse and forward dictionary
    *
-   * @param carbonTable
+   * @param carbonTableIdentifier
+   * @param storePath
    * @param columnId
    */
-  public static void removeDictionaryColumnFromCache(CarbonTable carbonTable, String columnId) {
-    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache = CacheProvider.getInstance()
-        .createCache(CacheType.REVERSE_DICTIONARY, carbonTable.getStorePath());
+  public static void removeDictionaryColumnFromCache(CarbonTableIdentifier carbonTableIdentifier,
+      String storePath, String columnId) {
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
+        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, storePath);
     DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
-        new DictionaryColumnUniqueIdentifier(carbonTable.getCarbonTableIdentifier(),
+        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
             new ColumnIdentifier(columnId, null, null));
     dictCache.invalidate(dictionaryColumnUniqueIdentifier);
-    dictCache = CacheProvider.getInstance()
-        .createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
+    dictCache = CacheProvider.getInstance().createCache(CacheType.FORWARD_DICTIONARY, storePath);
     dictCache.invalidate(dictionaryColumnUniqueIdentifier);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
new file mode 100644
index 0000000..bb65b0b
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.AlterTableAddColumnsModel
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.spark.util.GlobalDictionaryUtil
+
+/**
+ * This is a partitioner class for dividing the newly added columns into partitions
+ *
+ * @param rddId
+ * @param idx
+ * @param schema
+ */
+class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Partition {
+  override def index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+
+  val columnSchema = schema
+}
+
+/**
+ * This class is aimed at generating dictionary file for the newly added columns
+ */
+class AlterTableAddColumnRDD[K, V](sc: SparkContext,
+    @transient newColumns: Seq[ColumnSchema],
+    alterTableModel: AlterTableAddColumnsModel,
+    carbonTableIdentifier: CarbonTableIdentifier,
+    carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    newColumns.zipWithIndex.map { column =>
+      new DropColumnPartition(id, column._2, column._1)
+    }.toArray
+  }
+
+  override def compute(split: Partition,
+      context: TaskContext): Iterator[(Int, String)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+    val iter = new Iterator[(Int, String)] {
+      try {
+        val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
+        // create dictionary file if it is a dictionary column
+        if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
+            !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          val carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(carbonStorePath, carbonTableIdentifier)
+          var rawData: String = null
+          if (null != columnSchema.getDefaultValue) {
+            rawData = new String(columnSchema.getDefaultValue,
+              CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+          }
+          GlobalDictionaryUtil
+            .loadDefaultDictionaryValueForNewColumn(carbonTablePath,
+              columnSchema,
+              carbonTableIdentifier,
+              carbonStorePath,
+              rawData)
+        }
+      } catch {
+        case ex: Exception =>
+          throw ex
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = {
+
+        if (!finished) {
+          finished = true
+          finished
+        } else {
+          !finished
+        }
+      }
+
+      override def next(): (Int, String) = {
+        (split.index, status)
+      }
+    }
+    iter
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
new file mode 100644
index 0000000..49dadd3
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.ManageDictionary
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+
+/**
+ * This is a partitioner class for dividing the newly added columns into partitions
+ *
+ * @param rddId
+ * @param idx
+ * @param schema
+ */
+class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Partition {
+  override def index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+
+  val columnSchema = schema
+}
+
+/**
+ * This class is aimed at generating dictionary file for the newly added columns
+ */
+class AlterTableDropColumnRDD[K, V](sc: SparkContext,
+    @transient newColumns: Seq[ColumnSchema],
+    carbonTableIdentifier: CarbonTableIdentifier,
+    carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    newColumns.zipWithIndex.map { column =>
+      new DropColumnPartition(id, column._2, column._1)
+    }.toArray
+  }
+
+  override def compute(split: Partition,
+      context: TaskContext): Iterator[(Int, String)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+    val iter = new Iterator[(Int, String)] {
+      try {
+        val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
+        if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
+            !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          ManageDictionary
+            .deleteDictionaryFileAndCache(columnSchema, carbonTableIdentifier, carbonStorePath)
+        }
+      } catch {
+        case ex: Exception =>
+          LOGGER.error(ex, ex.getMessage)
+          throw ex
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = {
+
+        if (!finished) {
+          finished = true
+          finished
+        } else {
+          !finished
+        }
+      }
+
+      override def next(): (Int, String) = {
+        (split.index, status)
+      }
+    }
+    iter
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 6c44264..dadd03e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 
@@ -42,6 +43,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.FailureCauses
 import org.apache.carbondata.spark.merger.CompactionType
+import org.apache.carbondata.spark.rdd.AlterTableAddColumnRDD
 import org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil}
 
 case class TableModel(
@@ -156,7 +158,7 @@ class AlterTableProcessor(
     tableInfo: TableInfo,
     carbonTablePath: CarbonTablePath,
     tableIdentifier: CarbonTableIdentifier,
-    storePath: String) {
+    storePath: String, sc: SparkContext) {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -227,7 +229,6 @@ class AlterTableProcessor(
           tablePropertiesMap.put(x._1, x._2)
         }
     }
-
     // This part will create dictionary file for all newly added dictionary columns
     // if valid default value is provided,
     // then that value will be included while creating dictionary file
@@ -251,17 +252,13 @@ class AlterTableProcessor(
           }
         }
       }
-      if (col.getEncodingList.contains(Encoding.DICTIONARY) &&
-          !col.getEncodingList.contains(Encoding.DIRECT_DICTIONARY)) {
-        GlobalDictionaryUtil
-          .loadDefaultDictionaryValueForNewColumn(carbonTablePath,
-            col,
-            tableIdentifier,
-            storePath,
-            rawData)
-      }
     }
-
+    // generate dictionary files for the newly added columns
+    new AlterTableAddColumnRDD(sc,
+      newCols,
+      alterTableModel,
+      tableIdentifier,
+      storePath).collect()
     tableSchema.setListOfColumns(allColumns.asJava)
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index efa2cd5..93a5912 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.ManageDictionary
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -38,6 +37,7 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
 
 private[sql] case class AlterTableAddColumns(
@@ -52,9 +52,10 @@ private[sql] case class AlterTableAddColumns(
     LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
     val carbonLock = AlterTableUtil
       .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+    // get the latest carbon table and check for column existence
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
     try {
-      // get the latest carbon table and check for column existence
-      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -67,12 +68,12 @@ private[sql] case class AlterTableAddColumns(
           dbName,
           tableName,
           carbonTable.getStorePath)
-      val newCols = new AlterTableProcessor(alterTableAddColumnsModel,
+      newCols = new AlterTableProcessor(alterTableAddColumnsModel,
         dbName,
         wrapperTableInfo,
         carbonTablePath,
         carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).process
+        carbonTable.getStorePath, sparkSession.sparkContext).process
       val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
       schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
       schemaEvolutionEntry.setAdded(newCols.toList.asJava)
@@ -89,8 +90,16 @@ private[sql] case class AlterTableAddColumns(
       LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
-        LOGGER.error("Alter table add columns failed : " + e.getMessage)
-        throw e
+        LOGGER.error(e, s"Alter table add columns failed : ${e.getMessage}")
+        // clean up the dictionary files in case of any failure
+        if (!newCols.isEmpty) {
+          LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
+          new AlterTableDropColumnRDD(sparkSession.sparkContext,
+            newCols,
+            carbonTable.getCarbonTableIdentifier,
+            carbonTable.getStorePath).collect()
+        }
+        sys.error("Alter table add column operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
       if (carbonLock != null) {
@@ -186,8 +195,9 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
-      case e: Exception => LOGGER.error("Rename table failed: " + e.getMessage)
-        throw e
+      case e: Exception =>
+        LOGGER.error(e, s"Rename table failed: ${e.getMessage}")
+        sys.error("Alter table rename table operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
       if (carbonLock != null) {
@@ -237,9 +247,10 @@ private[sql] case class AlterTableDropColumns(
       val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
       // check each column existence in the table
       val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
-      var dictionaryColumns = ListBuffer[CarbonColumn]()
+      var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
+      .ColumnSchema]()
       var keyColumnCountToBeDeleted = 0
-      // TODO: if deleted column list includes shared dictionary/bucketted column throw an error
+      // TODO: if deleted column list includes bucketted column throw an error
       alterTableDropColumnModel.columns.foreach { column =>
         var columnExist = false
         tableColumns.foreach { tableColumn =>
@@ -248,7 +259,7 @@ private[sql] case class AlterTableDropColumns(
             if (tableColumn.isDimesion) {
               keyColumnCountToBeDeleted += 1
               if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
-                dictionaryColumns += tableColumn
+                dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
               }
             }
             columnExist = true
@@ -299,13 +310,16 @@ private[sql] case class AlterTableDropColumns(
           sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog])
       // TODO: 1. add check for deletion of index tables
       // delete dictionary files for dictionary column and clear dictionary cache from memory
-      ManageDictionary.deleteDictionaryFileAndCache(dictionaryColumns.toList.asJava, carbonTable)
+      new AlterTableDropColumnRDD(sparkSession.sparkContext,
+        dictionaryColumns,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath).collect()
       LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
-        LOGGER.error("Alter table drop columns failed : " + e.getMessage)
-        throw e
+        LOGGER.error(e, s"Alter table drop columns failed : ${e.getMessage}")
+        sys.error("Alter table drop column operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
       if (carbonLock != null) {
@@ -387,8 +401,8 @@ private[sql] case class AlterTableDataTypeChange(
       LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
-        LOGGER.error("Alter table change datatype failed : " + e.getMessage)
-        throw e
+        LOGGER.error(e, s"Alter table change datatype failed : ${e.getMessage}")
+        sys.error("Alter table data type change operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
       if (carbonLock != null) {


Mime
View raw message