carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-2287] Events added for alter hive partition table
Date Wed, 28 Mar 2018 14:21:38 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 05086e536 -> 877eabdd6


[CARBONDATA-2287] Events added for alter hive partition table

Events added for alter hive partition table

This closes #2107


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/877eabdd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/877eabdd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/877eabdd

Branch: refs/heads/master
Commit: 877eabdd6080c514e23a9cbfbaef9f78acc0d39f
Parents: 05086e5
Author: rahulforallp <rahul.kumar@knoldus.in>
Authored: Tue Mar 27 12:20:04 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Wed Mar 28 19:54:39 2018 +0530

----------------------------------------------------------------------
 .../filesystem/AbstractDFSCarbonFile.java       |  3 ++
 .../carbondata/core/util/CarbonProperties.java  | 50 ++++++++++----------
 .../apache/carbondata/core/util/CarbonUtil.java | 24 ++++++++++
 .../carbondata/events/AlterTableEvents.scala    | 16 +++++++
 .../org/apache/carbondata/events/Events.scala   | 10 +++-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala | 23 ++++-----
 ...arbonAlterTableAddHivePartitionCommand.scala | 14 +++++-
 ...rbonAlterTableDropHivePartitionCommand.scala | 12 +++++
 .../CarbonAlterTableSplitPartitionCommand.scala | 12 +++++
 .../impl/MeasureFieldConverterImpl.java         |  5 +-
 .../store/CarbonFactDataHandlerModel.java       | 30 ++++++++++--
 11 files changed, 156 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 8cf3efe..bf3292b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -489,6 +489,9 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
       fs.create(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), false,
           fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(path),
           fs.getDefaultBlockSize(path), null).close();
