carbondata-commits mailing list archives

From jack...@apache.org
Subject [20/50] [abbrv] carbondata git commit: [CARBONDATA-2076] Refactored code to segregate process meta and process data in load command
Date Wed, 31 Jan 2018 05:22:40 GMT
[CARBONDATA-2076] Refactored code to segregate process meta and process data in load command

This closes #1837


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3a6136df
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3a6136df
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3a6136df

Branch: refs/heads/carbonstore
Commit: 3a6136df066b9d0cdeecec283115e4a99d82a900
Parents: 7d43442
Author: kumarvishal <kumarvishal.1802@gmail.com>
Authored: Fri Jan 19 17:22:28 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Jan 25 16:28:34 2018 +0530

----------------------------------------------------------------------
 .../indexstore/BlockletDataMapIndexStore.java   |   2 +-
 .../carbondata/core/util/SessionParams.java     |  14 +-
 .../hadoop/api/CarbonOutputCommitter.java       |  28 ++-
 .../spark/rdd/AggregateDataMapCompactor.scala   |  39 ++--
 .../spark/rdd/CarbonDataRDDFactory.scala        |   6 +-
 .../spark/rdd/CarbonTableCompactor.scala        |   4 +-
 .../spark/rdd/CompactionFactory.scala           |  10 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  18 +-
 .../CarbonAlterTableCompactionCommand.scala     |  53 ++---
 .../management/CarbonInsertIntoCommand.scala    |  24 ++-
 .../management/CarbonLoadDataCommand.scala      | 111 ++++-------
 .../CreatePreAggregateTableCommand.scala        |  66 ++++---
 .../preaaggregate/PreAggregateListeners.scala   | 194 +++++++++++++++----
 .../preaaggregate/PreAggregateUtil.scala        |  76 ++++----
 .../CarbonCreateTableAsSelectCommand.scala      |  27 +--
 .../datasources/CarbonFileFormat.scala          |   4 -
 .../sql/execution/strategy/DDLStrategy.scala    |   2 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  16 +-
 .../processing/loading/events/LoadEvents.java   |  19 ++
 20 files changed, 447 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index ad80fd7..111a7a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -138,7 +138,7 @@ public class BlockletDataMapIndexStore
             partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath);
             partitionFileStoreMap.put(identifier.getSegmentId(), partitionFileStore);
             for (CarbonFile file : carbonFiles) {
-              blockMetaInfoMap.put(file.getAbsolutePath(),
+              blockMetaInfoMap.put(FileFactory.getUpdatedFilePath(file.getAbsolutePath()),
                   new BlockMetaInfo(file.getLocations(), file.getSize()));
             }
           }
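
Note: the map key is now passed through FileFactory.getUpdatedFilePath, presumably so that puts and later lookups normalize the path the same way and hit the same entry. A minimal sketch of the idea; the FileFactory import path is an assumption, since it is not visible in this hunk:

    import org.apache.carbondata.core.datastore.impl.FileFactory

    object NormalizedMapKeyExample {
      // put and get must normalize the same way, otherwise the lookup misses
      def blockKey(absolutePath: String): String =
        FileFactory.getUpdatedFilePath(absolutePath)
    }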

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 3f0e856..afbd947 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -52,10 +52,19 @@ public class SessionParams implements Serializable {
 
   private Map<String, String> sProps;
   private Map<String, String> addedProps;
-
+  private Map<String, Object> extraInfo;
   public SessionParams() {
     sProps = new HashMap<>();
     addedProps = new HashMap<>();
+    extraInfo = new HashMap<>();
+  }
+
+  public void setExtraInfo(String key, Object value) {
+    this.extraInfo.put(key, value);
+  }
+
+  public Object getExtraInfo(String key) {
+    return this.extraInfo.get(key);
   }
 
   /**
@@ -198,6 +207,9 @@ public class SessionParams implements Serializable {
     sProps.remove(property);
   }
 
+  public void removeExtraInfo(String key) {
+    extraInfo.remove(key);
+  }
   /**
    * clear the set properties
    */
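
Note: the new extraInfo map lets arbitrary objects ride along with the session parameters, next to the existing string properties. A minimal sketch of the round trip, using the key name this patch introduces elsewhere ("partition.operationcontext"); the object name is only illustrative:

    import org.apache.carbondata.core.util.SessionParams

    object SessionParamsExtraInfoExample {
      def roundTrip(): Unit = {
        val params = new SessionParams()
        // any object can ride along with the session; it is kept in memory only
        params.setExtraInfo("partition.operationcontext", new Object())
        // returns null if the key was never set
        val payload: AnyRef = params.getExtraInfo("partition.operationcontext")
        // drop it once the operation that needed it is finished
        params.removeExtraInfo("partition.operationcontext")
      }
    }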

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index eb18bbd..f6e928d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -18,7 +18,10 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -30,11 +33,12 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.events.OperationContext;
 import org.apache.carbondata.events.OperationListenerBus;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.processing.loading.events.LoadEvents;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonLoaderUtil;
@@ -106,18 +110,13 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     long segmentSize = CarbonLoaderUtil
         .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
     if (segmentSize > 0 || overwriteSet) {
-      String operationContextStr =
-          context.getConfiguration().get(
-              CarbonTableOutputFormat.OPERATION_CONTEXT,
-              null);
-      if (operationContextStr != null) {
-        OperationContext operationContext =
-            (OperationContext) ObjectSerializationUtil.convertStringToObject(operationContextStr);
+      Object operationContext = getOperationContext();
+      if (operationContext != null) {
         LoadEvents.LoadTablePreStatusUpdateEvent event =
             new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
                 loadModel);
         try {
-          OperationListenerBus.getInstance().fireEvent(event, operationContext);
+          OperationListenerBus.getInstance().fireEvent(event, (OperationContext) operationContext);
         } catch (Exception e) {
           throw new IOException(e);
         }
@@ -145,6 +144,15 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     }
   }
 
+  private Object getOperationContext() {
+    // when validate segments is disabled in thread local update it to CarbonTableInputFormat
+    CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
+    if (carbonSessionInfo != null) {
+      return carbonSessionInfo.getThreadParams().getExtraInfo("partition.operationcontext");
+    }
+    return null;
+  }
+
   /**
    * Merge index files to a new single file.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 636d731..5f8f389 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.{CarbonSession, SQLContext}
 import org.apache.spark.sql.execution.command.CompactionModel
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
-import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 
@@ -39,7 +39,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
     compactionModel: CompactionModel,
     executor: ExecutorService,
     sqlContext: SQLContext,
-    storeLocation: String)
+    storeLocation: String,
+    operationContext: OperationContext)
   extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
 
   override def executeCompaction(): Unit = {
@@ -57,30 +58,17 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
         CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
         carbonLoadModel.getDatabaseName + "." +
         carbonLoadModel.getTableName, "false")
-      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
-        .map(_.getColumnName).mkString(",")
-      // Creating a new query string to insert data into pre-aggregate table from that same table.
-      // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
-      // select * from preaggtable1
-      // The following code will generate the select query with a load UDF that will be used to
-      // apply DataLoadingRules
-      val childDataFrame = sqlContext.sparkSession.sql(new CarbonSpark2SqlParser()
-        // adding the aggregation load UDF
-        .addPreAggLoadFunction(
-        // creating the select query on the bases on table schema
-        PreAggregateUtil.createChildSelectQuery(
-          carbonTable.getTableInfo.getFactTable, carbonTable.getDatabaseName))).drop("preAggLoad")
+      CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
+      val loadCommand = operationContext.getProperty(carbonTable.getTableName + "_Compaction")
+        .asInstanceOf[CarbonLoadDataCommand]
       try {
-        CarbonLoadDataCommand(
-          Some(carbonTable.getDatabaseName),
-          carbonTable.getTableName,
-          null,
-          Nil,
-          Map("fileheader" -> headers),
-          isOverwriteTable = false,
-          dataFrame = Some(childDataFrame),
-          internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
-            "mergedSegmentName" -> mergedLoadName)).run(sqlContext.sparkSession)
+        val newInternalOptions = loadCommand.internalOptions ++
+                                 Map("mergedSegmentName" -> mergedLoadName)
+        loadCommand.internalOptions = newInternalOptions
+        loadCommand.dataFrame =
+                  Some(PreAggregateUtil.getDataFrame(
+                    sqlContext.sparkSession, loadCommand.logicalPlan.get))
+        loadCommand.processData(sqlContext.sparkSession)
         val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
           carbonTable.getMetaDataFilepath)
         val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
@@ -103,6 +91,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
         // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold
         // allows it.
         if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
+
           executeCompaction()
         }
         CarbonSession
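
Note: the compactor no longer rebuilds the child insert-select on every run. CompactionProcessMetaListener (added below in PreAggregateListeners.scala) prepares a CarbonLoadDataCommand per child table during the metadata phase and parks it in the OperationContext under "<childTableName>_Compaction"; the compactor only refreshes the data frame and merged segment name before running the data phase. A rough sketch of the two ends of that handoff, with the free values passed in as parameters since they come from the respective call sites:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
    import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.events.OperationContext

    object CompactionHandOffExample {
      // listener side (metadata phase): park the prepared command for later
      def publish(operationContext: OperationContext,
          childTableName: String,
          preparedLoadCommand: CarbonLoadDataCommand): Unit =
        operationContext.setProperty(childTableName + "_Compaction", preparedLoadCommand)

      // compactor side (data phase): pick it up, point it at the merged segment, run it
      def runChildCompaction(operationContext: OperationContext,
          carbonTable: CarbonTable,
          mergedLoadName: String,
          sparkSession: SparkSession): Unit = {
        val loadCommand = operationContext
          .getProperty(carbonTable.getTableName + "_Compaction")
          .asInstanceOf[CarbonLoadDataCommand]
        loadCommand.internalOptions =
          loadCommand.internalOptions ++ Map("mergedSegmentName" -> mergedLoadName)
        loadCommand.dataFrame =
          Some(PreAggregateUtil.getDataFrame(sparkSession, loadCommand.logicalPlan.get))
        loadCommand.processData(sparkSession)
      }
    }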

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f37fbd7..809c8ff 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -173,7 +173,8 @@ object CarbonDataRDDFactory {
           compactionModel,
           executor,
           sqlContext,
-          storeLocation)
+          storeLocation,
+          operationContext)
         try {
           // compaction status of the table which is triggered by the user.
           var triggeredCompactionStatus = false
@@ -225,7 +226,8 @@ object CarbonDataRDDFactory {
                   newcompactionModel,
                   executor,
                   sqlContext,
-                  storeLocation).executeCompaction()
+                  storeLocation,
+                  operationContext).executeCompaction()
               } catch {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 0dc856d..a0c8f65 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -45,7 +45,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     compactionModel: CompactionModel,
     executor: ExecutorService,
     sqlContext: SQLContext,
-    storeLocation: String)
+    storeLocation: String,
+    operationContext: OperationContext)
   extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
 
   override def executeCompaction(): Unit = {
@@ -170,7 +171,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     carbonLoadModel.setLoadMetadataDetails(
       SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
     // trigger event for compaction
-    val operationContext = new OperationContext
     val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
       AlterTableCompactionPreEvent(sqlContext.sparkSession,
         carbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
index 6060f06..8508d2a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.CompactionModel
 
+import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
 object CompactionFactory {
@@ -33,21 +34,24 @@ object CompactionFactory {
       compactionModel: CompactionModel,
       executor: ExecutorService,
       sqlContext: SQLContext,
-      storeLocation: String): Compactor = {
+      storeLocation: String,
+      operationContext: OperationContext): Compactor = {
     if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
       new AggregateDataMapCompactor(
         carbonLoadModel,
         compactionModel,
         executor,
         sqlContext,
-        storeLocation)
+        storeLocation,
+        operationContext)
     } else {
       new CarbonTableCompactor(
         carbonLoadModel,
         compactionModel,
         executor,
         sqlContext,
-        storeLocation)
+        storeLocation,
+        operationContext)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index bbc3c2d..585fe67 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -143,6 +143,8 @@ object CarbonEnv {
       .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
       .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
         AlterPreAggregateTableCompactionPostListener)
+      .addListener(classOf[LoadMetadataEvent], LoadProcessMetaListener)
+      .addListener(classOf[LoadMetadataEvent], CompactionProcessMetaListener)
   }
 
   /**
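
Note: both new listeners are registered against LoadMetadataEvent on the global OperationListenerBus, so any command that fires that event during its metadata phase gets the child load commands prepared for it. A small sketch of raising the event, assuming a resolved CarbonTable; the isCompaction flag decides which listener does the work:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
    import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent

    object LoadMetadataEventExample {
      def fire(table: CarbonTable, isCompaction: Boolean): OperationContext = {
        val operationContext = new OperationContext
        // false -> LoadProcessMetaListener prepares per-data-map load commands,
        // true  -> CompactionProcessMetaListener prepares "<childTable>_Compaction" commands
        OperationListenerBus.getInstance().fireEvent(
          new LoadMetadataEvent(table, isCompaction), operationContext)
        operationContext
      }
    }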

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index c2c15fe..e95b8db 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -53,7 +53,7 @@ class CarbonSession(@transient val sc: SparkContext,
    * and a catalog that interacts with external systems.
    */
   @transient
- override lazy val sharedState: SharedState = {
+  override lazy val sharedState: SharedState = {
     existingSharedState match {
       case Some(_) =>
         val ss = existingSharedState.get
@@ -214,17 +214,31 @@ object CarbonSession {
     ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo)
   }
 
+
+  def threadSet(key: String, value: Object): Unit = {
+    var currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (currentThreadSessionInfo == null) {
+      currentThreadSessionInfo = new CarbonSessionInfo()
+    }
+    else {
+      currentThreadSessionInfo = currentThreadSessionInfo.clone()
+    }
+    currentThreadSessionInfo.getThreadParams.setExtraInfo(key, value)
+    ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo)
+  }
+
   def threadUnset(key: String): Unit = {
     val currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (currentThreadSessionInfo != null) {
       val currentThreadSessionInfoClone = currentThreadSessionInfo.clone()
       val threadParams = currentThreadSessionInfoClone.getThreadParams
       CarbonSetCommand.unsetValue(threadParams, key)
+      threadParams.removeExtraInfo(key)
       ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfoClone)
     }
   }
 
-  private[spark] def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = {
+  def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = {
     val carbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.clone()
     val currentThreadSessionInfoOrig = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (currentThreadSessionInfoOrig != null) {
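
Note: threadSet now also accepts arbitrary objects, stored as thread-local extra info, and threadUnset clears both the string parameters and that extra info. This is the channel CarbonLoadDataCommand uses to hand the OperationContext to CarbonOutputCommitter during partitioned loads; a minimal sketch of the bracketing, assuming set, write and commit all run on the same driver thread:

    import org.apache.spark.sql.CarbonSession
    import org.apache.carbondata.events.OperationContext

    object ThreadContextBracketExample {
      def withOperationContext(operationContext: OperationContext)(body: => Unit): Unit = {
        CarbonSession.threadSet("partition.operationcontext", operationContext)
        try {
          // run the partitioned insert here; CarbonOutputCommitter.getOperationContext
          // reads the same key back through ThreadLocalSessionInfo when committing
          body
        } finally {
          CarbonSession.threadUnset("partition.operationcontext")
        }
      }
    }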

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 6af0e98..fb0f9fe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, CarbonMergerMapping, CompactionModel, DataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
@@ -37,9 +37,10 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.exception.ConcurrentOperationException
@@ -53,34 +54,42 @@ import org.apache.carbondata.streaming.segment.StreamSegment
  */
 case class CarbonAlterTableCompactionCommand(
     alterTableModel: AlterTableModel,
-    tableInfoOp: Option[TableInfo] = None)
-  extends DataCommand {
+    tableInfoOp: Option[TableInfo] = None,
+    val operationContext: OperationContext = new OperationContext ) extends AtomicRunnableCommand {
 
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService =
-      LogServiceFactory.getLogService(this.getClass.getName)
-    val tableName = alterTableModel.tableName.toLowerCase
-    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+  var table: CarbonTable = _
 
-    val table = if (tableInfoOp.isDefined) {
-      val tableInfo = tableInfoOp.get
-      //   To DO: CarbonEnv.updateStorePath
-      CarbonTable.buildFromTableInfo(tableInfo)
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = alterTableModel.tableName.toLowerCase
+    val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+    table = if (tableInfoOp.isDefined) {
+      CarbonTable.buildFromTableInfo(tableInfoOp.get)
     } else {
-      val relation =
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(databaseName), tableName)(sparkSession)
-          .asInstanceOf[CarbonRelation]
+      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
       if (relation == null) {
-        throw new NoSuchTableException(databaseName, tableName)
+        throw new NoSuchTableException(dbName, tableName)
       }
       if (null == relation.carbonTable) {
-        LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
-        throw new NoSuchTableException(databaseName, tableName)
+        LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+        throw new NoSuchTableException(dbName, tableName)
       }
       relation.carbonTable
     }
+    if (CarbonUtil.hasAggregationDataMap(table) ||
+        (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
+      val loadMetadataEvent = new LoadMetadataEvent(table, true)
+      OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
+    }
+    Seq.empty
+  }
 
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService =
+      LogServiceFactory.getLogService(this.getClass.getName)
+    val tableName = alterTableModel.tableName.toLowerCase
+    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
     if (isLoadInProgress) {
       val message = "Cannot run data loading and compaction on same table concurrently. " +
@@ -88,7 +97,6 @@ case class CarbonAlterTableCompactionCommand(
       LOGGER.error(message)
       throw new ConcurrentOperationException(message)
     }
-
     val carbonLoadModel = new CarbonLoadModel()
     carbonLoadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
@@ -103,7 +111,6 @@ case class CarbonAlterTableCompactionCommand(
       System.getProperty("java.io.tmpdir"))
     storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
     // trigger event for compaction
-    val operationContext = new OperationContext
     val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
       AlterTableCompactionPreEvent(sparkSession, table, null, null)
     OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
@@ -242,8 +249,6 @@ case class CarbonAlterTableCompactionCommand(
               compactionModel.currentPartitions,
               null)
 
-            // trigger event for merge index
-            val operationContext = new OperationContext
             // trigger event for compaction
             val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =
               AlterTableCompactionPreStatusUpdateEvent(sqlContext.sparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 810b10f..626cdba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.management
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
 
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
@@ -28,12 +28,14 @@ case class CarbonInsertIntoCommand(
     child: LogicalPlan,
     overwrite: Boolean,
     partition: Map[String, Option[String]])
-  extends DataCommand {
+  extends AtomicRunnableCommand {
 
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
+  var loadCommand: CarbonLoadDataCommand = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val df = Dataset.ofRows(sparkSession, child)
     val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    val load = CarbonLoadDataCommand(
+    loadCommand = CarbonLoadDataCommand(
       databaseNameOp = Some(relation.carbonRelation.databaseName),
       tableName = relation.carbonRelation.tableName,
       factPathFromUser = null,
@@ -45,10 +47,14 @@ case class CarbonInsertIntoCommand(
       updateModel = None,
       tableInfoOp = None,
       internalOptions = Map.empty,
-      partition = partition).run(sparkSession)
-    // updating relation metadata. This is in case of auto detect high cardinality
-    relation.carbonRelation.metaData =
-      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable)
-    load
+      partition = partition)
+    loadCommand.processMetadata(sparkSession)
+  }
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (null != loadCommand) {
+      loadCommand.processData(sparkSession)
+    } else {
+      Seq.empty
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 6b43152..7afbd92 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -39,15 +39,14 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
-import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
-import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -57,26 +56,24 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP
 import org.apache.carbondata.core.metadata.PartitionMapFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.{CarbonStorePath}
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
-import org.apache.carbondata.format
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.exception.{BadRecordFoundException, NoRetryException}
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.exception.{NoRetryException}
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
@@ -87,12 +84,41 @@ case class CarbonLoadDataCommand(
     options: scala.collection.immutable.Map[String, String],
     isOverwriteTable: Boolean,
     var inputSqlString: String = null,
-    dataFrame: Option[DataFrame] = None,
+    var dataFrame: Option[DataFrame] = None,
     updateModel: Option[UpdateTableModel] = None,
     var tableInfoOp: Option[TableInfo] = None,
-    internalOptions: Map[String, String] = Map.empty,
-    partition: Map[String, Option[String]] = Map.empty) extends DataCommand {
+    var internalOptions: Map[String, String] = Map.empty,
+    partition: Map[String, Option[String]] = Map.empty,
+    logicalPlan: Option[LogicalPlan] = None,
+    var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand {
 
+  var table: CarbonTable = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    table = if (tableInfoOp.isDefined) {
+        CarbonTable.buildFromTableInfo(tableInfoOp.get)
+      } else {
+        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        if (relation == null) {
+          throw new NoSuchTableException(dbName, tableName)
+        }
+        if (null == relation.carbonTable) {
+          LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+          LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+          throw new NoSuchTableException(dbName, tableName)
+        }
+        relation.carbonTable
+      }
+    operationContext.setProperty("isOverwrite", isOverwriteTable)
+    if(CarbonUtil.hasAggregationDataMap(table)) {
+      val loadMetadataEvent = new LoadMetadataEvent(table, false)
+      OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
+    }
+    Seq.empty
+  }
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
@@ -121,22 +147,6 @@ case class CarbonLoadDataCommand(
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val carbonLoadModel = new CarbonLoadModel()
     try {
-      val table = if (tableInfoOp.isDefined) {
-        CarbonTable.buildFromTableInfo(tableInfoOp.get)
-      } else {
-        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        if (relation == null) {
-          throw new NoSuchTableException(dbName, tableName)
-        }
-        if (null == relation.carbonTable) {
-          LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
-          LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
-          throw new NoSuchTableException(dbName, tableName)
-        }
-        relation.carbonTable
-      }
-
       val tableProperties = table.getTableInfo.getFactTable.getTableProperties
       val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
       optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
@@ -167,7 +177,6 @@ case class CarbonLoadDataCommand(
       TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
       var isUpdateTableStatusRequired = false
       try {
-        val operationContext = new OperationContext
         val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
           new LoadTablePreExecutionEvent(
             table.getCarbonTableIdentifier,
@@ -181,7 +190,6 @@ case class CarbonLoadDataCommand(
         OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
-        GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
         // Clean up the old invalid segment data before creating a new entry for new load.
         DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
         // add the start entry for the new load in the table status file
@@ -525,6 +533,7 @@ case class CarbonLoadDataCommand(
     CarbonSession.threadSet(
       CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
       isEmptyBadRecord)
+    CarbonSession.threadSet("partition.operationcontext", operationContext)
     try {
       val query: LogicalPlan = if (dataFrame.isDefined) {
         val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
@@ -649,6 +658,7 @@ case class CarbonLoadDataCommand(
       CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT)
       CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
       CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+      CarbonSession.threadUnset("partition.operationcontext")
     }
     try {
       // Trigger auto compaction
@@ -718,7 +728,6 @@ case class CarbonLoadDataCommand(
     val dataSchema =
       StructType(metastoreSchema
         .filterNot(field => partitionSchema.contains(field.name)))
-    val operationContextStr = ObjectSerializationUtil.convertObjectToString(operationContext)
     val options = new mutable.HashMap[String, String]()
     options ++= catalogTable.storage.properties
     options += (("overwrite", overWriteLocal.toString))
@@ -731,7 +740,6 @@ case class CarbonLoadDataCommand(
           partition.map{case (col, value) => (col.toLowerCase, value.isDefined)}.asJava))
       options += (("staticpartition", staticPartitionStr))
     }
-    options += (("operationcontext", operationContextStr))
     options ++= this.options
     if (updateModel.isDefined) {
       options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))
@@ -869,41 +877,4 @@ case class CarbonLoadDataCommand(
     val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
     (dataFrameWithTupleId)
   }
-
-  private def updateTableMetadata(
-      carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      model: DictionaryLoadModel,
-      noDictDimension: Array[CarbonDimension]): Unit = {
-    val sparkSession = sqlContext.sparkSession
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.table)
-
-    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    // read TableInfo
-    val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-
-    // modify TableInfo
-    val columns = tableInfo.getFact_table.getTable_columns
-    for (i <- 0 until columns.size) {
-      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
-        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
-      }
-    }
-    val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-    entry.setTime_stamp(System.currentTimeMillis())
-
-    // write TableInfo
-    metastore.updateTableSchemaForAlter(carbonTablePath.getCarbonTableIdentifier,
-      carbonTablePath.getCarbonTableIdentifier,
-      tableInfo, entry, carbonTablePath.getPath)(sparkSession)
-
-    // update the schema modified time
-    metastore.updateAndTouchSchemasUpdatedTime()
-
-    val identifier = model.table.getCarbonTableIdentifier
-    // update CarbonDataLoadSchema
-    val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
-      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].carbonTable
-    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
-  }
 }
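
Note: CarbonLoadDataCommand follows the same two-phase split: processMetadata resolves the target table and fires LoadMetadataEvent when aggregation data maps exist, and processData performs the load (publishing the OperationContext on the thread for partitioned loads, as shown above). A rough sketch of invoking it directly, mirroring the construction the old compactor code used; the database, table and header names are placeholders only:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand

    object LoadCommandExample {
      def loadFromDataFrame(sourceDf: DataFrame, sparkSession: SparkSession): Unit = {
        val loadCommand = CarbonLoadDataCommand(
          Some("default"),                 // database (placeholder)
          "maintable",                     // table name (placeholder)
          null,                            // no fact file path: data comes from the data frame
          Nil,
          Map("fileheader" -> "id,name"),  // placeholder header list
          isOverwriteTable = false,
          dataFrame = Some(sourceDf))
        loadCommand.processMetadata(sparkSession) // resolve table, fire LoadMetadataEvent
        loadCommand.processData(sparkSession)     // run the load, then auto compaction
      }
    }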

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 56f298a..c5340c2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -24,13 +24,13 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 
 /**
@@ -51,6 +51,7 @@ case class CreatePreAggregateTableCommand(
   extends AtomicRunnableCommand {
 
   var parentTable: CarbonTable = _
+  var loadCommand: CarbonLoadDataCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
@@ -64,8 +65,6 @@ case class CreatePreAggregateTableCommand(
     parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
     assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
       "Parent table name is different in select and create")
-
-
     var neworder = Seq[String]()
     val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
     parentOrder.foreach(parentcol =>
@@ -80,7 +79,9 @@ case class CreatePreAggregateTableCommand(
       .LOAD_SORT_SCOPE_DEFAULT))
     tableProperties
       .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
-
+    val tableIdentifier =
+      TableIdentifier(parentTableIdentifier.table + "_" + dataMapName,
+        parentTableIdentifier.database)
     // prepare table model of the collected tokens
     val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
       ifNotExistPresent = false,
@@ -137,7 +138,30 @@ case class CreatePreAggregateTableCommand(
     // to be used in further create process.
     parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database,
       parentTableIdentifier.table)(sparkSession)
-
+    val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
+      val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
+        .filter(p => p.getDataMapName
+          .equalsIgnoreCase(dataMapName)).head
+        .asInstanceOf[AggregationDataMapSchema]
+      PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
+        parentTable.getTableName,
+        parentTable.getDatabaseName)
+    }
+    else {
+      queryString
+    }
+    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
+      updatedLoadQuery)).drop("preAggLoad")
+    val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
+      .filter(dataMap => dataMap.getDataMapName.equalsIgnoreCase(dataMapName)).head
+      .asInstanceOf[AggregationDataMapSchema]
+    loadCommand = PreAggregateUtil.createLoadCommandForChild(
+      dataMap.getChildSchema.getListOfColumns,
+      tableIdentifier,
+      dataFrame,
+      false,
+      sparkSession = sparkSession)
+    loadCommand.processMetadata(sparkSession)
     Seq.empty
   }
 
@@ -159,35 +183,19 @@ case class CreatePreAggregateTableCommand(
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
       .nonEmpty
     if (loadAvailable) {
-      val updatedQuery = if (timeSeriesFunction.isDefined) {
-        val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
-          .filter(p => p.getDataMapName
-            .equalsIgnoreCase(dataMapName)).head
-          .asInstanceOf[AggregationDataMapSchema]
-        PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
-          parentTable.getTableName,
-          parentTable.getDatabaseName)
-      } else {
-        queryString
-      }
       // Passing segmentToLoad as * because we want to load all the segments into the
       // pre-aggregate table even if the user has set some segments on the parent table.
+      loadCommand.dataFrame = Some(PreAggregateUtil
+        .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
       PreAggregateUtil.startDataLoadForDataMap(
-          parentTable,
-          tableIdentifier,
-          updatedQuery,
-          segmentToLoad = "*",
-          validateSegments = true,
-          isOverwrite = false,
-          sparkSession = sparkSession)
+        parentTable,
+        segmentToLoad = "*",
+        validateSegments = true,
+        sparkSession,
+        loadCommand)
     }
     Seq.empty
   }
-
-  // Create the aggregation table name with parent table name prefix
-  private lazy val tableIdentifier =
-    TableIdentifier(parentTableIdentifier.table + "_" + dataMapName, parentTableIdentifier.database)
-
 }
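
Note: the child table identifier, previously a lazy member, is now built inline during processMetadata before the child load command is created; the naming convention itself is unchanged. A one-line sketch of that convention, with placeholder names:

    import org.apache.spark.sql.catalyst.TableIdentifier

    object ChildTableIdentifierExample {
      // the aggregation (child) table name is the parent name with the data map
      // name appended, e.g. "maintable" + "agg0" -> "maintable_agg0"
      def childTable(parentTable: String, dataMapName: String,
          database: Option[String]): TableIdentifier =
        TableIdentifier(parentTable + "_" + dataMapName, database)
    }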
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index fce32ab..7b273ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -20,18 +20,153 @@ package org.apache.spark.sql.execution.command.preaaggregate
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AlterTableModel
-import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 
+/**
+ * below class will be used to create load command for compaction
+ * for all the pre agregate child data map
+ */
+object CompactionProcessMetaListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    val tableEvent = event.asInstanceOf[LoadMetadataEvent]
+    val table = tableEvent.getCarbonTable
+    if (!table.isChildDataMap && CarbonUtil.hasAggregationDataMap(table)) {
+      val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
+        .filter(_.isInstanceOf[AggregationDataMapSchema])
+        .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
+      for (dataMapSchema: AggregationDataMapSchema <- aggregationDataMapList) {
+        val childTableName = dataMapSchema.getRelationIdentifier.getTableName
+        val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
+        // Creating a new query string to insert data into pre-aggregate table from that same table.
+        // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
+        // select * from preaggtable1
+        // The following code will generate the select query with a load UDF that will be used to
+        // apply DataLoadingRules
+        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+          // adding the aggregation load UDF
+          .addPreAggLoadFunction(
+          // creating the select query on the bases on table schema
+          PreAggregateUtil.createChildSelectQuery(
+            dataMapSchema.getChildSchema, table.getDatabaseName))).drop("preAggLoad")
+        val loadCommand = PreAggregateUtil.createLoadCommandForChild(
+          dataMapSchema.getChildSchema.getListOfColumns,
+          TableIdentifier(childTableName, Some(childDatabaseName)),
+          childDataFrame,
+          false,
+          sparkSession)
+        loadCommand.processMetadata(sparkSession)
+        operationContext
+          .setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", loadCommand)
+      }
+    } else if (table.isChildDataMap) {
+      val childTableName = table.getTableName
+      val childDatabaseName = table.getDatabaseName
+      // Creating a new query string to insert data into pre-aggregate table from that same table.
+      // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
+      // select * from preaggtable1
+      // The following code will generate the select query with a load UDF that will be used to
+      // apply DataLoadingRules
+      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+        // adding the aggregation load UDF
+        .addPreAggLoadFunction(
+        // creating the select query on the bases on table schema
+        PreAggregateUtil.createChildSelectQuery(
+          table.getTableInfo.getFactTable, table.getDatabaseName))).drop("preAggLoad")
+      val loadCommand = PreAggregateUtil.createLoadCommandForChild(
+        table.getTableInfo.getFactTable.getListOfColumns,
+        TableIdentifier(childTableName, Some(childDatabaseName)),
+        childDataFrame,
+        false,
+        sparkSession)
+      loadCommand.processMetadata(sparkSession)
+      operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
+    }
+  }
+}
+
+/**
+ * Below class to is to create LoadCommand for loading the
+ * the data of pre aggregate data map
+ */
+object LoadProcessMetaListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    val tableEvent = event.asInstanceOf[LoadMetadataEvent]
+    if (!tableEvent.isCompaction) {
+      val table = tableEvent.getCarbonTable
+      if (CarbonUtil.hasAggregationDataMap(table)) {
+        // getting all the aggergate datamap schema
+        val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
+          .filter(_.isInstanceOf[AggregationDataMapSchema])
+          .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
+        // sorting the datamap for timeseries rollup
+        val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
+        val parentTableName = table.getTableName
+        val databaseName = table.getDatabaseName
+        val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
+        for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
+          val childTableName = dataMapSchema.getRelationIdentifier.getTableName
+          val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
+          val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) {
+            PreAggregateUtil.getChildQuery(dataMapSchema)
+          } else {
+            // for timeseries rollup policy
+            val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
+              dataMapSchema)
+            list += dataMapSchema
+            // if none of the rollup data maps is selected, hit the main table and prepare the query
+            if (tableSelectedForRollup.isEmpty) {
+              PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
+                parentTableName,
+                databaseName)
+            } else {
+              // otherwise hit the selected rollup datamap schema
+              PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
+                tableSelectedForRollup.get,
+                databaseName)
+            }
+          }
+          val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
+            childSelectQuery)).drop("preAggLoad")
+          val isOverwrite =
+            operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
+          val loadCommand = PreAggregateUtil.createLoadCommandForChild(
+            dataMapSchema.getChildSchema.getListOfColumns,
+            TableIdentifier(childTableName, Some(childDatabaseName)),
+            childDataFrame,
+            isOverwrite,
+            sparkSession)
+          loadCommand.processMetadata(sparkSession)
+          operationContext.setProperty(dataMapSchema.getChildSchema.getTableName, loadCommand)
+        }
+      }
+    }
+  }
+}
 object LoadPostAggregateListener extends OperationEventListener {
   /**
    * Called on a specified event occurrence
@@ -42,8 +177,7 @@ object LoadPostAggregateListener extends OperationEventListener {
     val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
     val sparkSession = SparkSession.getActiveSession.get
     val carbonLoadModel = loadEvent.getCarbonLoadModel
-    val table = CarbonEnv.getCarbonTable(Option(carbonLoadModel.getDatabaseName),
-      carbonLoadModel.getTableName)(sparkSession)
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     if (CarbonUtil.hasAggregationDataMap(table)) {
      // getting all the aggregate datamap schemas
       val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
@@ -51,41 +185,28 @@ object LoadPostAggregateListener extends OperationEventListener {
         .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
       // sorting the datamap for timeseries rollup
       val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
-      val parentTableName = table.getTableName
-      val databasename = table.getDatabaseName
-      val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
       for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
-        val childTableName = dataMapSchema.getRelationIdentifier.getTableName
-        val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
-        val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) {
-          PreAggregateUtil.getChildQuery(dataMapSchema)
-        } else {
-          // for timeseries rollup policy
-          val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
-              dataMapSchema)
-          list += dataMapSchema
-          // if non of the rollup data map is selected hit the maintable and prepare query
-          if (tableSelectedForRollup.isEmpty) {
-            PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
-                parentTableName,
-                databasename)
-          } else {
-            // otherwise hit the select rollup datamap schema
-            PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
-                tableSelectedForRollup.get,
-                databasename)
-          }
-        }
+        val childLoadCommand = operationContext
+          .getProperty(dataMapSchema.getChildSchema.getTableName)
+          .asInstanceOf[CarbonLoadDataCommand]
+        childLoadCommand.dataFrame = Some(PreAggregateUtil
+          .getDataFrame(sparkSession, childLoadCommand.logicalPlan.get))
+        val childOperationContext = new OperationContext
+        childOperationContext
+          .setProperty(dataMapSchema.getChildSchema.getTableName,
+            operationContext.getProperty(dataMapSchema.getChildSchema.getTableName))
         val isOverwrite =
           operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
+        childOperationContext.setProperty("isOverwrite", isOverwrite)
+        childOperationContext.setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction",
+          operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction"))
+        childLoadCommand.operationContext = childOperationContext
         PreAggregateUtil.startDataLoadForDataMap(
             table,
-            TableIdentifier(childTableName, Some(childDatabaseName)),
-            childSelectQuery,
             carbonLoadModel.getSegmentId,
             validateSegments = false,
-            isOverwrite,
-            sparkSession)
+            sparkSession,
+          childLoadCommand)
         }
       }
     }
@@ -115,7 +236,8 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
           compactionType.toString,
           Some(System.currentTimeMillis()),
           "")
-        CarbonAlterTableCompactionCommand(alterTableModel).run(sparkSession)
+        CarbonAlterTableCompactionCommand(alterTableModel, operationContext = operationContext)
+          .run(sparkSession)
       }
     }
   }
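Note on the listener changes above: pre-aggregate loading is now split into two phases. A metadata-phase listener builds a CarbonLoadDataCommand per child table and parks it in the OperationContext (keyed by the child table name, or "<childTable>_Compaction" for compaction), and the post-load listener later pulls the prepared command back out and runs only its data phase. A minimal, self-contained sketch of that handoff, using plain Scala stand-ins rather than the Carbon classes (the context and command types below are hypothetical):

    import scala.collection.mutable

    // simplified stand-in for the operation context used by the listeners
    class Context {
      private val props = mutable.Map.empty[String, Any]
      def setProperty(key: String, value: Any): Unit = props.put(key, value)
      def getProperty(key: String): Any = props.getOrElse(key, null)
    }

    // simplified stand-in for CarbonLoadDataCommand's two-phase contract
    case class ChildLoad(childTable: String) {
      def processMetadata(): Unit = println(s"prepare load for $childTable")
      def processData(): Unit = println(s"load data into $childTable")
    }

    object HandoffSketch extends App {
      val ctx = new Context
      val childTable = "maintable_agg0"      // hypothetical child table name

      // metadata-phase listener: build the command once and stash it in the context
      val cmd = ChildLoad(childTable)
      cmd.processMetadata()
      ctx.setProperty(childTable, cmd)

      // post-load listener: retrieve the prepared command and run only its data phase
      ctx.getProperty(childTable) match {
        case c: ChildLoad => c.processData()
        case _ =>                            // no pre-aggregate load was prepared
      }
    }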

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index cd19e3b..dac5d5e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.preaaggregate
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession}
+import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
@@ -31,13 +31,14 @@ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.sql.types.{DataType, LongType}
+import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.TableInfo
@@ -576,18 +577,15 @@ object PreAggregateUtil {
     }
     updatedPlan
   }
-
-  /**
+  /**
    * This method will start load process on the data map
    */
   def startDataLoadForDataMap(
       parentCarbonTable: CarbonTable,
-      dataMapIdentifier: TableIdentifier,
-      queryString: String,
       segmentToLoad: String,
       validateSegments: Boolean,
-      isOverwrite: Boolean,
-      sparkSession: SparkSession): Unit = {
+      sparkSession: SparkSession,
+      loadCommand: CarbonLoadDataCommand): Unit = {
     CarbonSession.threadSet(
       CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
       parentCarbonTable.getDatabaseName + "." +
@@ -597,32 +595,9 @@ object PreAggregateUtil {
       CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
       parentCarbonTable.getDatabaseName + "." +
       parentCarbonTable.getTableName, validateSegments.toString)
-    val dataMapSchemas = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala
-    val headers = dataMapSchemas.find(_.getChildSchema.getTableName.equalsIgnoreCase(
-      dataMapIdentifier.table)) match {
-      case Some(dataMapSchema) =>
-        val columns = dataMapSchema.getChildSchema.getListOfColumns.asScala
-          .filter{column =>
-            !column.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)}
-        columns.sortBy(_.getSchemaOrdinal).map(
-          _.getColumnName).mkString(",")
-      case None =>
-        throw new RuntimeException(
-          s"${ dataMapIdentifier.table} datamap not found in DataMapSchema list: ${
-          dataMapSchemas.map(_.getChildSchema.getTableName).mkString("[", ",", "]")}")
-    }
-    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
-      queryString)).drop("preAggLoad")
+    CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
     try {
-      CarbonLoadDataCommand(dataMapIdentifier.database,
-        dataMapIdentifier.table,
-        null,
-        Nil,
-        Map("fileheader" -> headers),
-        isOverwriteTable = isOverwrite,
-        dataFrame = Some(dataFrame),
-        internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
-        run(sparkSession)
+      loadCommand.processData(sparkSession)
     } finally {
       CarbonSession.threadUnset(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
@@ -647,11 +622,12 @@ object PreAggregateUtil {
           case _ => a.getAggFunction}}(${a.getColumnName})"
       } else {
         groupingExpressions += a.getColumnName
+        aggregateColumns += a.getColumnName
       }
     }
-    s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
-    } from $databaseName.${ tableSchema.getTableName } group by ${
-      groupingExpressions.mkString(",") }"
+    s"select ${ aggregateColumns.mkString(",") } " +
+    s"from $databaseName.${ tableSchema.getTableName }" +
+    s" group by ${ groupingExpressions.mkString(",") }"
   }
 
   /**
@@ -900,4 +876,32 @@ object PreAggregateUtil {
         aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
       CarbonCommonConstants.DEFAULT_CHARSET)
   }
+
+  /**
+   * This method will create the load command for the child pre-aggregate data map table
+   */
+  def createLoadCommandForChild(
+      columns: java.util.List[ColumnSchema],
+      dataMapIdentifier: TableIdentifier,
+      dataFrame: DataFrame,
+      isOverwrite: Boolean,
+      sparkSession: SparkSession): CarbonLoadDataCommand = {
+    val headers = columns.asScala.filter { column =>
+      !column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+    }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
+    val loadCommand = CarbonLoadDataCommand(dataMapIdentifier.database,
+      dataMapIdentifier.table,
+      null,
+      Nil,
+      Map("fileheader" -> headers),
+      isOverwriteTable = isOverwrite,
+      dataFrame = None,
+      internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
+      logicalPlan = Some(dataFrame.queryExecution.logical))
+    loadCommand
+  }
+
+  def getDataFrame(sparkSession: SparkSession, child: LogicalPlan): DataFrame = {
+    Dataset.ofRows(sparkSession, child)
+  }
 }
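Side note on createLoadCommandForChild above: the fileheader option is just the child table's visible columns, ordered by schema ordinal and comma-joined, with the invisible dummy measure filtered out. A small sketch of that construction, where the Column case class and the dummy-measure literal are simplified stand-ins for ColumnSchema and CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE:

    // simplified stand-in for ColumnSchema (name + schema ordinal only)
    case class Column(name: String, schemaOrdinal: Int)

    object FileHeaderSketch extends App {
      val dummyMeasure = "dummy_measure"     // placeholder for the real constant's value

      val columns = Seq(
        Column("price_sum", 2),
        Column("name", 0),
        Column(dummyMeasure, 3),
        Column("city", 1))

      // same shape as the logic in createLoadCommandForChild:
      // drop the dummy measure, sort by schema ordinal, join the names
      val header = columns
        .filterNot(_.name.equalsIgnoreCase(dummyMeasure))
        .sortBy(_.schemaOrdinal)
        .map(_.name)
        .mkString(",")

      println(header)                        // prints: name,city,price_sum
    }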

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
index 26a8f6f..19c265d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -42,15 +42,12 @@ case class CarbonCreateTableAsSelectCommand(
     ifNotExistsSet: Boolean = false,
     tableLocation: Option[String] = None) extends AtomicRunnableCommand {
 
-  /**
-   * variable to be used for insert into command for checking whether the
-   * table is created newly or already existed
-   */
-  var isTableCreated: Boolean = false
+  var loadCommand: CarbonInsertIntoCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = tableInfo.getFactTable.getTableName
+    var isTableCreated = false
     var databaseOpt: Option[String] = None
     if (tableInfo.getDatabaseName != null) {
       databaseOpt = Some(tableInfo.getDatabaseName)
@@ -71,10 +68,7 @@ case class CarbonCreateTableAsSelectCommand(
       CarbonCreateTableCommand(tableInfo, ifNotExistsSet, tableLocation).run(sparkSession)
       isTableCreated = true
     }
-    Seq.empty
-  }
 
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (isTableCreated) {
       val tableName = tableInfo.getFactTable.getTableName
       var databaseOpt: Option[String] = None
@@ -87,12 +81,23 @@ case class CarbonCreateTableAsSelectCommand(
         .createCarbonDataSourceHadoopRelation(sparkSession,
           TableIdentifier(tableName, Option(dbName)))
       // execute command to load data into carbon table
-      CarbonInsertIntoCommand(
+      loadCommand = CarbonInsertIntoCommand(
         carbonDataSourceHadoopRelation,
         query,
         overwrite = false,
-        partition = Map.empty).run(sparkSession)
-      LOGGER.audit(s"CTAS operation completed successfully for $dbName.$tableName")
+        partition = Map.empty)
+      loadCommand.processMetadata(sparkSession)
+    }
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (null != loadCommand) {
+      val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+      loadCommand.processData(sparkSession)
+      val carbonTable = loadCommand.relation.carbonTable
+      LOGGER.audit(s"CTAS operation completed successfully for " +
+                   s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
     }
     Seq.empty
   }
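The CTAS refactor above moves all command construction into processMetadata (create the table, then build and prepare the insert command) and leaves processData with a single guarded call. A condensed sketch of that two-phase shape, using hypothetical stand-ins for the Carbon command classes:

    // hypothetical stand-ins for the Carbon command classes
    class InsertInto(table: String) {
      def processMetadata(): Unit = println(s"resolve target relation for $table")
      def processData(): Unit = println(s"insert query results into $table")
    }

    class CreateTableAsSelectSketch(table: String, tableExists: Boolean) {
      private var loadCommand: InsertInto = _

      def processMetadata(): Unit = {
        if (!tableExists) {                  // create the table only when it is missing
          println(s"create table $table")
          loadCommand = new InsertInto(table)
          loadCommand.processMetadata()      // prepare the insert during the metadata phase
        }
      }

      def processData(): Unit = {
        if (loadCommand != null) {           // load runs only if a command was prepared
          loadCommand.processData()
        }
      }
    }

    object CtasSketch extends App {
      val ctas = new CreateTableAsSelectSketch("t1", tableExists = false)
      ctas.processMetadata()
      ctas.processData()
    }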

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index d74e461..99e5732 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -127,10 +127,6 @@ with Serializable {
     if (segemntsTobeDeleted.isDefined) {
       conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
     }
-    val operationContextStr = options.get("operationcontext")
-    if (operationContextStr.isDefined) {
-      conf.set(CarbonTableOutputFormat.OPERATION_CONTEXT, operationContextStr.get)
-    }
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index f058e96..57be754 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -95,7 +95,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
-      case alterTable@CarbonAlterTableCompactionCommand(altertablemodel, _) =>
+      case alterTable@CarbonAlterTableCompactionCommand(altertablemodel, _, _) =>
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(TableIdentifier(altertablemodel.tableName,
             altertablemodel.dbName))(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 87be2d2..b8608f4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 
 /**
  * Represents logical plan for one carbon table
@@ -204,7 +204,6 @@ case class CarbonRelation(
   def sizeInBytes: Long = {
     val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
       carbonTable.getAbsoluteTableIdentifier)
-
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
       if (new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
         .getValidAndInvalidSegments.getValidSegments.isEmpty) {
@@ -215,8 +214,19 @@ case class CarbonRelation(
           carbonTable.getCarbonTableIdentifier).getPath
         val fileType = FileFactory.getFileType(tablePath)
         if (FileFactory.isFileExist(tablePath, fileType)) {
+          // get the valid segments
+          val segments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+            .getValidAndInvalidSegments.getValidSegments.asScala
+          var size = 0L
+          // for each segment calculate the size
+          segments.foreach {validSeg =>
+            size = size + FileFactory.getDirectorySize(
+              CarbonTablePath.getSegmentPath(tablePath, validSeg))
+          }
+          // update the new table status time
           tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
-          sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
+          // update the new size
+          sizeInBytesLocalValue = size
         }
       }
     }
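The sizeInBytes change above stops measuring the whole table folder and instead sums the sizes of the valid segment directories, so invalid or stale segments no longer inflate the statistic. A rough, self-contained illustration of that per-segment summation using plain java.nio; the segment directory layout here is an assumption standing in for CarbonTablePath.getSegmentPath:

    import java.nio.file.{Files, Path, Paths}
    import scala.collection.JavaConverters._

    object SegmentSizeSketch {
      // recursively sum the sizes of all regular files under a directory
      def directorySize(dir: Path): Long = {
        if (!Files.exists(dir)) 0L
        else Files.walk(dir).iterator().asScala
          .filter(p => Files.isRegularFile(p))
          .map(p => Files.size(p))
          .sum
      }

      // sum only the valid segments instead of the whole table directory
      def tableSizeInBytes(tablePath: String, validSegments: Seq[String]): Long = {
        validSegments.map { segmentId =>
          // assumed layout: <tablePath>/Fact/Part0/Segment_<id>
          directorySize(Paths.get(tablePath, "Fact", "Part0", s"Segment_$segmentId"))
        }.sum
      }
    }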

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index b00a67e..78964e7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.processing.loading.events;
 import java.util.Map;
 
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
@@ -129,6 +130,24 @@ public class LoadEvents {
   }
 
   /**
+   * Load event class that will be fired from the load and compaction flows
+   * to create the load commands for all pre-aggregate data maps
+   */
+  public static class LoadMetadataEvent extends Event {
+    private CarbonTable carbonTable;
+    private boolean isCompaction;
+    public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction) {
+      this.carbonTable = carbonTable;
+      this.isCompaction = isCompaction;
+    }
+    public boolean isCompaction() {
+      return isCompaction;
+    }
+    public CarbonTable getCarbonTable() {
+      return carbonTable;
+    }
+  }
+  /**
    * Class for handling clean up in case of any failure and abort the operation.
    */
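The new LoadMetadataEvent above is what the load and compaction flows fire so that listeners can prepare the per-datamap load commands before any data moves. A short sketch of a listener consuming it, distinguishing the two flows via isCompaction(); the event and listener types are those used throughout this patch, while the import paths and the listener body are assumptions for illustration only:

    import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
    import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent

    object ExampleLoadMetadataListener extends OperationEventListener {
      override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
        val metaEvent = event.asInstanceOf[LoadMetadataEvent]
        val table = metaEvent.getCarbonTable  // parent table driving command creation
        if (metaEvent.isCompaction) {
          // compaction flow: stash "<childTable>_Compaction" load commands,
          // as the compaction meta listener does above
        } else {
          // load flow: stash one load command per aggregation data map keyed by
          // the child table name, as LoadProcessMetaListener does above
        }
      }
    }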
 