+      // haddop masks the permission accoding to configured permission, so need to set permission
+      // forcefully
+      fs.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
       return true;
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 6fa21bc..38f7513 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -874,7 +874,7 @@ public final class CarbonProperties {
               CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER));
       // checking min and max . 0  , 100 is min & max.
       if (numberOfSegmentsToBePreserved < 0 || numberOfSegmentsToBePreserved > 100) {
-        LOGGER.error("The specified value for property "
+        LOGGER.warn("The specified value for property "
             + CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER + " is incorrect."
             + " Correct value should be in range of 0 -100. Taking the default value.");
         numberOfSegmentsToBePreserved =
@@ -930,7 +930,7 @@ public final class CarbonProperties {
         }
         compactionSize[i++] = size;
       } catch (NumberFormatException e) {
-        LOGGER.error(
+        LOGGER.warn(
             "Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD
                 + " is not proper. Taking the default value "
                 + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
@@ -953,7 +953,7 @@ public final class CarbonProperties {
               CarbonCommonConstants.NUM_CORES_LOADING,
               CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
     } catch (NumberFormatException exc) {
-      LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+      LOGGER.warn("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
           + " is wrong. Falling back to the default value "
           + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
       numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
@@ -974,18 +974,18 @@ public final class CarbonProperties {
     } catch (Exception e) {
       inMemoryChunkSizeInMB =
           Integer.parseInt(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT);
-      LOGGER.error("Problem in parsing the sort memory chunk size, setting with default value"
+      LOGGER.warn("Problem in parsing the sort memory chunk size, setting with default value"
           + inMemoryChunkSizeInMB);
     }
     if (inMemoryChunkSizeInMB > 1024) {
       inMemoryChunkSizeInMB = 1024;
-      LOGGER.error(
+      LOGGER.warn(
           "It is not recommended to increase the sort memory chunk size more than 1024MB, "
               + "so setting the value to "
               + inMemoryChunkSizeInMB);
     } else if (inMemoryChunkSizeInMB < 1) {
       inMemoryChunkSizeInMB = 1;
-      LOGGER.error(
+      LOGGER.warn(
           "It is not recommended to decrease the sort memory chunk size less than 1MB, "
               + "so setting the value to "
               + inMemoryChunkSizeInMB);
@@ -1071,7 +1071,7 @@ public final class CarbonProperties {
               CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
 
       if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
-        LOGGER.error("The specified value for property "
+        LOGGER.warn("The specified value for property "
             + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
             + "is incorrect."
             + " Correct value should be in range of 0 -10000. Taking the default value.");
@@ -1079,7 +1079,7 @@ public final class CarbonProperties {
             CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
       }
     } catch (NumberFormatException e) {
-      LOGGER.error("The specified value for property "
+      LOGGER.warn("The specified value for property "
           + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
           + " Correct value should be in range of 0 -10000. Taking the default value.");
       numberOfDeltaFilesThreshold = Integer
@@ -1101,7 +1101,7 @@ public final class CarbonProperties {
               CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
 
       if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
-        LOGGER.error("The specified value for property "
+        LOGGER.warn("The specified value for property "
             + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
             + "is incorrect."
             + " Correct value should be in range of 0 -10000. Taking the default value.");
@@ -1109,7 +1109,7 @@ public final class CarbonProperties {
             CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
       }
     } catch (NumberFormatException e) {
-      LOGGER.error("The specified value for property "
+      LOGGER.warn("The specified value for property "
           + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
           + " Correct value should be in range of 0 -10000. Taking the default value.");
       numberOfDeltaFilesThreshold = Integer
@@ -1127,7 +1127,7 @@ public final class CarbonProperties {
         CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT);
     boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr);
     if (!validateBoolean) {
-      LOGGER.error("The carbon.use.multiple.temp.dir configuration value is invalid."
+      LOGGER.warn("The carbon.use.multiple.temp.dir configuration value is invalid."
           + "Configured value: \"" + usingMultiDirStr + "\"."
           + "Data Load will not use multiple temp directories.");
       usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT;
@@ -1144,7 +1144,7 @@ public final class CarbonProperties {
         CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT);
     boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
     if (!validateStorageLevel) {
-      LOGGER.error("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL
+      LOGGER.warn("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL
           + " configuration value is invalid. It will use default storage level("
           + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT
           + ") to persist rdd.");
@@ -1173,7 +1173,7 @@ public final class CarbonProperties {
     }
 
     if (isInvalidValue) {
-      LOGGER.error("The specified value for property "
+      LOGGER.warn("The specified value for property "
           + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM
           + " is incorrect. Correct value should be in range of 0 - 1000."
           + " Taking the default value: "
@@ -1194,7 +1194,7 @@ public final class CarbonProperties {
             CarbonCommonConstants.defaultValueIsPersistEnabled);
     boolean validatePersistEnabled = CarbonUtil.validateBoolean(isPersistEnabled);
     if (!validatePersistEnabled) {
-      LOGGER.error("The " + CarbonCommonConstants.isPersistEnabled
+      LOGGER.warn("The " + CarbonCommonConstants.isPersistEnabled
           + " configuration value is invalid. It will use default value("
           + CarbonCommonConstants.defaultValueIsPersistEnabled
           + ").");
@@ -1212,7 +1212,7 @@ public final class CarbonProperties {
         CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT);
     boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
     if (!validateStorageLevel) {
-      LOGGER.error("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL
+      LOGGER.warn("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL
           + " configuration value is invalid. It will use default storage level("
           + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT
           + ") to persist dataset.");
@@ -1232,7 +1232,7 @@ public final class CarbonProperties {
         || "BZIP2".equals(compressor) || "LZ4".equals(compressor)) {
       return compressor;
     } else {
-      LOGGER.error("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
+      LOGGER.warn("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
           .concat(" configuration value is invalid. Only snappy,gzip,bip2,lz4 and")
           .concat(" empty are allowed. It will not compress the sort temp files by default"));
       return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
@@ -1279,14 +1279,14 @@ public final class CarbonProperties {
       sortMemorySizeInMB = Integer.parseInt(
           carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB));
     } catch (NumberFormatException e) {
-      LOGGER.error(
+      LOGGER.warn(
           "The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
               + "is Invalid." + " Taking the default value."
               + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
       sortMemorySizeInMB = sortMemorySizeInMBDefault;
     }
     if (sortMemorySizeInMB < sortMemorySizeInMBDefault) {
-      LOGGER.error(
+      LOGGER.warn(
           "The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
               + "is less than default value." + ". Taking the default value."
               + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
@@ -1324,14 +1324,14 @@ public final class CarbonProperties {
       unsafeWorkingMemory = Integer.parseInt(
           carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB));
     } catch (NumberFormatException e) {
-      LOGGER.error("The specified value for property "
+      LOGGER.warn("The specified value for property "
           + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid."
           + " Taking the default value."
           + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
       unsafeWorkingMemory = unsafeWorkingMemoryDefault;
     }
     if (unsafeWorkingMemory < unsafeWorkingMemoryDefault) {
-      LOGGER.error("The specified value for property "
+      LOGGER.warn("The specified value for property "
           + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT
           + "is less than the default value." + ". Taking the default value."
           + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
@@ -1349,14 +1349,14 @@ public final class CarbonProperties {
       unsafeSortStorageMemory = Integer.parseInt(carbonProperties
           .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
     } catch (NumberFormatException e) {
-      LOGGER.error("The specified value for property "
+      LOGGER.warn("The specified value for property "
           + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid."
           + " Taking the default value."
           + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
       unsafeSortStorageMemory = unsafeSortStorageMemoryDefault;
     }
     if (unsafeSortStorageMemory < unsafeSortStorageMemoryDefault) {
-      LOGGER.error("The specified value for property "
+      LOGGER.warn("The specified value for property "
           + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB
           + "is less than the default value." + " Taking the default value."
           + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT);
@@ -1397,7 +1397,7 @@ public final class CarbonProperties {
           .getProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES,
               CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT));
     } catch (NumberFormatException exc) {
-      LOGGER.error(
+      LOGGER.warn(
           "The heap memory pooling threshold bytes is invalid. Using the default value "
           + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
       thresholdSize = Integer.parseInt(
@@ -1418,7 +1418,7 @@ public final class CarbonProperties {
               CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT));
       preserveSeconds = preserveHours * 3600 * 1000L;
     } catch (NumberFormatException exc) {
-      LOGGER.error(
+      LOGGER.warn(
           "The value of '" + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS
           + "' is invalid. Using the default value "
           + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT);
@@ -1438,7 +1438,7 @@ public final class CarbonProperties {
           .getProperty(CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT,
               CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT));
     } catch (NumberFormatException exc) {
-      LOGGER.error(
+      LOGGER.warn(
           "The value of '" + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT
           + "' is invalid. Using the default value "
           + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 1082d78..3c347db 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -99,6 +99,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TBase;
@@ -846,6 +848,28 @@ public final class CarbonUtil {
   }
 
   /**
+   *
+   * This method will check and create the given path with 777 permission
+   */
+  public static boolean checkAndCreateFolderWithPermission(String path) {
+    boolean created = false;
+    try {
+      FileFactory.FileType fileType = FileFactory.getFileType(path);
+      if (FileFactory.isFileExist(path, fileType)) {
+        created = true;
+      } else {
+        FileFactory.createDirectoryAndSetPermission(path,
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+        created = true;
+      }
+    } catch (IOException e) {
+      LOGGER.error(e);
+    }
+    return created;
+  }
+
+
+  /**
    * This method will return the size of a given file
    */
   public static long getFileSize(String filePath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 538df4a..7c4339f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -215,3 +215,19 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
 case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo
+
+/**
+ * pre event for standard hive partition
+ * @param sparkSession
+ * @param carbonTable
+ */
+case class PreAlterTableHivePartitionCommandEvent(sparkSession: SparkSession,
+    carbonTable: CarbonTable) extends Event with AlterTableHivePartitionInfo
+
+/**
+ * post event for standard hive partition
+ * @param sparkSession
+ * @param carbonTable
+ */
+case class PostAlterTableHivePartitionCommandEvent(sparkSession: SparkSession,
+    carbonTable: CarbonTable) extends Event with AlterTableHivePartitionInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 799d8c4..d85e8ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -100,7 +100,7 @@ trait AlterTableCompactionStatusUpdateEventInfo {
 }
 
 /**
- * event for alter_table_compaction
+ * event info for alter_table_compaction
  */
 trait AlterTableCompactionEventInfo {
   val sparkSession: SparkSession
@@ -108,6 +108,14 @@ trait AlterTableCompactionEventInfo {
 }
 
 /**
+ * event for alter table standard hive partition
+ */
+trait AlterTableHivePartitionInfo {
+  val sparkSession: SparkSession
+  val carbonTable: CarbonTable
+}
+
+/**
  * event for DeleteSegmentById
  */
 trait DeleteSegmentbyIdEventInfo {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index bf5f660..50102f1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -311,18 +311,7 @@ object StreamHandoffRDD {
         SegmentStatus.INSERT_IN_PROGRESS,
         carbonLoadModel.getFactTimeStamp,
         false)
-      val operationContext = new OperationContext()
-      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
-        new LoadTablePreStatusUpdateEvent(
-          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
-          carbonLoadModel)
-      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
-      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
-        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
-      OperationListenerBus.getInstance()
-        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
       // convert a streaming segment to columnar segment
       val status = new StreamHandoffRDD(
         sparkSession.sparkContext,
@@ -355,7 +344,19 @@ object StreamHandoffRDD {
     }
 
     if (loadStatus == SegmentStatus.SUCCESS) {
+      val operationContext = new OperationContext()
+      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+        new LoadTablePreStatusUpdateEvent(
+          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
+          carbonLoadModel)
+      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
       val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
+
+      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+      OperationListenerBus.getInstance()
+        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
       if (!done) {
         val errorMessage = "Handoff failed due to failure in table status updation."
         LOGGER.audit("Handoff is failed for " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index b0e6b94..b583c6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
@@ -68,7 +69,18 @@ case class CarbonAlterTableAddHivePartitionCommand(
           currParts.exists(p => part.equals(p))
         }.asJava)
       }
+      val operationContext = new OperationContext
+      val preAlterTableHivePartitionCommandEvent = new PreAlterTableHivePartitionCommandEvent(
+        sparkSession,
+        table)
+      OperationListenerBus.getInstance()
+        .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
       AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(sparkSession)
+      val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+        sparkSession,
+        table)
+      OperationListenerBus.getInstance()
+        .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
     }
     Seq.empty[Row]
   }
@@ -113,7 +125,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
           loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
         newMetaEntry.setSegmentFile(segmentFileName)
         val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
-        CarbonUtil.checkAndCreateFolder(segmentsLoc)
+        CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
         val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
         SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
         CarbonLoaderUtil.populateNewLoadMetaEntry(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 407057e..c67d694 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
 import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
 
 /**
@@ -89,6 +90,12 @@ case class CarbonAlterTableDropHivePartitionCommand(
             partition.location)
         }
         carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava)
+        val operationContext = new OperationContext
+        val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent(
+          sparkSession,
+          table)
+        OperationListenerBus.getInstance()
+          .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
         // Drop the partitions from hive.
         AlterTableDropPartitionCommand(
           tableName,
@@ -96,6 +103,11 @@ case class CarbonAlterTableDropHivePartitionCommand(
           ifExists,
           purge,
           retainData).run(sparkSession)
+        val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+          sparkSession,
+          table)
+        OperationListenerBus.getInstance()
+          .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
       } catch {
         case e: Exception =>
           if (!ifExists) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 4b89296..1bdf414 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.SplitPartitionCallable
@@ -151,12 +152,23 @@ case class CarbonAlterTableSplitPartitionCommand(
       carbonLoadModel.setTablePath(tablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
+      val operationContext = new OperationContext
+      val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent(
+        sparkSession,
+        table)
+      OperationListenerBus.getInstance()
+        .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
       alterTableSplitPartition(
         sparkSession.sqlContext,
         splitPartitionModel.partitionId.toInt.toString,
         carbonLoadModel,
         oldPartitionIds.asScala.toList
       )
+      val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
+        sparkSession,
+        table)
+      OperationListenerBus.getInstance()
+        .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
       success = true
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 6664a2c..724a312 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -95,8 +95,9 @@ public class MeasureFieldConverterImpl implements FieldConverter {
         }
         row.update(output, index);
       } catch (NumberFormatException e) {
-        LOGGER.warn(
-            "Cant not convert value to Numeric type value. Value considered as null.");
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Can not convert value to Numeric type value. Value considered as null.");
+        }
         logHolder.setReason(
             CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
         output = null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 8aa5bde..1d892e0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -308,7 +309,7 @@ public class CarbonFactDataHandlerModel {
       measureDataTypes[i++] = msr.getDataType();
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
-    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+    CarbonUtil.checkAndCreateFolderWithPermission(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
     boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -337,9 +338,32 @@ public class CarbonFactDataHandlerModel {
    * @return data directory path
    */
   private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
+    // configuration.getDataWritePath will not be null only in case of partition
     if (configuration.getDataWritePath() != null) {
-      CarbonUtil.checkAndCreateFolder(configuration.getDataWritePath());
-      return configuration.getDataWritePath();
+      String paths = configuration.getDataWritePath();
+      AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
+      String partPath = absoluteTableIdentifier.getTablePath();
+      String[] dirs = paths.split(partPath);
+      /* it will create folder one by one and apply the permissions
+       else creation of folder in one go will set the permission for last directory only
+       e.g. paths="/home/rahul/Documents/store/carbonTable1/emp_name=rahul/loc=india/dept=rd"
+            So, dirs={"","/emp_name=rahul/loc=india/dept=rd"}
+            if (dirs.length > 1) then partDirs ={"","emp_name=rahul","loc=india","dept=rd"}
+            forEach partDirs partpath(say "/home/rahul/Documents/store/carbonTable1") will
+            be keep appending with "emp_name=rahul","loc=india","dept=rd" sequentially
+      */
+      if (dirs.length > 1) {
+        String[] partDirs = dirs[1].split(CarbonCommonConstants.FILE_SEPARATOR);
+        for (String partDir : partDirs) {
+          if (!partDir.isEmpty()) {
+            partPath = partPath.concat(CarbonCommonConstants.FILE_SEPARATOR + partDir);
+            CarbonUtil.checkAndCreateFolderWithPermission(partPath);
+          }
+        }
+      } else {
+        CarbonUtil.checkAndCreateFolderWithPermission(paths);
+      }
+      return paths;
     }
     AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
     String carbonDataDirectoryPath = CarbonTablePath


Mime
View raw message